Steps
When using DBOS workflows, we recommend annotating any function that performs complex operations or accesses external APIs or services as a step.
You can turn any Python function into a step by annotating it with the @DBOS.step
decorator.
The only requirement is that its inputs and outputs should be serializable (pickle-able).
Here's a simple example:
@DBOS.step()
def example_step():
return requests.get("https://example.com").text
You should make a function a step if you're using it in a DBOS workflow and it accesses an external API or service, like serving a file from AWS S3, calling an external API like Stripe, or accessing an external data store like Elasticsearch.
Making a function a step has two benefits:
-
If a workflow is interrupted, upon restart it automatically resumes execution from the last completed step. Therefore, making a function a step guarantees that a workflow will never re-execute it once it completes.
-
DBOS provides configurable automatic retries for steps to more easily handle transient errors.
Configurable Retries
You can optionally configure a step to automatically retry any exception a set number of times with exponential backoff. Retries are configurable through arguments to the step decorator:
DBOS.step(
retries_allowed: bool = False,
interval_seconds: float = 1.0,
max_attempts: int = 3,
backoff_rate: float = 2.0
)
For example, we configure this step to retry exceptions (such as if example.com
is temporarily down) up to 10 times:
@DBOS.step(retries_allowed=True, max_attempts=10)
def example_step():
return requests.get("https://example.com").text
Coroutine Steps
You may also decorate coroutines (functions defined with async def
, also known as async functions) with @DBOS.step
.
Coroutine steps can use Python's asynchronous language capabilities such as await, async for and async with.
Like syncronous step functions, async steps suppport configurable automatic retries and require their inputs and outputs to be serializable.
For example, here is an asynchronous version of the example_step
function from above, using the aiohttp
library instead of requests
.
@DBOS.step(retries_allowed=True, max_attempts=10)
async def example_step():
async with aiohttp.ClientSession() as session:
async with session.get("https://example.com") as response:
return await response.text()