Skip to main content

DBOS Client

DBOSClient provides a programmatic way to interact with your DBOS application from external code. DBOSClient includes methods similar to DBOS that can be used outside of a DBOS application, such as enqueue or getEvent.

note

DBOSClient is included in the dbos package, the same package that used by DBOS applications. Where DBOS applications use the DBOS methods, external applications use DBOSClient instead.

class dbos.DBOSClient

DBOSClient(
database_url: str,
*,
system_database: Optional[str] = None,
)

Parameters:

  • database_url: A connection string to a Postgres database. Please see Configuring DBOS for more info.
  • system_database: The name of your DBOS application's system database. The system database is stored on the same database server as the application database and typically has the same name as your application database, but suffixed with _dbos_sys. If you are using a non-standard system database name in your DBOS application, you must also provide its name when creating a DBOSClient.

Example syntax:

This DBOS client connects to the database specified in the DBOS_DATABASE_URL environment variable.

client = DBOSClient(os.environ["DBOS_DATABASE_URL"])

enqueue

class EnqueueOptions(TypedDict):
workflow_name: str
queue_name: str
workflow_id: NotRequired[str]
app_version: NotRequired[str]

client.enqueue(
options: EnqueueOptions,
*args: Any,
**kwargs: Any
) -> WorkflowHandle[R]

Enqueue a workflow for processing and return a handle to it, similar to Queue.enqueue. Returns a WorkflowHandle.

When enqueuing a workflow from within a DBOS application, the workflow and queue metadata can be retrieved automatically. However, since DBOSClient runs outside the DBOS application, the metadata must be specified explicitly.

Required metadata includes:

  • workflow_name: The name of the workflow method being enqueued.
  • queue_name: The name of the Queue to enqueue the workflow on.

Additional but optional metadata includes:

  • workflow_id: The unique ID for the enqueued workflow. If left undefined, DBOS Client will generate a UUID. Please see Workflow IDs and Idempotency for more information.
  • app_version: The version of your application that should process this workflow. If left undefined, it will be updated to the current version when the workflow is first dequeued. Please see Managing Application Versions for more information.
warning

At this time, DBOS Client cannot enqueue workflows that are methods on Python classes.

Example syntax:

options: EnqueueOptions = {
"queue_name": "process_task",
"workflow_name": "example_queue",
}
handle = client.enqueue(options, task)
result = handle.get_result()

enqueue_async

client.enqueue_async(
options: EnqueueOptions,
*args: Any,
**kwargs: Any
) -> WorkflowHandleAsync[R]

Similar to enqueue, but enqueues asynchronously and returns a WorkflowHandleAsync.

Example syntax:

options: EnqueueOptions = {
"queue_name": "process_task",
"workflow_name": "example_queue",
}
handle = await client.enqueue_async(options, task)
result = await handle.get_result()

retrieve_workflow

client.retrieve_workflow(
workflow_id: str,
) -> WorkflowHandle[R]

Retrieve the handle of a workflow with identity workflow_id. Similar to DBOS.retrieve_workflow.

Parameters:

  • workflow_id: The identifier of the workflow whose handle to retrieve.

Returns:

retrieve_workflow_async

client.retrieve_workflow_async(
workflow_id: str,
) -> WorkflowHandleAsync[R]

Asynchronously retrieve the handle of a workflow with identity workflow_id. Similar to DBOS.retrieve_workflow.

Parameters:

  • workflow_id: The identifier of the workflow whose handle to retrieve.

Returns:

send

client.send(
destination_id: str,
message: Any,
topic: Optional[str] = None,
idempotency_key: Optional[str] = None,
) -> None

Sends a message to a specified workflow. Similar to DBOS.send.

Parameters:

  • destination_id: The workflow to which to send the message.
  • message: The message to send. Must be serializable.
  • topic: An optional topic with which to associate the message. Messages are enqueued per-topic on the receiver.
  • idempotency_key: An optional string used to ensure exactly-once delivery, even from outside of the DBOS application.
warning

Since DBOS Client is running outside of a DBOS application, it is highly recommended that you use the idempotencyKey parameter with both send and send_async in order to get exactly-once behavior.

send_async

client.send_async(
destination_id: str,
message: Any,
topic: Optional[str] = None,
idempotency_key: Optional[str] = None,
) -> None

Asynchronously sends a message to a specified workflow. Similar to DBOS.send_async.

Parameters:

  • destination_id: The workflow to which to send the message.
  • message: The message to send. Must be serializable.
  • topic: An optional topic with which to associate the message. Messages are enqueued per-topic on the receiver.
  • idempotency_key: An optional string used to ensure exactly-once delivery, even from outside of the DBOS application.

get_event

client.get_event(
workflow_id: str,
key: str,
timeout_seconds: float = 60
) -> Any

Retrieve the latest value of an event published by the workflow identified by workflow_id to the key key. If the event does not yet exist, wait for it to be published, returning None if the wait times out. Similar to `DBOS.get_event.

Parameters:

  • workflow_id: The identifier of the workflow whose events to retrieve.
  • key: The key of the event to retrieve.
  • timeout_seconds: A timeout in seconds. If the wait times out, return None.

Returns:

  • The value of the event published by workflow_id with name key, or None if the wait times out.

get_event_async

client.get_event_async(
workflow_id: str,
key: str,
timeout_seconds: float = 60
) -> Any

Asynchronously retrieve the latest value of an event published by the workflow identified by workflow_id to the key key. If the event does not yet exist, wait for it to be published, returning None if the wait times out. Similar to `DBOS.get_event_async.

Parameters:

  • workflow_id: The identifier of the workflow whose events to retrieve.
  • key: The key of the event to retrieve.
  • timeout_seconds: A timeout in seconds. If the wait times out, return None.

Returns:

  • The value of the event published by workflow_id with name key, or None if the wait times out.