Skip to main content

Decorators

In DBOS, you annotate functions with decorators to give them properties.

Function Decorators

workflow

DBOS.workflow()

Durably execute this function as a DBOS workflow.

Example:

@DBOS.workflow()
def greeting_workflow(name: str, note: str):
sign_guestbook(name)
insert_greeting(name, note)

step

DBOS.step(
retries_allowed: bool = False,
interval_seconds: float = 1.0,
max_attempts: int = 3,
backoff_rate: float = 2.0
)

Mark a function as a step in a workflow. This has two benefits:

  1. It lets workflows know this function performs a complex operation or interacts with an external service, so the workflow can guarantee those operations or interactions happen exactly-once.

  2. DBOS provides configurable automatic retries with exponential backoff for steps to more easily handle transient errors.

Example:

@DBOS.step(retries_allowed=True, max_attempts=10)
def example_step():
return requests.get("https://example.com").text

Parameters:

  • retries_allowed: Whether to retry the step if it throws an exception.
  • interval_seconds: How long to wait before the initial retry.
  • max_attempts: How many times to retry a step that is throwing exceptions.
  • backoff_rate: How much to multiplicatively increase interval_seconds between retries.

transaction

DBOS.transaction(
isolation_level: str = "SERIALIZABLE"
)

Transactions are a special type of step that are optimized for database operations. They execute as a single database transaction. They provide database access through the DBOS.sql_session context variable.

Example:

@DBOS.transaction()
def example_insert(name: str, note: str) -> None:
# Insert a new greeting into the database
sql = text("INSERT INTO greetings (name, note) VALUES (:name, :note)")
DBOS.sql_session.execute(sql, {"name": name, "note": note})

Parameters:

  • isolation_level: The isolation level with which to run the transaction. Must be one of SERIALIZABLE, REPEATABLE READ, or READ COMMITTED. Defaults to SERIALIZABLE.

scheduled

DBOS.scheduled(
cron: str
)

Run a function on a schedule specified using crontab syntax. See here for a guide to cron syntax and here for a crontab editor.

The annotated function must take in two parameters: The time that the run was scheduled (as a datetime) and the time that the run was actually started (also a datetime).

Example:

@DBOS.scheduled('* * * * *') # crontab syntax to run once every minute
@DBOS.workflow()
def example_scheduled_workflow(scheduled_time: datetime, actual_time: datetime):
DBOS.logger.info("I am a workflow scheduled to run once a minute. ")

Parameters:

  • cron: The schedule in crontab syntax. DBOS uses croniter to parse cron schedules, which is able to do second repetition and by default we use seconds as the first field (second_at_beginning=True). The DBOS variant contains 5 or 6 items, separated by spaces:
 ┌────────────── second (optional)
│ ┌──────────── minute
│ │ ┌────────── hour
│ │ │ ┌──────── day of month
│ │ │ │ ┌────── month
│ │ │ │ │ ┌──── day of week
│ │ │ │ │ │
│ │ │ │ │ │
* * * * * *

required_roles

DBOS.required_roles(
roles: List[str]
)

The @DBOS.dbos_required_roles decorator applies role-based security to the decorated function. The authenticated user must have at least one of the roles on the roles list in order to access the function.

Parameters:

  • roles: List of required roles applied to the decorated function.

Example:

@DBOS.workflow()
@DBOS.required_roles(["support","admin")
def my_support_workflow():
pass # Function accessible only with "support" or "admin" role

kafka_consumer

DBOS.kafka_consumer(
config: dict[str, Any],
topics: list[str]
)

Runs a function for each Kafka message received on the specified topic(s). Uses the Kafka message's topic, partition and offset to create a unique workflow id to ensure once and only once execution. Takes a configuration dictionary and a list of topics to consume. The decorated function must take a KafkaMessage as its only parameter.

Parameters:

  • config: a dictionary of config settings. Information on required settings follows with full configuration setting details available in the official Kafka documentation.
    • bootstrap.servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This list should be in the form host1:port1,host2:port2,...
    • group.id: A unique string that identifies the consumer group this consumer belongs to.
  • topics: a list of Kafka topics to subscribe to

Example

@DBOS.kafka_consumer(
config={
"bootstrap.servers": "localhost:9092",
"group.id": "dbos-kafka-group",
},
topics=["example-topic"],
)
@DBOS.workflow()
def test_kafka_workflow(msg: KafkaMessage):
DBOS.logger.info(f"Message received: {msg.value.decode()}")

Classes and Decorators

Methods in classes can be decorated with any of the function decorators above. Functions marked as @classmethod or @staticmethod are supported in the same way as regular functions. Classes with instance methods should extend from DBOSConfiguredInstance.

dbos_class

DBOS.dbos_class()

The @DBOS.dbos_class decorator should be applied to all classes with DBOS workflow, transaction, and step functions. This decorator assists in making sure all functions are properly registered with the class and provided with class-level configuration information.

Example:

@DBOS.dbos_class()
class MyClass:
@staticmethod
@DBOS.workflow()
def my_class_wf():
pass

default_required_roles

DBOS.default_required_roles(
roles: List[str]
)

The @DBOS.default_required_roles decorator can be applied to a class to set the default list of required access roles for all functions in the class. The list of required roles for individual functions can be overridden with required_roles.

Parameters:

  • roles: List of required roles to apply to all functions not individually decorated with required_roles.

Example:

@DBOS.default_required_roles(["user"])
class MyClass:
@staticmethod
@DBOS.workflow()
def my_user_function() -> None:
pass # Must have "user" role to access

@staticmethod
@DBOS.workflow()
@DBOS.required_roles(["admin"])
def my_admin_function() -> None:
pass # Must have "admin" role to access

DBOSConfiguredInstance

DBOSConfiguredInstance(
instance_name: str
)

DBOSConfiguredInstance should be used as a base for classes with decorated instance member functions. DBOSConfiguredInstance collects the instance name; this name is recorded in the database workflow records so that recovery can be targeted to the correct instance. DBOSConfiguredInstance also registers the class instance with the DBOS recovery system.

Parameters:

  • instance_name: The name of the instance, for recording in workflow database records

Example:

    @DBOS.dbos_class()
class DBOSTestClass(DBOSConfiguredInstance):
def __init__(self) -> None:
super().__init__("instance1")