Workflows & Steps

Workflows

DBOS.workflow

DBOS.workflow(
  config: WorkflowConfig = {}
)

export interface WorkflowConfig {
  maxRecoveryAttempts?: number;
}

A decorator that marks a function as a DBOS durable workflow.

Example:

export class Example {
  @DBOS.workflow()
  static async exampleWorkflow() {
    await Example.stepOne();
    await Example.stepTwo();
  }
}

// The workflow function can be called normally
await Example.exampleWorkflow();

Parameters:

  • config:
    • maxRecoveryAttempts: The maximum number of times the workflow may be attempted. This acts as a dead letter queue, so a buggy workflow that crashes its application (for example, by running it out of memory) cannot do so indefinitely. If a workflow exceeds this limit, its status is set to RETRIES_EXCEEDED and it is no longer automatically recovered.
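
The dead-letter behavior described above can be modeled in a few lines. This is a minimal sketch, not how DBOS implements it: the WorkflowRecord type, the PENDING status, and the beginAttempt helper are hypothetical names introduced for illustration. Only maxRecoveryAttempts and the RETRIES_EXCEEDED status come from the description above, and the sketch assumes the limit is inclusive (exactly maxRecoveryAttempts attempts are allowed).

```typescript
// Illustrative model only; DBOS tracks this state internally.
type WorkflowStatus = "PENDING" | "RETRIES_EXCEEDED";

interface WorkflowRecord {
  attempts: number; // how many times the workflow has been started so far
  status: WorkflowStatus;
}

// Hypothetical helper, called each time the workflow is started or recovered.
// Returns true if the attempt may proceed.
function beginAttempt(record: WorkflowRecord, maxRecoveryAttempts: number): boolean {
  if (record.status === "RETRIES_EXCEEDED") {
    return false; // already dead-lettered; not recovered automatically
  }
  if (record.attempts >= maxRecoveryAttempts) {
    record.status = "RETRIES_EXCEEDED"; // limit reached: stop recovering
    return false;
  }
  record.attempts += 1;
  return true;
}

// Four start/recovery attempts against a limit of three.
const record: WorkflowRecord = { attempts: 0, status: "PENDING" };
const outcomes = [1, 2, 3, 4].map(() => beginAttempt(record, 3));
console.log(outcomes, record.status);
// outcomes: true, true, true, false; status: RETRIES_EXCEEDED
```

In this model, a workflow that keeps crashing its process is attempted three times and then left alone until someone intervenes.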

DBOS.registerWorkflow

DBOS.registerWorkflow<This, Args extends unknown[], Return>(
  func: (this: This, ...args: Args) => Promise<Return>,
  config?: FunctionName & WorkflowConfig,
): (this: This, ...args: Args) => Promise<Return>

interface FunctionName {
  name?: string;
}

Wrap a function in a DBOS workflow. Returns the wrapped function.

Example:

async function exampleWorkflowFunction() {
  await stepOne();
  await stepTwo();
}

const workflow = DBOS.registerWorkflow(exampleWorkflowFunction, {"name": "exampleWorkflow"});
// The registered workflow can be called normally
await workflow();

Parameters:

  • func: The function to be wrapped in a workflow.
  • config:
    • name: The name with which to register the workflow. Defaults to the function name.
    • maxRecoveryAttempts: The maximum number of times the workflow may be attempted. This acts as a dead letter queue, so a buggy workflow that crashes its application (for example, by running it out of memory) cannot do so indefinitely. If a workflow exceeds this limit, its status is set to RETRIES_EXCEEDED and it is no longer automatically recovered.

DBOS.scheduled

DBOS.scheduled(
  schedulerConfig: SchedulerConfig
);

class SchedulerConfig {
  crontab: string;
  mode?: SchedulerMode = SchedulerMode.ExactlyOncePerIntervalWhenActive;
  queueName?: string;
}

A decorator directing DBOS to run a workflow on a schedule specified using crontab syntax. See here for a guide to cron syntax and here for a crontab editor.

The annotated function must take in two parameters: The time that the run was scheduled (as a Date) and the time that the run was actually started (also a Date). For example:

import { DBOS } from '@dbos-inc/dbos-sdk';

class ScheduledExample {
  @DBOS.workflow()
  @DBOS.scheduled({crontab: '*/30 * * * * *'})
  static async scheduledWorkflow(schedTime: Date, startTime: Date) {
    DBOS.logger.info(`I am a workflow scheduled to run every 30 seconds`);
  }
}

Parameters:

  • schedulerConfig:
    • crontab: The schedule in crontab syntax. The DBOS variant contains 5 or 6 items, separated by spaces:
      ┌────────────── second (optional)
      │ ┌──────────── minute
      │ │ ┌────────── hour
      │ │ │ ┌──────── day of month
      │ │ │ │ ┌────── month
      │ │ │ │ │ ┌──── day of week
      │ │ │ │ │ │
      │ │ │ │ │ │
      * * * * * *
    • mode: Whether or not to retroactively start workflows that were scheduled during times when the app was not running. Set to SchedulerMode.ExactlyOncePerInterval to enable this behavior.
    • queueName: If set, workflows will be enqueued on the named queue, rather than being started immediately.
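
To make the 5-or-6-field layout concrete, here is a minimal sketch that splits a crontab string into named fields. It is not part of the DBOS API: the CronFields type and the splitCrontab function are hypothetical names, and the sketch only separates fields; it does not validate or evaluate them.

```typescript
// Hypothetical helper types for illustration; not exported by DBOS.
interface CronFields {
  second: string | null; // null when the 5-field form is used
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

function splitCrontab(crontab: string): CronFields {
  const parts = crontab.trim().split(/\s+/);
  if (parts.length !== 5 && parts.length !== 6) {
    throw new Error(`Expected 5 or 6 fields, got ${parts.length}`);
  }
  // With 6 fields the first one is seconds; with 5 fields seconds are omitted.
  const second = parts.length === 6 ? parts[0] : null;
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts.slice(-5);
  return { second, minute, hour, dayOfMonth, month, dayOfWeek };
}

// The schedule from the example above: the seconds field is '*/30'.
console.log(splitCrontab("*/30 * * * * *"));
// A 5-field schedule: minute 0, hour 9, day of week 1.
console.log(splitCrontab("0 9 * * 1"));
```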

DBOS.registerScheduled

DBOS.registerScheduled<This, Return>(
  func: (this: This, ...args: ScheduledArgs) => Promise<Return>,
  config: SchedulerConfig,
)

Register a workflow to run on a schedule. The semantics are the same as for the DBOS.scheduled decorator. For example:

async function scheduledFunction(schedTime: Date, startTime: Date) {
  DBOS.logger.info(`I am a workflow scheduled to run every 30 seconds`);
}

const scheduledWorkflow = DBOS.registerWorkflow(scheduledFunction);
DBOS.registerScheduled(scheduledWorkflow, {crontab: '*/30 * * * * *'});

Steps

DBOS.step

DBOS.step(
  config: StepConfig = {}
)

interface StepConfig {
  retriesAllowed?: boolean;
  intervalSeconds?: number;
  maxAttempts?: number;
  backoffRate?: number;
}

A decorator that marks a function as a step in a durable workflow.

Example:

export class Example {
  @DBOS.step()
  static async stepOne() {
    DBOS.logger.info("Step one completed!");
  }

  @DBOS.step()
  static async stepTwo() {
    DBOS.logger.info("Step two completed!");
  }

  // Call steps from workflows
  @DBOS.workflow()
  static async exampleWorkflow() {
    await Example.stepOne();
    await Example.stepTwo();
  }
}

Parameters:

  • config:
    • retriesAllowed: Whether to retry the step if it throws an exception.
    • intervalSeconds: How long to wait before the initial retry.
    • maxAttempts: How many times to retry a step that is throwing exceptions.
    • backoffRate: How much to multiplicatively increase intervalSeconds between retries.
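
The interaction between intervalSeconds and backoffRate can be illustrated with a short calculation. This is a sketch under an assumption, not DBOS code: it assumes the wait before the first retry is intervalSeconds and that each later wait is the previous one multiplied by backoffRate, as the parameter descriptions above suggest. The retryDelays function is a hypothetical helper.

```typescript
// Hypothetical helper: lists the wait (in seconds) before each retry.
function retryDelays(intervalSeconds: number, backoffRate: number, retries: number): number[] {
  const delays: number[] = [];
  let wait = intervalSeconds; // wait before the initial retry
  for (let i = 0; i < retries; i++) {
    delays.push(wait);
    wait *= backoffRate; // grow the wait multiplicatively between retries
  }
  return delays;
}

// intervalSeconds = 1, backoffRate = 2, four retries
console.log(retryDelays(1, 2, 4)); // [ 1, 2, 4, 8 ]
```

A backoffRate of 1 keeps the wait constant, while values above 1 space retries further apart each time.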

DBOS.registerStep

DBOS.registerStep<This, Args extends unknown[], Return>(
  func: (this: This, ...args: Args) => Promise<Return>,
  config: StepConfig & FunctionName = {},
): (this: This, ...args: Args) => Promise<Return>

Wrap a function in a step to safely call it from a durable workflow. Returns the wrapped function.

Example:

async function stepOneFunction() {
  DBOS.logger.info("Step one completed!");
}
const stepOne = DBOS.registerStep(stepOneFunction, {"name": "stepOne"});

async function stepTwoFunction() {
  DBOS.logger.info("Step two completed!");
}
const stepTwo = DBOS.registerStep(stepTwoFunction, {"name": "stepTwo"});

// Call steps from workflows
async function workflowFunction() {
  await stepOne();
  await stepTwo();
}
const workflow = DBOS.registerWorkflow(workflowFunction, {"name": "exampleWorkflow"});

Parameters:

  • func: The function to be wrapped in a step.
  • config:
    • name: A name to give the step. If not provided, use the function name.
    • retriesAllowed: Whether to retry the step if it throws an exception.
    • intervalSeconds: How long to wait before the initial retry.
    • maxAttempts: How many times to retry a step that is throwing exceptions.
    • backoffRate: How much to multiplicatively increase intervalSeconds between retries.

DBOS.runStep

DBOS.runStep<Return>(
  func: () => Promise<Return>,
  config: StepConfig & { name?: string } = {}
): Promise<Return>

Run a function as a step in a workflow. Can only be called from a durable workflow. Returns the output of the step.

Example:

async function stepOne() {
  DBOS.logger.info("Step one completed!");
}

async function stepTwo() {
  DBOS.logger.info("Step two completed!");
}

// Use DBOS.runStep to run any function as a step
async function exampleWorkflow() {
  await DBOS.runStep(() => stepOne(), {name: "stepOne"});
  await DBOS.runStep(() => stepTwo(), {name: "stepTwo"});
}

Parameters:

  • func: The function to run as a step.
  • config:
    • name: A name to give the step.
    • retriesAllowed: Whether to retry the step if it throws an exception.
    • intervalSeconds: How long to wait before the initial retry.
    • maxAttempts: How many times to retry a step that is throwing exceptions.
    • backoffRate: How much to multiplicatively increase intervalSeconds between retries.

Instance Method Workflows

abstract class ConfiguredInstance {
  constructor(name: string)
}

You can register or decorate class instance methods. However, if a class has any workflow methods, that class must inherit from ConfiguredInstance, which takes an instance name and registers the instance.

When you create a new instance of the class, the constructor for the base ConfiguredInstance must be called with a name. This name should be unique among instances of the same class. Additionally, all ConfiguredInstance classes must be instantiated before DBOS.launch() is called.

For example:

class MyClass extends ConfiguredInstance {
  cfg: MyConfig;
  constructor(name: string, config: MyConfig) {
    super(name);
    this.cfg = config;
  }

  @DBOS.workflow()
  async testWorkflow(p: string): Promise<void> {
    // ... Operations that use this.cfg
  }
}

// myConfig stands for a MyConfig value defined elsewhere
const myClassInstance = new MyClass('instanceA', myConfig);

These requirements exist to enable workflow recovery. When you create a new instance of a ConfiguredInstance class, DBOS stores it in a global registry indexed by name. When DBOS needs to recover a workflow belonging to that class, it looks up the name so it can run the workflow using the right class instance. While names are used by DBOS Transact internally to find the correct object instance across system restarts, they are also potentially useful for monitoring, tracing, and debugging.
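
The name-indexed registry idea can be sketched independently of DBOS. The code below is an illustrative model, not the actual ConfiguredInstance implementation: NamedInstance, Greeter, registry, and recover are hypothetical names, and keying the registry by class name plus instance name is an assumption based on the requirement that names be unique among instances of the same class.

```typescript
// Global registry keyed by "<class name>/<instance name>" (illustrative only).
const registry = new Map<string, NamedInstance>();

// Hypothetical stand-in for ConfiguredInstance.
class NamedInstance {
  readonly name: string;
  constructor(name: string) {
    this.name = name;
    // this.constructor is the concrete subclass being instantiated.
    const key = `${this.constructor.name}/${name}`;
    if (registry.has(key)) {
      throw new Error(`Duplicate instance name: ${key}`);
    }
    registry.set(key, this);
  }
}

class Greeter extends NamedInstance {
  readonly greeting: string;
  constructor(name: string, greeting: string) {
    super(name);
    this.greeting = greeting;
  }
  greet(who: string): string {
    return `${this.greeting}, ${who}!`;
  }
}

// During recovery, only the class name and instance name are known.
function recover(className: string, instanceName: string): NamedInstance {
  const instance = registry.get(`${className}/${instanceName}`);
  if (!instance) {
    throw new Error(`No instance registered for ${className}/${instanceName}`);
  }
  return instance;
}

// Instances are created at startup, before any recovery takes place.
new Greeter("english", "Hello");
new Greeter("french", "Bonjour");

const recovered = recover("Greeter", "french") as Greeter;
console.log(recovered.greet("DBOS")); // Bonjour, DBOS!
```

The sketch also shows why instances must exist before recovery starts: a lookup for a name that has not been registered yet has nothing to return.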