Skip to main content

Workflows & Steps

RegisterWorkflow

func RegisterWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], opts ...WorkflowRegistrationOption)

Register a function as a DBOS workflow. All workflows must be registered before the context is launched.

Workflow functions must be compatible with the following signature:

type Workflow[P any, R any] func(ctx DBOSContext, input P) (R, error)

Parameters:

  • ctx: The DBOSContext.
  • fn: The workflow function to register.
  • opts: Functional options for workflow registration, documented below.

WithMaxRetries

func WithMaxRetries(maxRetries int) WorkflowRegistrationOption

Configure the maximum number of times execution of a workflow may be attempted. This acts as a dead letter queue so that a buggy workflow that crashes its application (for example, by running it out of memory) does not do so infinitely. If a workflow exceeds this limit, its status is set to MAX_RECOVERY_ATTEMPTS_EXCEEDED and it may no longer be executed.

WithSchedule

func WithSchedule(schedule string) WorkflowRegistrationOption

Registers the workflow as a scheduled workflow using cron syntax. The schedule string follows standard cron format with second precision. Scheduled workflows automatically receive a time.Time input parameter.

WithWorkflowName

func WithWorkflowName(name string) WorkflowRegistrationOption

Register a workflow with a custom name. If not provided, the name of the workflow function is used.

RunWorkflow

func RunWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], input P, opts ...WorkflowOption) (WorkflowHandle[R], error)

Execute a workflow function. The workflow may execute immediately or be enqueued for later execution based on options. Returns a WorkflowHandle that can be used to check the workflow's status or wait for its completion and retrieve its results.

Parameters:

  • ctx: The DBOSContext.
  • fn: The workflow function to execute.
  • input The input to the workflow function.
  • opts: Functional options for workflow execution, documented below.

Example Syntax:

func workflow(ctx dbos.DBOSContext, input string) (string, error) {
return "success", err
}

func example(input string) error {
handle, err := dbos.RunWorkflow(dbosContext, workflow, input)
if err != nil {
return err
}
result, err := handle.GetResult()
if err != nil {
return err
}
fmt.Println("Workflow result:", result)
return nil
}

WithWorkflowID

func WithWorkflowID(id string) WorkflowOption

Run the workflow with a custom workflow ID. If not specified, a UUID workflow ID is generated.

WithQueue

func WithQueue(queueName string) WorkflowOption

Enqueue the workflow to the specified queue instead of executing it immediately. Queued workflows will be dequeued and executed according to the queue's configuration.

WithDeduplicationID

func WithDeduplicationID(id string) WorkflowOption

Set a deduplication ID for this workflow. Should be used alongside WithQueue. At any given time, only one workflow with a specific deduplication ID can be enqueued in a given queue. If a workflow with a deduplication ID is currently enqueued or actively executing (status ENQUEUED or PENDING), subsequent workflow enqueue attempt with the same deduplication ID in the same queue will raise an exception.

WithPriority

func WithPriority(priority uint) WorkflowOption

Set a queue priority for the workflow. Should be used alongside WithQueue. Workflows with the same priority are dequeued in FIFO (first in, first out) order. Priority values can range from 1 to 2,147,483,647, where a low number indicates a higher priority. Workflows without assigned priorities have the highest priority and are dequeued before workflows with assigned priorities.

WithQueuePartitionKey

func WithQueuePartitionKey(partitionKey string) WorkflowOption

Set a queue partition key for the workflow. Use if and only if the queue is partitioned (created with WithPartitionQueue). In partitioned queues, all flow control (including concurrency and rate limits) is applied to individual partitions instead of the queue as a whole.

Example Syntax:

// Create a partitioned queue
partitionedQueue := dbos.NewWorkflowQueue(ctx, "user-tasks",
dbos.WithPartitionQueue(),
)

// Enqueue workflows with partition keys
// Each user's tasks run with separate concurrency limits
handle, err := dbos.RunWorkflow(ctx, ProcessUserTask, taskData,
dbos.WithQueue("user-tasks"),
dbos.WithQueuePartitionKey(userID),
)
info
  • Partition keys are required when enqueueing to a partitioned queue.
  • Partition keys cannot be used with non-partitioned queues.
  • Partition keys and deduplication IDs cannot be used together.

WithApplicationVersion

func WithApplicationVersion(version string) WorkflowOption

Set the application version for this workflow, overriding the version in DBOSContext.

WithAuthenticatedUser

func WithAuthenticatedUser(user string) WorkflowOption

Associate the workflow execution with a user name. Useful to define workflow identity.

RunAsStep

func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error)

Execute a function as a step in a durable workflow.

Parameters:

  • ctx: The DBOSContext.
  • fn: The step to execute, typically wrapped in an anonymous function. Syntax shown below.
  • opts: Functional options for step execution, documented below.

Example Syntax:

Any Go function can be a step as long as it outputs one json-encodable value and an error. To pass inputs into a function being called as a step, wrap it in an anonymous function as shown below:

func step(ctx context.Context, input string) (string, error) {
output := ...
return output
}

func workflow(ctx dbos.DBOSContext, input string) (string, error) {
output, err := dbos.RunAsStep(
ctx,
func(stepCtx context.Context) (string, error) {
return step(stepCtx, input)
}
)
}

WithStepName

func WithStepName(name string) StepOption

Set a custom name for a step.

WithStepMaxRetries

func WithStepMaxRetries(maxRetries int) StepOption

Set the maximum number of times this step is automatically retired on failure. A value of 0 (the default) indicates no retries.

WithMaxInterval

func WithMaxInterval(interval time.Duration) StepOption

WithMaxInterval sets the maximum delay between retries. Default value is 5s.

WithBackoffFactor

func WithBackoffFactor(factor float64) StepOption

WithBackoffFactor sets the exponential backoff multiplier between retries. Default value is 2.0.

WithBaseInterval

func WithBaseInterval(interval time.Duration) StepOption

WithBaseInterval sets the initial delay between retries. Default value is 100ms.

Concurrent steps.

Go

func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan StepOutcome[R], error)

Launch a step asynchronously and return a channel that will receive the result when the step completes. This is a durable alternative to Go's native goroutines that checkpoints results to the database for deterministic replay. Can only be called from within a workflow.

Parameters:

  • ctx: The DBOSContext.
  • fn: The step function to execute asynchronously.
  • opts: Functional options for step execution (same options as RunAsStep).

Returns:

  • A receive-only channel of StepOutcome[R] that will receive exactly one value when the step completes, then close.
  • An error if the step could not be launched (e.g., if called outside a workflow).
type StepOutcome[R any] struct {
Result R
Err error
}

Example Syntax:

func workflow(ctx dbos.DBOSContext, _ string) (string, error) {
// Launch step asynchronously
resultChan, err := dbos.Go(ctx, func(ctx context.Context) (string, error) {
return performWork(ctx)
})
if err != nil {
return "", err
}

// Do other work...

// Wait for result
outcome := <-resultChan
if outcome.Err != nil {
return "", outcome.Err
}
return outcome.Result, nil
}

Select

func Select[R any](ctx DBOSContext, channels []<-chan StepOutcome[R]) (R, error)

Wait for and return the first result from multiple channels obtained from Go. This is a durable alternative to Go's native select statement that checkpoints the selected result for deterministic replay. Can only be called from within a workflow. All channels must be of the same type R.

Parameters:

  • ctx: The DBOSContext.
  • channels: A slice of receive-only channels from Go calls.

Returns:

  • The result value from the first channel to produce a value.
  • An error if the selected step returned an error, if the context was cancelled, or if a channel was closed unexpectedly.

Behavior:

  • If channels is empty, returns the zero value of type R with no error.
  • If the context is cancelled while waiting, returns the context error.
  • The selected channel index and value are checkpointed, so workflow recovery returns the same result.

Example Syntax:

func workflow(ctx dbos.DBOSContext, _ string) (string, error) {
ch1, _ := dbos.Go(ctx, func(ctx context.Context) (string, error) {
return queryServiceA(ctx)
})
ch2, _ := dbos.Go(ctx, func(ctx context.Context) (string, error) {
return queryServiceB(ctx)
})

// Wait for the first result
result, err := dbos.Select(ctx, []<-chan dbos.StepOutcome[string]{ch1, ch2})
if err != nil {
return "", err
}
return result, nil
}

WorkflowHandle

type WorkflowHandle[R any] interface {
GetResult(opts ...GetResultOption) (R, error)
GetStatus() (WorkflowStatus, error)
GetWorkflowID() string
}

WorkflowHandle provides methods to interact with a running or completed workflow. The type parameter R represents the expected return type of the workflow. Handles can be used to wait for workflow completion, check status, and retrieve results.

WorkflowHandle.GetResult

WorkflowHandle.GetResult(opts ...GetResultOption) (R, error)

Wait for the workflow to complete and return its result.

WithHandleTimeout
func WithHandleTimeout(timeout time.Duration) GetResultOption

Specify a timeout for obtaining a workflow result.

WithHandlePollingInterval
func WithHandlePollingInterval(interval time.Duration) GetResultOption

Set the polling interval for checking workflow completion status in the database. Only positive interval values will be considered.

WorkflowHandle.GetStatus

WorkflowHandle.GetStatus() (WorkflowStatus, error)

Retrieve the WorkflowStatus of the workflow.

WorkflowHandle.GetWorkflowID

WorkflowHandle.GetWorkflowID() string

Retrieve the ID of the workflow.

Patching

Patch

func Patch(ctx DBOSContext, patchName string) (bool, error)

Insert a patch marker at the current point in workflow history, returning true if it was successfully inserted and false if there is already a checkpoint present at this point in history indicating that the workflow should run unpatched. Used to safely upgrade workflow code; see the patching tutorial for more detail.

Parameters:

  • ctx: The DBOSContext.
  • patchName: The name to give the patch marker that will be inserted into workflow history.
info

Patching must be enabled in your configuration by setting EnablePatching: true.

DeprecatePatch

func DeprecatePatch(ctx DBOSContext, patchName string) (bool, error)

Safely bypass a patch marker at the current point in workflow history if present. Always returns true. Used to safely deprecate patches; see the patching tutorial for more detail.

Parameters:

  • ctx: The DBOSContext.
  • patchName: The name of the patch marker to be bypassed.
info

Patching must be enabled in your configuration by setting EnablePatching: true.