Queues & Parallelism
Queues allow you to run functions with managed concurrency. They are useful for controlling the number of functions run in parallel, or the rate at which functions are started.
To create a queue, specify its name:
from dbos import Queue
queue = Queue("example_queue")
You can then enqueue any DBOS workflow, step, or transaction. Enqueuing a function submits it for execution and returns a handle to it. Queued tasks are started in first-in, first-out (FIFO) order.
from dbos import DBOS, Queue

queue = Queue("example_queue")

@DBOS.step()
def process_task(task):
    ...

task = ...
handle = queue.enqueue(process_task, task)
Queue Example
Here's an example of a workflow using a queue to process tasks concurrently:
from dbos import DBOS, Queue

queue = Queue("example_queue")

@DBOS.step()
def process_task(task):
    ...

@DBOS.workflow()
def process_tasks(tasks):
    task_handles = []
    # Enqueue each task so all tasks are processed concurrently.
    for task in tasks:
        handle = queue.enqueue(process_task, task)
        task_handles.append(handle)
    # Wait for each task to complete and retrieve its result.
    # Return the results of all tasks.
    return [handle.get_result() for handle in task_handles]
Enqueue with DBOSClient
DBOSClient provides a way to programmatically interact with your DBOS application from external code. Among other things, this allows you to enqueue workflows from outside your DBOS application by connecting to Postgres directly.
Since DBOSClient is designed to be used from outside your DBOS application, workflow and queue metadata must be specified explicitly.
Example:
import os

from dbos import DBOSClient, EnqueueOptions

client = DBOSClient(os.environ["DBOS_DATABASE_URL"])

options: EnqueueOptions = {
    "queue_name": "example_queue",
    "workflow_name": "process_task",
}
handle = client.enqueue(options, task)
result = handle.get_result()
Reliability Guarantees
Because queues use DBOS workflows, they provide the following reliability guarantees for enqueued functions. These guarantees assume that the application and database may crash and go offline at any point in time, but are always restarted and come back online.
- Enqueued functions always run to completion. If a DBOS process crashes and is restarted at any point after a function is enqueued, it resumes the enqueued function from the last completed step.
- Enqueued steps (or steps called from enqueued workflows) are tried at least once but are never re-executed after they complete. If a failure occurs inside a step, the step may be retried, but once a step has completed, it will never be re-executed.
- Enqueued transactions (or transactions called from enqueued workflows) commit exactly once.
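For example, in this sketch (the workflow and step names are illustrative), a crash between the two steps does not cause the first step to run twice: a restarted process resumes the workflow from where it left off.

from dbos import DBOS, Queue

queue = Queue("example_queue")

@DBOS.step()
def charge_customer(order_id: str):
    ...  # Runs at least once; never re-executed after it completes.

@DBOS.step()
def send_receipt(order_id: str):
    ...

@DBOS.workflow()
def fulfill_order(order_id: str):
    charge_customer(order_id)
    # If the process crashes here, a restarted process resumes the
    # workflow from this point; charge_customer is not run again.
    send_receipt(order_id)

handle = queue.enqueue(fulfill_order, "order-123")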
Managing Concurrency
You can specify a queue's concurrency, the maximum number of functions from the queue that may run concurrently, at two scopes: global and per-process. Global concurrency limits apply across all DBOS processes using the queue. Per-process concurrency limits apply to each individual DBOS process using the queue. If no limit is provided, any number of functions may run concurrently. For example, this queue has a maximum global concurrency of 10 and a per-process concurrency of 5, so at most 10 functions submitted to it may run at once, with no more than 5 in any single process:
from dbos import Queue
queue = Queue("example_queue", concurrency=10, worker_concurrency=5)
You may want to specify a maximum concurrency if functions in your queue submit work to an external process with limited resources. The concurrency limit guarantees that even if many functions are submitted at once, they won't overwhelm the process.
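For instance, if each of your processes can serve only one GPU-bound task at a time while the application as a whole can handle ten, you might write (a sketch; the queue name is illustrative):

from dbos import Queue

# At most 10 tasks run across the whole application,
# and at most 1 task runs in any single process.
gpu_queue = Queue("gpu_queue", concurrency=10, worker_concurrency=1)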
Rate Limiting
You can set rate limits for a queue, limiting the number of functions that it can start in a given period. Rate limits are global across all DBOS processes using this queue. For example, this queue has a limit of 50 with a period of 30 seconds, so it may not start more than 50 functions in 30 seconds:
queue = Queue("example_queue", limiter={"limit": 50, "period": 30})
Rate limits are especially useful when working with a rate-limited API, such as many LLM APIs.
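For instance, here is a sketch of throttling calls to such an API (call_external_api is an illustrative step name, not a real client):

from dbos import DBOS, Queue

# Start at most 50 API calls in any 30-second window.
api_queue = Queue("api_queue", limiter={"limit": 50, "period": 30})

@DBOS.step()
def call_external_api(prompt: str):
    ...  # Call the rate-limited API here.

@DBOS.workflow()
def process_prompts(prompts):
    handles = [api_queue.enqueue(call_external_api, p) for p in prompts]
    return [h.get_result() for h in handles]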
In-Order Processing
You can use a queue with concurrency=1 to guarantee sequential, in-order processing of events. Only a single event will be processed at a time.
For example, this app processes events sequentially in the order of their arrival:
from fastapi import FastAPI
from dbos import DBOS, Queue

app = FastAPI()
DBOS(fastapi=app)
queue = Queue("in_order_queue", concurrency=1)

@DBOS.step()
def process_event(event: str):
    ...

@app.post("/events/{event}")
def event_endpoint(event: str):
    queue.enqueue(process_event, event)
Setting Timeouts
You can set a timeout for an enqueued workflow with SetWorkflowTimeout. When the timeout expires, the workflow and all its children are cancelled. Cancelling a workflow sets its status to CANCELLED and preempts its execution at the beginning of its next step.
Timeouts are start-to-completion: a workflow's timeout does not begin until the workflow is dequeued and starts execution. Also, timeouts are durable: they are stored in the database and persist across restarts, so workflows can have very long timeouts.
Example syntax:
from dbos import DBOS, Queue, SetWorkflowTimeout

@DBOS.workflow()
def example_workflow():
    ...

queue = Queue("example-queue")

# If the workflow does not complete within 10 seconds after being dequeued, it times out and is cancelled.
with SetWorkflowTimeout(10):
    queue.enqueue(example_workflow)
Deduplication
You can set a deduplication ID for an enqueued workflow with SetEnqueueOptions or DBOSClient.
At any given time, only one workflow with a given deduplication ID can be enqueued in a given queue.
If a workflow with a deduplication ID is currently enqueued or actively executing (status ENQUEUED or PENDING), subsequent attempts to enqueue a workflow with the same deduplication ID in the same queue raise a DBOSQueueDeduplicatedError exception.
This is useful for long-running workflows that should only be triggered one at a time, such as critical updates that must not be duplicated if a workflow is already in progress.
Example syntax:
with SetEnqueueOptions(deduplication_id="my_dedup_id"):
    try:
        handle = queue.enqueue(example_workflow, ...)
    except DBOSQueueDeduplicatedError as e:
        # Handle deduplication error
        ...
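When enqueuing from outside the application, the deduplication ID can instead be passed through the client's options (a sketch, assuming the deduplication_id field of the client's EnqueueOptions and an illustrative workflow name):

import os
from dbos import DBOSClient, EnqueueOptions

client = DBOSClient(os.environ["DBOS_DATABASE_URL"])
options: EnqueueOptions = {
    "queue_name": "example_queue",
    "workflow_name": "example_workflow",
    "deduplication_id": "my_dedup_id",
}
handle = client.enqueue(options)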
Priority
You can set a priority for an enqueued workflow with SetEnqueueOptions or DBOSClient.
Workflows with the same priority are dequeued in FIFO (first-in, first-out) order. Priority values range from 1 to 2,147,483,647, where a lower number indicates a higher priority.
Workflows without an assigned priority have the highest priority and are dequeued before workflows with assigned priorities.
Example syntax:
with SetEnqueueOptions(priority=10):
    # All workflows are enqueued with priority set to 10.
    # They will be dequeued in FIFO order.
    for task in tasks:
        queue.enqueue(task_workflow, task)

with SetEnqueueOptions(priority=1):
    queue.enqueue(first_workflow)

# first_workflow (priority=1) will be dequeued before all task_workflows (priority=10)
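Priority can likewise be set when enqueuing through DBOSClient (a sketch, assuming the priority field of the client's EnqueueOptions and an illustrative workflow name):

import os
from dbos import DBOSClient, EnqueueOptions

client = DBOSClient(os.environ["DBOS_DATABASE_URL"])
options: EnqueueOptions = {
    "queue_name": "example_queue",
    "workflow_name": "first_workflow",
    "priority": 1,
}
handle = client.enqueue(options)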