Steps

When using DBOS workflows, we recommend annotating any function that performs complex operations or accesses external APIs or services as a step.

You can turn any Python function into a step by annotating it with the @DBOS.step decorator. The only requirement is that its inputs and outputs should be serializable (pickle-able). Here's a simple example:

import requests
from dbos import DBOS

@DBOS.step()
def example_step():
    return requests.get("https://example.com").text
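Because step inputs and outputs must be pickle-able, it can help to check a value before returning it from a step. The snippet below is a minimal sketch using only the standard library; `is_picklable` is a hypothetical helper written for this illustration and is not part of DBOS.

```python
import pickle
import threading


def is_picklable(value):
    """Return True if `value` survives a pickle round trip without raising."""
    try:
        pickle.loads(pickle.dumps(value))
        return True
    except Exception:
        # pickle raises different exception types depending on the object
        return False


print(is_picklable("<html>...</html>"))  # a plain string, like response text
print(is_picklable({"status": 200}))     # a dict of basic types
print(is_picklable(threading.Lock()))    # a lock, which pickle rejects
```

Values such as open connections, locks, or sessions typically fail this check, so a step should return plain data (strings, numbers, lists, dicts) extracted from them instead.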

You should make a function a step if you use it in a DBOS workflow and it accesses an external API or service, for example serving a file from AWS S3, calling an external API such as Stripe, or querying an external data store such as Elasticsearch.

Making a function a step has two benefits:

  1. If a workflow is interrupted, upon restart it automatically resumes execution from the last completed step. Therefore, making a function a step guarantees that a workflow will never re-execute it once it completes.

  2. DBOS provides configurable automatic retries for steps to more easily handle transient errors.
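The first benefit can be pictured with a toy model. The sketch below is not how DBOS is implemented: it replaces the durable store with an in-memory dict, and `run_step`, `workflow`, `completed`, and `calls` are hypothetical names invented for this illustration. It only shows the idea that a completed step's recorded output is reused when the workflow runs again.

```python
# Toy illustration only: an in-memory stand-in for a durable store.
completed = {}  # step name -> recorded output
calls = []      # names of steps that actually executed


def run_step(name, fn):
    """Run fn once; on later calls return the recorded output instead."""
    if name in completed:
        return completed[name]
    calls.append(name)
    result = fn()
    completed[name] = result
    return result


def workflow(fail_before_second_step):
    first = run_step("fetch", lambda: "payload")
    if fail_before_second_step:
        raise RuntimeError("simulated crash")
    return run_step("process", lambda: first.upper())


try:
    workflow(fail_before_second_step=True)  # first run is interrupted
except RuntimeError:
    pass

result = workflow(fail_before_second_step=False)  # simulated restart
print(result, calls)
```

On the second run, "fetch" appears only once in `calls`, because its output was already recorded before the simulated crash.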

Configurable Retries

You can optionally configure a step to automatically retry on any exception, up to a set number of attempts, with exponential backoff between attempts. Retries are configured through arguments to the step decorator:

DBOS.step(
    retries_allowed: bool = False,
    interval_seconds: float = 1.0,
    max_attempts: int = 3,
    backoff_rate: float = 2.0
)

For example, we configure this step to retry exceptions (such as if example.com is temporarily down) up to 10 times:

@DBOS.step(retries_allowed=True, max_attempts=10)
def example_step():
    return requests.get("https://example.com").text
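To get a feel for how these parameters interact, the sketch below computes the waits between attempts. It rests on two assumptions that the text above does not spell out: that `max_attempts` counts total attempts including the first, and that the wait before retry n is `interval_seconds * backoff_rate ** (n - 1)`, the usual reading of exponential backoff. The exact schedule DBOS uses, including any upper bound on the wait, may differ. `retry_delays` is a hypothetical helper, not a DBOS function.

```python
def retry_delays(interval_seconds=1.0, max_attempts=3, backoff_rate=2.0):
    """Waits (in seconds) between consecutive attempts, under the stated assumptions."""
    # max_attempts total attempts leave max_attempts - 1 gaps between them
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts - 1)]


defaults = retry_delays()
ten = retry_delays(max_attempts=10)
print(defaults)
print(ten, sum(ten))
```

Under these assumptions, raising `max_attempts` grows the total wait quickly, so a large value is best paired with a small `interval_seconds` or a lower `backoff_rate`.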

Coroutine Steps

You may also decorate coroutines (functions defined with async def, also known as async functions) with @DBOS.step. Coroutine steps can use Python's asynchronous language features such as await, async for, and async with. Like synchronous steps, async steps support configurable automatic retries and require their inputs and outputs to be serializable.

For example, here is an asynchronous version of the example_step function from above, using the aiohttp library instead of requests.

@DBOS.step(retries_allowed=True, max_attempts=10)
async def example_step():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            return await response.text()