Steps

When using DBOS workflows, you should annotate any function that performs complex operations or accesses external APIs or services as a step. If a workflow is interrupted, upon restart it automatically resumes execution from the last completed step.

You can turn any Python function into a step by annotating it with the @DBOS.step decorator. The only requirement is that its outputs should be serializable. Here's a simple example:

import requests
from dbos import DBOS

@DBOS.step()
def example_step():
    return requests.get("https://example.com").text
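To make the recovery behavior concrete, here is a toy model of step checkpointing. It is only a sketch and not how DBOS is implemented: the in-memory `checkpoints` dict stands in for durable storage, and `checkpointed`, `fetch`, `process`, and `workflow` are hypothetical names introduced for illustration.

```python
from typing import Any, Callable, Dict

# Hypothetical in-memory stand-in for a durable checkpoint store.
checkpoints: Dict[str, Any] = {}
calls = {"fetch": 0, "process": 0}


def checkpointed(name: str, fn: Callable[[], Any]) -> Any:
    """Run fn once per name; later calls return the recorded output."""
    if name not in checkpoints:
        # The output is recorded only if fn returns without raising.
        checkpoints[name] = fn()
    return checkpoints[name]


def fetch() -> str:
    calls["fetch"] += 1
    return "payload"


def process(data: str, crash: bool) -> str:
    calls["process"] += 1
    if crash:
        raise RuntimeError("simulated interruption")
    return data.upper()


def workflow(crash: bool) -> str:
    data = checkpointed("fetch", fetch)
    return checkpointed("process", lambda: process(data, crash))


# The first run is interrupted after the first step has completed.
try:
    workflow(crash=True)
except RuntimeError:
    pass

# The "restart" reuses the recorded fetch output and reruns only process.
result = workflow(crash=False)
```

In this model `fetch` runs once even though the workflow body executes twice, which is the property that lets a restarted workflow pick up after its last completed step.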

You should make a function a step if you're using it in a DBOS workflow and it performs a nondeterministic operation. A nondeterministic operation is one that may return different outputs given the same inputs. Common nondeterministic operations include:

  • Accessing an external API or service, like serving a file from AWS S3, calling an external API like Stripe, or accessing an external data store like Elasticsearch.
  • Accessing files on disk.
  • Generating a random number.
  • Getting the current time.
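The sketch below illustrates why such operations belong in steps: if a workflow branches on a nondeterministic value, replaying it is only consistent when the value is recorded the first time and reused afterwards. This is a simplified model, not DBOS code; `recorded`, `roll_step`, and `workflow` are hypothetical names.

```python
import random
from typing import Dict, List

# Hypothetical stand-in for saved step outputs.
recorded: Dict[str, int] = {}


def roll_step() -> int:
    """Record the roll on first execution; return the saved value afterwards."""
    if "roll" not in recorded:
        recorded["roll"] = random.randint(1, 6)
    return recorded["roll"]


def workflow() -> str:
    # The branch depends on a nondeterministic value, so the value comes from a step.
    return "high" if roll_step() >= 4 else "low"


# One initial run followed by four "replays" of the same workflow.
branches: List[str] = [workflow() for _ in range(5)]
```

Every replay takes the same branch because the roll is generated once. Calling `random.randint` directly in the workflow body could send a replay down a different path.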

You cannot call, start, or enqueue workflows from within steps. These operations should be performed from workflow functions. You can call one step from another step, but the called step becomes part of the calling step's execution rather than functioning as a separate step.

Configurable Retries

You can optionally configure a step to automatically retry any exception a set number of times with exponential backoff. This is useful for automatically handling transient failures, like making requests to unreliable APIs. Retries are configurable through arguments to the step decorator:

DBOS.step(
    retries_allowed: bool = False,
    interval_seconds: float = 1.0,
    max_attempts: int = 3,
    backoff_rate: float = 2.0,
    should_retry: Optional[Callable[[BaseException], Union[bool, Awaitable[bool]]]] = None,
)

For example, we configure this step to retry exceptions (such as if example.com is temporarily down) up to 10 times:

@DBOS.step(retries_allowed=True, max_attempts=10)
def example_step():
    return requests.get("https://example.com").text
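To get a feel for how these parameters interact, the following sketch computes a backoff schedule. It assumes the wait before each retry is the previous wait multiplied by `backoff_rate`, starting from `interval_seconds`. That formula is an assumption made for illustration (the text above does not say whether DBOS caps delays or adds jitter), and `retry_delays` is a hypothetical helper, not part of the DBOS API.

```python
from typing import List


def retry_delays(
    interval_seconds: float = 1.0,
    max_attempts: int = 3,
    backoff_rate: float = 2.0,
) -> List[float]:
    """Waits between attempts, assuming each wait is the previous one times backoff_rate."""
    # max_attempts attempts leave max_attempts - 1 gaps between them.
    return [interval_seconds * backoff_rate**i for i in range(max_attempts - 1)]


default_delays = retry_delays()                     # [1.0, 2.0]
ten_attempt_delays = retry_delays(max_attempts=10)  # nine waits, ending at 256.0
total_wait = sum(ten_attempt_delays)                # 511.0 seconds
```

Under this assumption, ten attempts with the default interval and backoff rate could spend over eight minutes waiting, so it can be worth lowering `backoff_rate` or `interval_seconds` when raising `max_attempts`.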

If a step exhausts all max_attempts retries, it raises a DBOSMaxStepRetriesExceeded exception to the calling workflow. If the workflow does not catch that exception, the workflow terminates.
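The control flow can be sketched with a plain retry loop. This is a simplified model rather than DBOS's implementation: `MaxRetriesExceeded` is a hypothetical stand-in for DBOSMaxStepRetriesExceeded, and `run_with_retries`, `always_down`, and `workflow` are names made up for the example.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class MaxRetriesExceeded(Exception):
    """Hypothetical stand-in for DBOSMaxStepRetriesExceeded."""


def run_with_retries(
    fn: Callable[[], T],
    max_attempts: int = 3,
    interval_seconds: float = 0.0,
    backoff_rate: float = 2.0,
) -> T:
    """Call fn until it succeeds or max_attempts calls have failed."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delay = interval_seconds
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == max_attempts:
                raise MaxRetriesExceeded(
                    f"gave up after {max_attempts} attempts"
                ) from exc
            time.sleep(delay)
            delay *= backoff_rate
    raise AssertionError("unreachable")


attempts: List[int] = []


def always_down() -> str:
    attempts.append(len(attempts) + 1)
    raise ConnectionError("service unavailable")


def workflow() -> str:
    # Catching the exhaustion error lets the workflow continue with a fallback.
    try:
        return run_with_retries(always_down, max_attempts=3)
    except MaxRetriesExceeded:
        return "fallback"


outcome = workflow()
```

Here the workflow survives because it handles the exhaustion error itself. Without the `try`/`except`, the error would propagate out of `workflow`.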

Filtering Retries With should_retry

By default, every exception raised by the step is retried until max_attempts is reached. If you only want to retry certain exceptions (for example, transient network errors but not validation failures), pass a should_retry predicate. The predicate receives the raised exception. If it returns False, the exception is re-raised immediately and no further retries are attempted.

# FatalError is a placeholder for your application's non-retryable exception type.
@DBOS.step(
    retries_allowed=True,
    max_attempts=10,
    should_retry=lambda e: not isinstance(e, FatalError),
)
def example_step():
    return requests.get("https://example.com").text
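A predicate can also be written as an allow-list, naming the exceptions that are worth retrying instead of the ones that are not. The sketch below uses only Python's built-in exception types. `is_transient` is a hypothetical name, and which exceptions count as transient is an assumption that depends on your application.

```python
def is_transient(exc: BaseException) -> bool:
    """Retry network-style failures; fail fast on everything else."""
    return isinstance(exc, (ConnectionError, TimeoutError))


decisions = {
    # ConnectionResetError is a subclass of the built-in ConnectionError.
    "connection": is_transient(ConnectionResetError("peer reset")),
    "timeout": is_transient(TimeoutError("timed out")),
    "validation": is_transient(ValueError("bad input")),
}
```

A function like this matches the should_retry signature and could be passed as `should_retry=is_transient`. Note that libraries often define their own exception hierarchies. For instance, `requests` raises `requests.exceptions.ConnectionError` and `requests.exceptions.Timeout`, which are not subclasses of the built-in types checked here, so a predicate for a `requests`-based step would need to test for those classes instead.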

For async steps, should_retry may itself be an async function:

async def is_retryable(e: BaseException) -> bool:
    return not isinstance(e, FatalError)

@DBOS.step(retries_allowed=True, max_attempts=10, should_retry=is_retryable)
async def example_step():
    ...

Async predicates are only supported for async steps; pairing an async should_retry with a sync step raises an exception.
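The should_retry type allows a predicate to return either a bool or an awaitable. The following sketch shows one way code running inside an event loop could accept both kinds. It is an illustration only, not DBOS's implementation, and `evaluate`, `sync_predicate`, and `async_predicate` are hypothetical names.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, List, Union

Predicate = Callable[[BaseException], Union[bool, Awaitable[bool]]]


async def evaluate(predicate: Predicate, exc: BaseException) -> bool:
    """Call the predicate and await its result only if it is awaitable."""
    result = predicate(exc)
    if inspect.isawaitable(result):
        result = await result
    return bool(result)


def sync_predicate(exc: BaseException) -> bool:
    return not isinstance(exc, ValueError)


async def async_predicate(exc: BaseException) -> bool:
    return not isinstance(exc, ValueError)


async def main() -> List[bool]:
    return [
        await evaluate(sync_predicate, TimeoutError("slow")),
        await evaluate(async_predicate, TimeoutError("slow")),
        await evaluate(async_predicate, ValueError("bad input")),
    ]


results = asyncio.run(main())
```

Awaiting the predicate's result requires a running event loop, which is consistent with async predicates being limited to async steps.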

Coroutine Steps

You may also decorate coroutines (functions defined with async def, also known as async functions) with @DBOS.step. Coroutine steps can use Python's asynchronous language capabilities such as await, async for, and async with. Like synchronous step functions, async steps support configurable automatic retries and require their inputs and outputs to be serializable.

For example, here is an asynchronous version of the example_step function from above, using the aiohttp library instead of requests.

@DBOS.step(retries_allowed=True, max_attempts=10)
async def example_step():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            return await response.text()

Running Steps In-Line With run_step

If a function is not decorated with @DBOS.step and you would prefer not to wrap it, you can call the code as a step using DBOS.run_step (or DBOS.run_step_async).

For example, if your code said:

res = send_email(user, msg)

It could be quickly changed to a checkpointed step:

res = DBOS.run_step(None, send_email, user, msg)

Or:

res = DBOS.run_step({"name": "send_email_to_user"}, lambda: send_email(user, msg))