DBOS Client
DBOSClient provides a programmatic way to interact with your DBOS application from external code.
DBOSClient includes methods similar to those of DBOS that can be used outside of a DBOS application, such as enqueue or get_event.
DBOSClient is included in the dbos package, the same package that is used by DBOS applications.
Where DBOS applications use the DBOS methods, external applications use DBOSClient instead.
class dbos.DBOSClient
DBOSClient(
    database_url: str,
    *,
    system_database: Optional[str] = None,
)
Parameters:
- database_url: A connection string to a Postgres database. Please see Configuring DBOS for more info.
- system_database: The name of your DBOS application's system database. The system database is stored on the same database server as the application database and typically has the same name as your application database, but suffixed with _dbos_sys. If you are using a non-standard system database name in your DBOS application, you must also provide its name when creating a DBOSClient.
Example syntax:
This DBOS client connects to the database specified in the DBOS_DATABASE_URL environment variable.
client = DBOSClient(os.environ["DBOS_DATABASE_URL"])
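The default system database name follows from the application database name, so it can be worked out from the connection string. The following is a minimal sketch of that naming convention only; default_system_database and the sample URL are hypothetical names used for illustration and are not part of the dbos package.

```python
from urllib.parse import urlparse


def default_system_database(database_url: str) -> str:
    """Derive the conventional system database name from a Postgres URL.

    Assumption: this only mirrors the naming convention described above
    (application database name + "_dbos_sys"); it is not DBOS's own code.
    """
    # The database name is the URL path without its leading slash.
    app_db = urlparse(database_url).path.lstrip("/")
    if not app_db:
        raise ValueError("database_url does not name a database")
    return f"{app_db}_dbos_sys"


# Hypothetical connection string, for illustration only.
url = "postgresql://postgres:secret@localhost:5432/my_app"
print(default_system_database(url))
```

If your application's system database has this conventional name, the system_database argument can be left out; otherwise pass the actual name explicitly.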
enqueue
class EnqueueOptions(TypedDict):
    workflow_name: str
    queue_name: str
    workflow_id: NotRequired[str]
    app_version: NotRequired[str]

client.enqueue(
    options: EnqueueOptions,
    *args: Any,
    **kwargs: Any
) -> WorkflowHandle[R]
Enqueue a workflow for processing and return a handle to it, similar to Queue.enqueue. Returns a WorkflowHandle.
When enqueuing a workflow from within a DBOS application, the workflow and queue metadata can be retrieved automatically.
However, since DBOSClient runs outside the DBOS application, the metadata must be specified explicitly.
Required metadata includes:
- workflow_name: The name of the workflow method being enqueued.
- queue_name: The name of the Queue to enqueue the workflow on.
Optional metadata includes:
- workflow_id: The unique ID for the enqueued workflow. If left undefined, DBOS Client will generate a UUID. Please see Workflow IDs and Idempotency for more information.
- app_version: The version of your application that should process this workflow. If left undefined, it will be updated to the current version when the workflow is first dequeued. Please see Managing Application Versions for more information.
At this time, DBOS Client cannot enqueue workflows that are methods on Python classes.
Example syntax:
options: EnqueueOptions = {
    "queue_name": "example_queue",
    "workflow_name": "process_task",
}
handle = client.enqueue(options, task)
result = handle.get_result()
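When the caller needs to find the workflow again later, it can help to choose the workflow_id before enqueueing rather than rely on the generated UUID. The sketch below assumes only the enqueue method and option keys documented above; enqueue_with_id is a hypothetical helper, and the options are built as a plain dict with the same keys as EnqueueOptions so the example does not depend on the dbos package being installed.

```python
import uuid
from typing import Any, Dict, Optional


def enqueue_with_id(
    client: Any,
    queue_name: str,
    workflow_name: str,
    *args: Any,
    workflow_id: Optional[str] = None,
) -> str:
    """Enqueue a workflow and return the workflow ID that was used."""
    # Choose the ID up front so the caller can record it before enqueueing.
    workflow_id = workflow_id or str(uuid.uuid4())
    # Same keys as EnqueueOptions; a plain dict keeps the sketch importable
    # without the dbos package.
    options: Dict[str, str] = {
        "queue_name": queue_name,
        "workflow_name": workflow_name,
        "workflow_id": workflow_id,
    }
    client.enqueue(options, *args)
    return workflow_id
```

The returned ID can be stored by the external application and used later to look the workflow up again.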
enqueue_async
client.enqueue_async(
    options: EnqueueOptions,
    *args: Any,
    **kwargs: Any
) -> WorkflowHandleAsync[R]
Similar to enqueue, but enqueues asynchronously and returns a WorkflowHandleAsync.
Example syntax:
options: EnqueueOptions = {
    "queue_name": "example_queue",
    "workflow_name": "process_task",
}
handle = await client.enqueue_async(options, task)
result = await handle.get_result()
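One reason to prefer the asynchronous variant is that several workflows can be enqueued and awaited concurrently. The following is a sketch under that assumption, using only enqueue_async and get_result as documented above; enqueue_all is a hypothetical helper name, and the options are a plain dict with the EnqueueOptions keys.

```python
import asyncio
from typing import Any, List, Sequence


async def enqueue_all(
    client: Any, queue_name: str, workflow_name: str, tasks: Sequence[Any]
) -> List[Any]:
    """Enqueue one workflow per task, then wait for every result."""
    options = {"queue_name": queue_name, "workflow_name": workflow_name}
    # Enqueue everything first so the workflows can run concurrently.
    handles = await asyncio.gather(
        *(client.enqueue_async(options, task) for task in tasks)
    )
    # asyncio.gather preserves order, so results line up with `tasks`.
    return list(await asyncio.gather(*(h.get_result() for h in handles)))
```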
retrieve_workflow
client.retrieve_workflow(
    workflow_id: str,
) -> WorkflowHandle[R]
Retrieve the handle of a workflow with identity workflow_id.
Similar to DBOS.retrieve_workflow.
Parameters:
- workflow_id: The identifier of the workflow whose handle to retrieve.
Returns:
- The WorkflowHandle of the workflow whose ID is workflow_id.
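A typical use of retrieve_workflow is collecting results for workflow IDs that the external application recorded earlier. The sketch below assumes only retrieve_workflow and the handle's get_result method shown in the enqueue example; collect_results is a hypothetical helper name.

```python
from typing import Any, Dict, Iterable


def collect_results(client: Any, workflow_ids: Iterable[str]) -> Dict[str, Any]:
    """Map each workflow ID to its result, waiting for each in turn."""
    results: Dict[str, Any] = {}
    for workflow_id in workflow_ids:
        handle = client.retrieve_workflow(workflow_id)
        # get_result blocks until the workflow has finished.
        results[workflow_id] = handle.get_result()
    return results
```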
retrieve_workflow_async
client.retrieve_workflow_async(
    workflow_id: str,
) -> WorkflowHandleAsync[R]
Asynchronously retrieve the handle of a workflow with identity workflow_id.
Similar to DBOS.retrieve_workflow.
Parameters:
- workflow_id: The identifier of the workflow whose handle to retrieve.
Returns:
- The WorkflowHandleAsync of the workflow whose ID is workflow_id.
send
client.send(
    destination_id: str,
    message: Any,
    topic: Optional[str] = None,
    idempotency_key: Optional[str] = None,
) -> None
Sends a message to a specified workflow. Similar to DBOS.send.
Parameters:
- destination_id: The workflow to which to send the message.
- message: The message to send. Must be serializable.
- topic: An optional topic with which to associate the message. Messages are enqueued per-topic on the receiver.
- idempotency_key: An optional string used to ensure exactly-once delivery, even from outside of the DBOS application.
Since DBOS Client runs outside of a DBOS application, it is highly recommended that you use the idempotency_key parameter with both send and send_async in order to get exactly-once behavior.
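The idempotency key matters most when the caller retries: the key has to stay the same across attempts so that a retried send is not delivered twice. The following is a sketch of that pattern, assuming only the send signature documented above. send_with_retry, its attempts and delay_seconds parameters, and the choice to retry on any exception are illustrative assumptions, not DBOS behavior.

```python
import time
import uuid
from typing import Any, Optional


def send_with_retry(
    client: Any,
    destination_id: str,
    message: Any,
    topic: Optional[str] = None,
    attempts: int = 3,
    delay_seconds: float = 0.5,
) -> str:
    """Send a message, retrying on failure with one fixed idempotency key."""
    # Generate the key once, outside the loop, so every retry reuses it.
    key = str(uuid.uuid4())
    for attempt in range(1, attempts + 1):
        try:
            client.send(destination_id, message, topic, idempotency_key=key)
            return key
        except Exception:
            # Assumption: any failure is treated as retryable in this sketch.
            if attempt == attempts:
                raise
            time.sleep(delay_seconds)
    raise ValueError("attempts must be at least 1")
```

Generating a fresh key inside the loop would defeat the purpose, since each retry would then look like a new message.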
send_async
client.send_async(
    destination_id: str,
    message: Any,
    topic: Optional[str] = None,
    idempotency_key: Optional[str] = None,
) -> None
Asynchronously sends a message to a specified workflow. Similar to DBOS.send_async.
Parameters:
- destination_id: The workflow to which to send the message.
- message: The message to send. Must be serializable.
- topic: An optional topic with which to associate the message. Messages are enqueued per-topic on the receiver.
- idempotency_key: An optional string used to ensure exactly-once delivery, even from outside of the DBOS application.
get_event
client.get_event(
    workflow_id: str,
    key: str,
    timeout_seconds: float = 60
) -> Any
Retrieve the latest value of an event published by the workflow identified by workflow_id to the key key.
If the event does not yet exist, wait for it to be published, returning None if the wait times out.
Similar to DBOS.get_event.
Parameters:
- workflow_id: The identifier of the workflow whose events to retrieve.
- key: The key of the event to retrieve.
- timeout_seconds: A timeout in seconds. If the wait times out, return None.
Returns:
- The value of the event published by workflow_id with name key, or None if the wait times out.
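Because a timeout is reported as None rather than as an exception, callers that require the event may want to convert that case into an error. The following is a minimal sketch assuming only the get_event signature documented above; require_event is a hypothetical helper name.

```python
from typing import Any


def require_event(
    client: Any, workflow_id: str, key: str, timeout_seconds: float = 60
) -> Any:
    """Return the event value, raising TimeoutError if the wait times out."""
    value = client.get_event(workflow_id, key, timeout_seconds=timeout_seconds)
    if value is None:
        # This sketch treats every None as a timeout.
        raise TimeoutError(f"no event {key!r} from workflow {workflow_id!r}")
    return value
```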
get_event_async
client.get_event_async(
    workflow_id: str,
    key: str,
    timeout_seconds: float = 60
) -> Any
Asynchronously retrieve the latest value of an event published by the workflow identified by workflow_id to the key key.
If the event does not yet exist, wait for it to be published, returning None if the wait times out.
Similar to DBOS.get_event_async.
Parameters:
- workflow_id: The identifier of the workflow whose events to retrieve.
- key: The key of the event to retrieve.
- timeout_seconds: A timeout in seconds. If the wait times out, return None.
Returns:
- The value of the event published by workflow_id with name key, or None if the wait times out.
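With the asynchronous variant, several event keys can be awaited at once, so the total wait is bounded by one timeout rather than one per key. The sketch below assumes only the get_event_async signature documented above; get_events is a hypothetical helper name.

```python
import asyncio
from typing import Any, Dict, Iterable


async def get_events(
    client: Any,
    workflow_id: str,
    keys: Iterable[str],
    timeout_seconds: float = 60,
) -> Dict[str, Any]:
    """Fetch several events from one workflow concurrently."""
    key_list = list(keys)
    values = await asyncio.gather(
        *(
            client.get_event_async(workflow_id, key, timeout_seconds=timeout_seconds)
            for key in key_list
        )
    )
    # Keys whose wait timed out map to None.
    return dict(zip(key_list, values))
```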