DBOS Methods & Variables
DBOS provides a number of useful context methods and variables.
All are accessed through the syntax `DBOS.<method>` and can only be used after a DBOS class object has been initialized.
Context Methods
send
DBOS.send(
    destination_id: str,
    message: Any,
    topic: Optional[str] = None
) -> None
Send a message to the workflow identified by `destination_id`.
Messages can optionally be associated with a topic.
The `send` function should not be used in coroutine workflows; use `send_async` instead.
Parameters:
- `destination_id`: The workflow to which to send the message.
- `message`: The message to send. Must be serializable.
- `topic`: A topic with which to associate the message. Messages are enqueued per-topic on the receiver.
send_async
DBOS.send_async(
    destination_id: str,
    message: Any,
    topic: Optional[str] = None
) -> Coroutine[Any, Any, None]
Coroutine version of `send`.
recv
DBOS.recv(
    topic: Optional[str] = None,
    timeout_seconds: float = 60,
) -> Any
Receive and return a message sent to this workflow.
Can only be called from within a workflow.
Messages are dequeued first-in, first-out from a queue associated with the topic.
Calls to `recv` wait for the next message in the queue, returning `None` if the wait times out.
If no topic is specified, `recv` can only access messages sent without a topic.
The `recv` function should not be used in coroutine workflows; use `recv_async` instead.
Parameters:
- `topic`: A topic queue on which to wait.
- `timeout_seconds`: A timeout in seconds. If the wait times out, return `None`.
Returns:
- The first message enqueued on the input topic, or `None` if the wait times out.
recv_async
DBOS.recv_async(
    topic: Optional[str] = None,
    timeout_seconds: float = 60,
) -> Coroutine[Any, Any, Any]
Coroutine version of `recv`.
set_event
DBOS.set_event(
    key: str,
    value: Any,
) -> None
Create and associate with this workflow an event with key `key` and value `value`.
If the event already exists, update its value.
Can only be called from within a workflow.
The `set_event` function should not be used in coroutine workflows; use `set_event_async` instead.
Parameters:
- `key`: The key of the event.
- `value`: The value of the event. Must be serializable.
set_event_async
DBOS.set_event_async(
    key: str,
    value: Any,
) -> Coroutine[Any, Any, None]
Coroutine version of `set_event`.
get_event
DBOS.get_event(
    workflow_id: str,
    key: str,
    timeout_seconds: float = 60,
) -> Any
Retrieve the latest value of an event published by the workflow identified by `workflow_id` to the key `key`.
If the event does not yet exist, wait for it to be published, returning `None` if the wait times out.
The `get_event` function should not be used in coroutine workflows; use `get_event_async` instead.
Parameters:
- `workflow_id`: The identifier of the workflow whose events to retrieve.
- `key`: The key of the event to retrieve.
- `timeout_seconds`: A timeout in seconds. If the wait times out, return `None`.
Returns:
- The value of the event published by `workflow_id` with name `key`, or `None` if the wait times out.
get_event_async
DBOS.get_event_async(
    workflow_id: str,
    key: str,
    timeout_seconds: float = 60,
) -> Coroutine[Any, Any, Any]
Coroutine version of `get_event`.
sleep
DBOS.sleep(
    seconds: float
) -> None
Sleep for the given number of seconds.
May only be called from within a workflow.
This sleep is durable—it records its intended wake-up time in the database so if it is interrupted and recovers, it still wakes up at the intended time.
The `sleep` function should not be used in coroutine workflows; use `sleep_async` instead.
Parameters:
- `seconds`: The number of seconds to sleep.
sleep_async
DBOS.sleep_async(
    seconds: float
) -> Coroutine[Any, Any, None]
Coroutine version of `sleep`.
retrieve_workflow
DBOS.retrieve_workflow(
    workflow_id: str,
    existing_workflow: bool = True,
) -> WorkflowHandle[R]
Retrieve the handle of the workflow with identity `workflow_id`.
Parameters:
- `workflow_id`: The identifier of the workflow whose handle to retrieve.
- `existing_workflow`: Whether to throw an exception if the workflow does not yet exist, or to wait for its creation. If set to `False` and the workflow does not exist, will wait for the workflow to be created, then return its handle.
Returns:
- The handle of the workflow whose ID is `workflow_id`.
retrieve_workflow_async
DBOS.retrieve_workflow_async(
    workflow_id: str,
    existing_workflow: bool = True,
) -> WorkflowHandleAsync[R]
Coroutine version of `DBOS.retrieve_workflow`, retrieving an async workflow handle.
start_workflow
DBOS.start_workflow(
    func: Workflow[P, R],
    *args: P.args,
    **kwargs: P.kwargs,
) -> WorkflowHandle[R]
Start a workflow in the background and return a handle to it.
The `DBOS.start_workflow` method resolves after the handle is durably created; at this point the workflow is guaranteed to run to completion even if the app is interrupted.
Example syntax:
@DBOS.workflow()
def example_workflow(var1: str, var2: str):
    DBOS.logger.info("I am a workflow")

# Start example_workflow in the background
handle: WorkflowHandle = DBOS.start_workflow(example_workflow, "var1", "var2")
start_workflow_async
DBOS.start_workflow_async(
    func: Workflow[P, Coroutine[Any, Any, R]],
    *args: P.args,
    **kwargs: P.kwargs,
) -> Coroutine[Any, Any, WorkflowHandleAsync[R]]
Start an asynchronous workflow in the background and return a handle to it.
The `DBOS.start_workflow_async` method resolves after the handle is durably created; at this point the workflow is guaranteed to run to completion even if the app is interrupted.
Example syntax:
@DBOS.workflow()
async def example_workflow(var1: str, var2: str):
    DBOS.logger.info("I am a workflow")

# Start example_workflow in the background
handle: WorkflowHandleAsync = await DBOS.start_workflow_async(example_workflow, "var1", "var2")
Workflow Management Methods
list_workflows
def list_workflows(
    *,
    workflow_ids: Optional[List[str]] = None,
    status: Optional[str] = None,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    name: Optional[str] = None,
    app_version: Optional[str] = None,
    user: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    sort_desc: bool = False,
    workflow_id_prefix: Optional[str] = None,
) -> List[WorkflowStatus]:
Retrieve a list of the `WorkflowStatus` of all workflows matching specified criteria.
Parameters:
- workflow_ids: Retrieve workflows with these IDs.
- workflow_id_prefix: Retrieve workflows whose IDs start with the specified string.
- status: Retrieve workflows with this status (must be `ENQUEUED`, `PENDING`, `SUCCESS`, `ERROR`, `CANCELLED`, or `RETRIES_EXCEEDED`).
- start_time: Retrieve workflows started after this (RFC 3339-compliant) timestamp.
- end_time: Retrieve workflows started before this (RFC 3339-compliant) timestamp.
- name: Retrieve workflows with this fully-qualified name.
- app_version: Retrieve workflows tagged with this application version.
- user: Retrieve workflows run by this authenticated user.
- limit: Retrieve up to this many workflows.
- offset: Skip this many workflows from the results returned (for pagination).
- sort_desc: Whether to sort the results in descending (`True`) or ascending (`False`) order by workflow start time.
list_queued_workflows
def list_queued_workflows(
    *,
    queue_name: Optional[str] = None,
    status: Optional[str] = None,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    name: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    sort_desc: bool = False,
) -> List[WorkflowStatus]:
Retrieve a list of the `WorkflowStatus` of all currently enqueued workflows matching specified criteria.
Parameters:
- queue_name: Retrieve workflows running on this queue.
- status: Retrieve workflows with this status (must be `ENQUEUED` or `PENDING`).
- start_time: Retrieve workflows enqueued after this (RFC 3339-compliant) timestamp.
- end_time: Retrieve workflows enqueued before this (RFC 3339-compliant) timestamp.
- name: Retrieve workflows with this fully-qualified name.
- limit: Retrieve up to this many workflows.
- offset: Skip this many workflows from the results returned (for pagination).
list_workflow_steps
def list_workflow_steps(
    workflow_id: str,
) -> List[StepInfo]
Retrieve the steps of a workflow.
This is a list of `StepInfo` objects, with the following structure:
class StepInfo(TypedDict):
    # The unique ID of the step in the workflow. One-indexed.
    function_id: int
    # The (fully qualified) name of the step
    function_name: str
    # The step's output, if any
    output: Optional[Any]
    # The error the step threw, if any
    error: Optional[Exception]
    # If the step starts or retrieves the result of a workflow, its ID
    child_workflow_id: Optional[str]
cancel_workflow
DBOS.cancel_workflow(
    workflow_id: str,
) -> None
Cancel a workflow.
This sets its status to `CANCELLED`, removes it from its queue (if it is enqueued), and preempts its execution (interrupting it at the beginning of its next step).
resume_workflow
DBOS.resume_workflow(
    workflow_id: str
) -> WorkflowHandle[R]
Resume a workflow. This immediately starts it from its last completed step. You can use this to resume workflows that are cancelled or have exceeded their maximum recovery attempts. You can also use this to start an enqueued workflow immediately, bypassing its queue.
fork_workflow
DBOS.fork_workflow(
    workflow_id: str,
    start_step: int,
    *,
    application_version: Optional[str] = None,
) -> WorkflowHandle[R]
Start a new execution of a workflow from a specific step.
The input step ID must match the `function_id` of the step returned by `list_workflow_steps`.
The specified `start_step` is the step from which the new workflow will start, so any steps whose ID is less than `start_step` will not be re-executed.
The forked workflow will have a new workflow ID, which can be set with `SetWorkflowID`.
You can specify the application version on which the forked workflow will run by setting `application_version`; this is useful for "patching" workflows that failed due to a bug in a previous application version.
Workflow Status
Some workflow introspection and management methods return a WorkflowStatus
.
This object has the following definition:
class WorkflowStatus:
    # The workflow ID
    workflow_id: str
    # The workflow status. Must be one of ENQUEUED, PENDING, SUCCESS, ERROR, CANCELLED, or RETRIES_EXCEEDED
    status: str
    # The name of the workflow function
    name: str
    # The number of times the workflow was scheduled to run
    recovery_attempts: int
    # The name of the workflow's class, if any
    class_name: Optional[str]
    # The name with which the workflow's class instance was configured, if any
    config_name: Optional[str]
    # The user who ran the workflow, if specified
    authenticated_user: Optional[str]
    # The role with which the workflow ran, if specified
    assumed_role: Optional[str]
    # All roles which the authenticated user could assume
    authenticated_roles: Optional[list[str]]
    # The deserialized workflow input object
    input: Optional[WorkflowInputs]
    # The workflow's output, if any
    output: Optional[Any]
    # The error the workflow threw, if any
    error: Optional[Exception]
    # Workflow start time, as a Unix epoch timestamp in ms
    created_at: Optional[int]
    # Last time the workflow status was updated, as a Unix epoch timestamp in ms
    updated_at: Optional[int]
    # If this workflow was enqueued, on which queue
    queue_name: Optional[str]
    # The executor that most recently executed this workflow (DBOS__VMID if set, else "local")
    executor_id: Optional[str]
    # The application version on which this workflow was started
    app_version: Optional[str]
    # The ID of the application (DBOS__APPID if set)
    app_id: Optional[str]
Context Variables
logger
DBOS.logger: Logger
Retrieve the DBOS logger. This is a pre-configured Python logger provided as a convenience.
sql_session
DBOS.sql_session: sqlalchemy.Session
May only be accessed from within a transaction. Retrieves the SQLAlchemy session of the transaction, a database connection the transaction can use to interact with the database.
DBOS automatically wraps your transaction functions in a SQLAlchemy "begin once" block. Transaction functions automatically commit when they successfully complete and roll back if they throw an exception. Therefore, do not use `DBOS.sql_session.commit()` or `DBOS.sql_session.rollback()` in your transaction functions. Otherwise, you might see a `sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside context manager` error.
workflow_id
DBOS.workflow_id: str
May only be accessed from within a workflow, step, or transaction. Return the identity of the current workflow.
step_id
DBOS.step_id: int
Returns the unique ID of the current step within a workflow.
step_status
DBOS.step_status: StepStatus
Return the status of the currently executing step. This object has the following properties:
class StepStatus:
    # The unique ID of this step in its workflow.
    step_id: int
    # For steps with automatic retries, which attempt number (zero-indexed) is currently executing.
    current_attempt: Optional[int]
    # For steps with automatic retries, the maximum number of attempts that will be made before the step fails.
    max_attempts: Optional[int]
span
DBOS.span: opentelemetry.trace.Span
Retrieve the OpenTelemetry span associated with the current request. You can use this to set custom attributes in your span.
Authentication
authenticated_user
DBOS.authenticated_user: Optional[str]
Return the current authenticated user, if any, associated with the current context.
authenticated_roles
DBOS.authenticated_roles: Optional[List[str]]
Return the roles granted to the current authenticated user, if any, associated with the current context.
assumed_role
DBOS.assumed_role: Optional[str]
Return the role currently assumed by the authenticated user, if any, associated with the current context.
set_authentication
DBOS.set_authentication(
    authenticated_user: Optional[str],
    authenticated_roles: Optional[List[str]]
) -> None
Set the current authenticated user and granted roles in the current context. This would generally be done by HTTP middleware.
Context Management
SetWorkflowID
SetWorkflowID(
    wfid: str
)
Set the workflow ID of the next workflow to run.
Should be used in a `with` statement.
Example syntax:
@DBOS.workflow()
def example_workflow():
    DBOS.logger.info(f"I am a workflow with ID {DBOS.workflow_id}")

# The workflow will run with the supplied ID
with SetWorkflowID("very-unique-id"):
    example_workflow()
SetWorkflowTimeout
SetWorkflowTimeout(
    workflow_timeout_sec: Optional[float]
)
Set a timeout for all enclosed workflow invocations or enqueues.
When the timeout expires, the workflow and all its children are cancelled.
Cancelling a workflow sets its status to CANCELLED
and preempts its execution at the beginning of its next step.
Timeouts are start-to-completion: if a workflow is enqueued, the timeout does not begin until the workflow is dequeued and starts execution. Also, timeouts are durable: they are stored in the database and persist across restarts, so workflows can have very long timeouts.
Timeout deadlines are propagated to child workflows by default, so when a workflow's deadline expires all of its child workflows (and their children, and so on) are also cancelled.
If you want to detach a child workflow from its parent's timeout, you can start it with `SetWorkflowTimeout(custom_timeout)` to override the propagated timeout. You can use `SetWorkflowTimeout(None)` to start a child workflow with no timeout.
Example syntax:
@DBOS.workflow()
def example_workflow():
    ...

# If the workflow does not complete within 10 seconds, it times out and is cancelled
with SetWorkflowTimeout(10):
    example_workflow()
SetEnqueueOptions
SetEnqueueOptions(
    *,
    deduplication_id: Optional[str] = None,
    priority: Optional[int] = None,
)
Set options for enclosed workflow enqueue operations. These options are not propagated to child workflows.
Parameters:
- `deduplication_id`: At any given time, only one workflow with a specific deduplication ID can be enqueued in the specified queue. If a workflow with a deduplication ID is currently enqueued or actively executing (status `ENQUEUED` or `PENDING`), subsequent attempts to enqueue a workflow with the same deduplication ID in the same queue will raise a `DBOSQueueDeduplicatedError` exception. Defaults to `None`.
- `priority`: The priority of the enqueued workflow in the specified queue. Workflows with the same priority are dequeued in FIFO (first in, first out) order. Priority values can range from `1` to `2,147,483,647`, where a lower number indicates a higher priority. Defaults to `None`. Workflows without assigned priorities have the highest priority and are dequeued before workflows with assigned priorities.
Deduplication Example
with SetEnqueueOptions(deduplication_id="my_dedup_id"):
    try:
        handle = queue.enqueue(example_workflow, ...)
    except DBOSQueueDeduplicatedError as e:
        ...  # Handle deduplication error
Priority Example
with SetEnqueueOptions(priority=10):
    # All workflows are enqueued with priority set to 10
    # They will be dequeued in FIFO order
    for task in tasks:
        queue.enqueue(task_workflow, task)

with SetEnqueueOptions(priority=1):
    queue.enqueue(first_workflow)
# first_workflow (priority=1) will be dequeued before all task_workflows (priority=10)
DBOSContextEnsure
DBOSContextEnsure()

# Code inside will run with a DBOS context available
with DBOSContextEnsure():
    # Call DBOS functions
    pass
Use of `DBOSContextEnsure` ensures that there is a DBOS context associated with the enclosed code prior to calling DBOS functions. `DBOSContextEnsure` is generally not used by applications directly, but by event dispatchers, HTTP server middleware, etc., to set up the DBOS context prior to entry into function calls.
DBOSContextSetAuth
DBOSContextSetAuth(user: Optional[str], roles: Optional[List[str]])

# Code inside will run with `curuser` and `curroles`
with DBOSContextSetAuth(curuser, curroles):
    # Call DBOS functions
    pass
`DBOSContextSetAuth` sets the current authenticated user and roles for the code inside the `with` block. Like `DBOSContextEnsure`, `DBOSContextSetAuth` also ensures that there is a DBOS context associated with the enclosed code prior to calling DBOS functions.
`DBOSContextSetAuth` is generally not used by applications directly, but by event dispatchers, HTTP server middleware, etc., to set up the DBOS context prior to entry into function calls.