
LlamaIndex Durable Workflows with DBOS

LlamaIndex Workflows is a Python framework for orchestrating AI agents by composing steps and events into structured workflows. This guide explains how to build durable LlamaIndex agents using the DBOS runtime, enabling fault-tolerant, persistent AI workflows that can safely recover from crashes or restarts.

By integrating DBOS with LlamaIndex Workflows through the llama-agents-dbos package, every workflow transition is automatically persisted. This allows long-running AI workflows to resume exactly where they left off, without requiring manual checkpointing or snapshot logic.

This is especially useful for:

  • AI agents with long-running tasks
  • Multi-step LlamaIndex workflows
  • LLM pipelines that must survive failures
  • Production AI systems that require reliability

Also check out the integration guide in the LlamaIndex docs!

Installation

To get started, install the llama-agents-dbos package.

pip install llama-agents-dbos

Quick Start: Standalone Durable Workflow

The example below defines a simple workflow that counts from 0 to 20. DBOS persists each transition so the workflow can safely resume after a crash or restart.

import asyncio

from dbos import DBOS, DBOSConfig
from llama_agents.dbos import DBOSRuntime
from pydantic import Field
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent


# 1. Configure DBOS (SQLite by default)
config: DBOSConfig = {
    "name": "llamaindex-counter-example",
    "system_database_url": "sqlite:///counter_example.sqlite",
    "run_admin_server": False,
}
DBOS(config=config)


# 2. Define events and workflow (nothing DBOS-specific here)
class Tick(Event):
    count: int = Field(description="Current count")


class CounterResult(StopEvent):
    final_count: int = Field(description="Final counter value")


class CounterWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> Tick:
        await ctx.store.set("count", 0)
        print("[Start] Initializing counter to 0")
        return Tick(count=0)

    @step
    async def increment(self, ctx: Context, ev: Tick) -> Tick | CounterResult:
        count = ev.count + 1
        await ctx.store.set("count", count)
        print(f"[Tick {count:2d}] count = {count}")

        if count >= 20:
            return CounterResult(final_count=count)

        await asyncio.sleep(0.5)
        return Tick(count=count)


# 3. Create runtime, attach to workflow, and launch
runtime = DBOSRuntime()
workflow = CounterWorkflow(runtime=runtime)


async def main() -> None:
    await runtime.launch()
    result = await workflow.run(run_id="counter-run-1")
    print(f"Result: final_count = {result.final_count}")


asyncio.run(main())

If you kill the process mid-run (e.g., with Ctrl+C at tick 8), calling workflow.run(run_id="counter-run-1") again with the same run_id resumes from tick 8 instead of restarting from zero.
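
The resume behavior described above can be pictured with a toy model. The sketch below is not how DBOS or llama-agents-dbos is implemented; it only illustrates the general idea of saving each transition under a run ID and picking up from the last saved one. It uses Python's built-in sqlite3 module, and the names run_counter, SimulatedCrash, and the progress table are hypothetical.

```python
import sqlite3
from typing import Optional


class SimulatedCrash(Exception):
    """Stands in for the process being killed mid-run."""


def run_counter(
    db: sqlite3.Connection,
    run_id: str,
    target: int = 20,
    crash_at: Optional[int] = None,
) -> list[int]:
    """Count up to `target`, saving progress under `run_id` after every tick.

    Returns the ticks executed during this call only.
    """
    db.execute(
        "CREATE TABLE IF NOT EXISTS progress (run_id TEXT PRIMARY KEY, count INTEGER)"
    )
    row = db.execute(
        "SELECT count FROM progress WHERE run_id = ?", (run_id,)
    ).fetchone()
    count = row[0] if row else 0  # resume from the last saved tick, if any

    executed = []
    while count < target:
        count += 1
        # Persist the transition before doing anything else.
        db.execute(
            "INSERT OR REPLACE INTO progress (run_id, count) VALUES (?, ?)",
            (run_id, count),
        )
        db.commit()
        executed.append(count)
        if crash_at is not None and count == crash_at:
            raise SimulatedCrash(f"killed at tick {count}")
    return executed


db = sqlite3.connect(":memory:")

# First attempt: "crash" after tick 8 has been saved.
try:
    run_counter(db, "counter-run-1", crash_at=8)
except SimulatedCrash:
    pass

saved_count = db.execute(
    "SELECT count FROM progress WHERE run_id = ?", ("counter-run-1",)
).fetchone()[0]

# Second attempt with the same run ID: continues after the saved tick.
resumed_ticks = run_counter(db, "counter-run-1")
```

With the DBOS runtime, none of this bookkeeping appears in the workflow code: the steps stay as written in the example, and the run_id plays the role of the key.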

Notes:

  1. Workflows must be defined before runtime.launch().
  2. The run_id parameter uniquely identifies a workflow run and is equivalent to the DBOS workflow ID.
  3. This example uses SQLite for ease of getting started. Postgres is recommended for production.
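
One way to follow note 3 is to read the system database URL from the environment, so the same script uses SQLite locally and Postgres in production. This is a minimal sketch under assumptions: the environment variable name APP_SYSTEM_DATABASE_URL and the helper build_config are hypothetical, and the Postgres URL is a placeholder whose exact format should be checked against the DBOS documentation. The dictionary keys are the ones used in the example above.

```python
import os
from typing import Mapping, Optional

# Hypothetical variable name chosen for this sketch; not defined by DBOS.
ENV_VAR = "APP_SYSTEM_DATABASE_URL"
DEFAULT_SQLITE_URL = "sqlite:///counter_example.sqlite"


def build_config(env: Optional[Mapping[str, str]] = None) -> dict:
    """Return a config dict, preferring the URL from the environment."""
    env = os.environ if env is None else env
    return {
        "name": "llamaindex-counter-example",
        "system_database_url": env.get(ENV_VAR, DEFAULT_SQLITE_URL),
        "run_admin_server": False,
    }


# Local development: nothing set, so the SQLite default is used.
dev_config = build_config({})

# Production: the variable points at Postgres (placeholder credentials).
prod_config = build_config(
    {ENV_VAR: "postgresql://user:password@localhost:5432/app_dbos_sys"}
)
```

The resulting dictionary can be passed to DBOS(config=...) in place of the literal config in the quick start.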

Durable Workflow Server

DBOSRuntime integrates with WorkflowServer to serve workflows over HTTP with durable execution out of the box. Pass runtime.create_workflow_store() as the persistence backend and runtime.build_server_runtime() as the execution engine:

import asyncio

from dbos import DBOS, DBOSConfig
from llama_agents.dbos import DBOSRuntime
from llama_agents.server import WorkflowServer
from pydantic import Field
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent

config: DBOSConfig = {
    "name": "llamaindex-server",
    "system_database_url": "sqlite:///server_example.sqlite",
    "run_admin_server": False,
}
DBOS(config=config)


class Tick(Event):
    count: int = Field(description="Current count")


class CounterResult(StopEvent):
    final_count: int = Field(description="Final counter value")


class CounterWorkflow(Workflow):
    """Counts to 5, emitting stream events along the way."""

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> Tick:
        return Tick(count=0)

    @step
    async def tick(self, ctx: Context, ev: Tick) -> Tick | CounterResult:
        count = ev.count + 1
        ctx.write_event_to_stream(Tick(count=count))
        print(f" tick {count}")
        await asyncio.sleep(0.5)
        if count >= 5:
            return CounterResult(final_count=count)
        return Tick(count=count)


async def main() -> None:
    runtime = DBOSRuntime()

    server = WorkflowServer(
        workflow_store=runtime.create_workflow_store(),
        runtime=runtime.build_server_runtime(),
    )
    server.add_workflow("counter", CounterWorkflow(runtime=runtime))

    print("Serving on http://localhost:8000")
    print("Try: curl -X POST http://localhost:8000/workflows/counter/run")
    await server.start()
    try:
        await server.serve(host="0.0.0.0", port=8000)
    finally:
        await server.stop()


asyncio.run(main())

The LlamaIndex workflow debugger UI at http://localhost:8000/ works exactly the same as with the default runtime: DBOS is transparent to the LlamaIndex workflow server layer.
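
The curl command printed by the server example can also be issued from Python. The sketch below uses only the standard library and mirrors that command: a POST with no body to /workflows/counter/run. The helper names build_run_request and start_run are hypothetical, and the shape of the response is not described in this guide, so the body is returned as raw text rather than parsed.

```python
import urllib.request

BASE_URL = "http://localhost:8000"  # host and port from the server example


def build_run_request(
    workflow_name: str, base_url: str = BASE_URL
) -> urllib.request.Request:
    """Build the same bodiless POST that the printed curl command sends."""
    url = f"{base_url}/workflows/{workflow_name}/run"
    return urllib.request.Request(url, method="POST")


def start_run(workflow_name: str) -> str:
    """Send the request and return the raw response body as text.

    Requires the server from the example above to be running.
    """
    request = build_run_request(workflow_name)
    with urllib.request.urlopen(request, timeout=30) as response:
        return response.read().decode("utf-8")


# Building the request does not touch the network.
request = build_run_request("counter")
```

With the server running, print(start_run("counter")) shows whatever the endpoint returns for a new run.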

Learn More

For more details on building agents and workflows with LlamaIndex, see the LlamaAgents documentation.

For information about durable execution and workflow design, see the DBOS programming guide.

Together, these resources cover everything from getting started with simple agents to designing production-ready, fault-tolerant applications.