Workflows & Steps
RegisterWorkflow
func RegisterWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], opts ...WorkflowRegistrationOption)
Register a function as a DBOS workflow. All workflows must be registered before the context is launched.
Workflow functions must be compatible with the following signature:
type Workflow[P any, R any] func(ctx DBOSContext, input P) (R, error)
Parameters:
- ctx: The DBOSContext.
- fn: The workflow function to register.
- opts: Functional options for workflow registration, documented below.
WithMaxRetries
func WithMaxRetries(maxRetries int) WorkflowRegistrationOption
Configure the maximum number of times execution of a workflow may be attempted.
This acts as a dead letter queue so that a buggy workflow that crashes its application (for example, by running it out of memory) does not do so indefinitely.
If a workflow exceeds this limit, its status is set to MAX_RECOVERY_ATTEMPTS_EXCEEDED and it may no longer be executed.
WithSchedule
func WithSchedule(schedule string) WorkflowRegistrationOption
Registers the workflow as a scheduled workflow using cron syntax.
The schedule string follows standard cron format with second precision.
Scheduled workflows automatically receive a time.Time input parameter.
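To illustrate what a schedule string with second precision looks like, the sketch below splits a six-field expression into named fields. The field order (seconds first) and the describeSchedule helper are assumptions made for illustration; they are not part of the DBOS API, and the library's own cron parser remains authoritative.

```go
package main

import (
	"fmt"
	"strings"
)

// fieldNames assumes a six-field layout with the seconds field first.
// This ordering is an assumption, not confirmed by the text above.
var fieldNames = []string{"second", "minute", "hour", "day-of-month", "month", "day-of-week"}

// describeSchedule is a hypothetical helper that maps each field of a
// six-field cron expression to its assumed name.
func describeSchedule(schedule string) (map[string]string, error) {
	fields := strings.Fields(schedule)
	if len(fields) != len(fieldNames) {
		return nil, fmt.Errorf("expected %d fields, got %d", len(fieldNames), len(fields))
	}
	described := make(map[string]string, len(fields))
	for i, name := range fieldNames {
		described[name] = fields[i]
	}
	return described, nil
}

func main() {
	// Under the assumed layout, this expression fires every 30 seconds.
	described, err := describeSchedule("*/30 * * * * *")
	if err != nil {
		fmt.Println("invalid schedule:", err)
		return
	}
	for _, name := range fieldNames {
		fmt.Printf("%-12s %s\n", name, described[name])
	}
}
```

A string of this shape would be passed to WithSchedule when registering the workflow.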
WithWorkflowName
func WithWorkflowName(name string) WorkflowRegistrationOption
Register a workflow with a custom name. If not provided, the name of the workflow function is used.
RunWorkflow
func RunWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], input P, opts ...WorkflowOption) (WorkflowHandle[R], error)
Execute a workflow function. The workflow may execute immediately or be enqueued for later execution based on options. Returns a WorkflowHandle that can be used to check the workflow's status or wait for its completion and retrieve its results.
Parameters:
- ctx: The DBOSContext.
- fn: The workflow function to execute.
- input: The input to the workflow function.
- opts: Functional options for workflow execution, documented below.
Example Syntax:
func workflow(ctx dbos.DBOSContext, input string) (string, error) {
return "success", nil
}
func example(input string) error {
handle, err := dbos.RunWorkflow(dbosContext, workflow, input)
if err != nil {
return err
}
result, err := handle.GetResult()
if err != nil {
return err
}
fmt.Println("Workflow result:", result)
return nil
}
WithWorkflowID
func WithWorkflowID(id string) WorkflowOption
Run the workflow with a custom workflow ID. If not specified, a UUID workflow ID is generated.
WithQueue
func WithQueue(queueName string) WorkflowOption
Enqueue the workflow to the specified queue instead of executing it immediately. Queued workflows will be dequeued and executed according to the queue's configuration.
WithDeduplicationID
func WithDeduplicationID(id string) WorkflowOption
Set a deduplication ID for this workflow.
Should be used alongside WithQueue.
At any given time, only one workflow with a specific deduplication ID can be enqueued in a given queue.
If a workflow with a deduplication ID is currently enqueued or actively executing (status ENQUEUED or PENDING), any subsequent attempt to enqueue a workflow with the same deduplication ID in the same queue returns an error.
WithPriority
func WithPriority(priority uint) WorkflowOption
Set a queue priority for the workflow.
Should be used alongside WithQueue.
Workflows with the same priority are dequeued in FIFO (first in, first out) order.
Priority values range from 1 to 2,147,483,647; a lower number indicates a higher priority.
Workflows without assigned priorities have the highest priority and are dequeued before workflows with assigned priorities.
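The ordering rules above can be modeled with a stable sort. This is only an in-memory sketch of the described ordering, not how DBOS dequeues workflows. The queuedWorkflow type and dequeueOrder function are hypothetical, and representing "no priority assigned" as 0 is an assumption made for this example.

```go
package main

import (
	"fmt"
	"sort"
)

// queuedWorkflow is a hypothetical stand-in for an enqueued workflow.
// Priority 0 models "no priority assigned" in this sketch only.
type queuedWorkflow struct {
	ID       string
	Priority uint
}

// dequeueOrder returns workflow IDs in the order described above:
// unprioritized first, then ascending priority number, with FIFO order
// preserved among equal priorities by the stable sort.
func dequeueOrder(enqueued []queuedWorkflow) []string {
	ordered := make([]queuedWorkflow, len(enqueued))
	copy(ordered, enqueued)
	sort.SliceStable(ordered, func(i, j int) bool {
		return ordered[i].Priority < ordered[j].Priority
	})
	ids := make([]string, len(ordered))
	for i, wf := range ordered {
		ids[i] = wf.ID
	}
	return ids
}

func main() {
	// Slice order represents enqueue order.
	enqueued := []queuedWorkflow{
		{ID: "report", Priority: 5},
		{ID: "unprioritized", Priority: 0},
		{ID: "urgent", Priority: 1},
		{ID: "cleanup", Priority: 5},
	}
	fmt.Println(dequeueOrder(enqueued)) // prints [unprioritized urgent report cleanup]
}
```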
WithQueuePartitionKey
func WithQueuePartitionKey(partitionKey string) WorkflowOption
Set a queue partition key for the workflow.
Use if and only if the queue is partitioned (created with WithPartitionQueue).
In partitioned queues, all flow control (including concurrency and rate limits) is applied to individual partitions instead of the queue as a whole.
Example Syntax:
// Create a partitioned queue
partitionedQueue := dbos.NewWorkflowQueue(ctx, "user-tasks",
dbos.WithPartitionQueue(),
)
// Enqueue workflows with partition keys
// Each user's tasks run with separate concurrency limits
handle, err := dbos.RunWorkflow(ctx, ProcessUserTask, taskData,
dbos.WithQueue("user-tasks"),
dbos.WithQueuePartitionKey(userID),
)
- Partition keys are required when enqueueing to a partitioned queue.
- Partition keys cannot be used with non-partitioned queues.
- Partition keys and deduplication IDs cannot be used together.
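The three rules above can be read as a small validation function. The sketch below is a model of those rules only; the enqueueRequest type, the validate function, and the error messages are hypothetical and do not reflect how DBOS performs or reports these checks.

```go
package main

import (
	"errors"
	"fmt"
)

// enqueueRequest is a hypothetical summary of the options relevant to
// partitioned queues. Empty strings mean "option not set".
type enqueueRequest struct {
	QueuePartitioned bool
	PartitionKey     string
	DeduplicationID  string
}

// validate applies the three rules listed above, in order.
func validate(req enqueueRequest) error {
	switch {
	case req.QueuePartitioned && req.PartitionKey == "":
		return errors.New("partitioned queue requires a partition key")
	case !req.QueuePartitioned && req.PartitionKey != "":
		return errors.New("partition key given for a non-partitioned queue")
	case req.PartitionKey != "" && req.DeduplicationID != "":
		return errors.New("partition key and deduplication ID cannot be combined")
	}
	return nil
}

func main() {
	requests := []enqueueRequest{
		{QueuePartitioned: true, PartitionKey: "user-1"},
		{QueuePartitioned: true},
		{QueuePartitioned: false, PartitionKey: "user-1"},
		{QueuePartitioned: true, PartitionKey: "user-1", DeduplicationID: "d-1"},
	}
	for _, req := range requests {
		fmt.Printf("%+v -> %v\n", req, validate(req))
	}
}
```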
WithApplicationVersion
func WithApplicationVersion(version string) WorkflowOption
Set the application version for this workflow, overriding the version in DBOSContext.
WithAuthenticatedUser
func WithAuthenticatedUser(user string) WorkflowOption
Associate the workflow execution with a user name. Useful to define workflow identity.
RunAsStep
func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error)
Execute a function as a step in a durable workflow.
Parameters:
- ctx: The DBOSContext.
- fn: The step to execute, typically wrapped in an anonymous function. Syntax shown below.
- opts: Functional options for step execution, documented below.
Example Syntax:
Any Go function can be a step as long as it returns one JSON-encodable value and an error. To pass inputs into a function being called as a step, wrap it in an anonymous function as shown below:
func step(ctx context.Context, input string) (string, error) {
output := ...
return output, nil
}
func workflow(ctx dbos.DBOSContext, input string) (string, error) {
output, err := dbos.RunAsStep(
ctx,
func(stepCtx context.Context) (string, error) {
return step(stepCtx, input)
},
)
return output, err
}
WithStepName
func WithStepName(name string) StepOption
Set a custom name for a step.
WithStepMaxRetries
func WithStepMaxRetries(maxRetries int) StepOption
Set the maximum number of times this step is automatically retried on failure. A value of 0 (the default) indicates no retries.
WithMaxInterval
func WithMaxInterval(interval time.Duration) StepOption
WithMaxInterval sets the maximum delay between retries. Default value is 5s.
WithBackoffFactor
func WithBackoffFactor(factor float64) StepOption
WithBackoffFactor sets the exponential backoff multiplier between retries. Default value is 2.0.
WithBaseInterval
func WithBaseInterval(interval time.Duration) StepOption
WithBaseInterval sets the initial delay between retries. Default value is 100ms.
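To see how the three interval options interact, the sketch below computes a retry schedule from the documented defaults (base 100ms, factor 2.0, maximum 5s). The formula used here, base multiplied by factor once per prior retry and capped at the maximum, is an assumption about a typical exponential backoff; the retryDelay helper is hypothetical and the library's actual computation may differ (for example, by adding jitter).

```go
package main

import (
	"fmt"
	"time"
)

// retryDelay returns the assumed delay before retry number `attempt`
// (0-based): base * factor^attempt, capped at maxInterval.
// This formula is an illustrative assumption, not taken from DBOS.
func retryDelay(attempt int, base time.Duration, factor float64, maxInterval time.Duration) time.Duration {
	delay := float64(base)
	// Stop multiplying once the cap is reached to avoid overflow.
	for i := 0; i < attempt && delay < float64(maxInterval); i++ {
		delay *= factor
	}
	if delay > float64(maxInterval) {
		return maxInterval
	}
	return time.Duration(delay)
}

func main() {
	// Documented defaults: 100ms base, 2.0 factor, 5s maximum.
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Println(attempt, retryDelay(attempt, 100*time.Millisecond, 2.0, 5*time.Second))
	}
}
```

Under these assumptions the delays grow as 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, and then stay at the 5s cap.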
Concurrent Steps
Go
func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan StepOutcome[R], error)
Launch a step asynchronously and return a channel that will receive the result when the step completes. This is a durable alternative to Go's native goroutines that checkpoints results to the database for deterministic replay. Can only be called from within a workflow.
Parameters:
- ctx: The DBOSContext.
- fn: The step function to execute asynchronously.
- opts: Functional options for step execution (same options as RunAsStep).
Returns:
- A receive-only channel of StepOutcome[R] that will receive exactly one value when the step completes, then close.
- An error if the step could not be launched (e.g., if called outside a workflow).
type StepOutcome[R any] struct {
Result R
Err error
}
Example Syntax:
func workflow(ctx dbos.DBOSContext, _ string) (string, error) {
// Launch step asynchronously
resultChan, err := dbos.Go(ctx, func(ctx context.Context) (string, error) {
return performWork(ctx)
})
if err != nil {
return "", err
}
// Do other work...
// Wait for result
outcome := <-resultChan
if outcome.Err != nil {
return "", outcome.Err
}
return outcome.Result, nil
}
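The "exactly one value, then close" channel contract can be illustrated with plain goroutines. The sketch below is a non-durable analog written with the standard library only: it checkpoints nothing and does not use DBOS. The outcome type mirrors the shape of StepOutcome[R], and launch, collect, and demo are hypothetical names introduced for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// outcome mirrors the shape of StepOutcome[R]; it is a local stand-in.
type outcome[R any] struct {
	Result R
	Err    error
}

// launch runs fn in a goroutine and returns a channel that receives
// exactly one outcome and is then closed.
func launch[R any](fn func() (R, error)) <-chan outcome[R] {
	ch := make(chan outcome[R], 1) // buffered so the goroutine never blocks
	go func() {
		defer close(ch)
		result, err := fn()
		ch <- outcome[R]{Result: result, Err: err}
	}()
	return ch
}

// collect waits on each channel in launch order and stops at the first error.
func collect[R any](channels []<-chan outcome[R]) ([]R, error) {
	results := make([]R, 0, len(channels))
	for _, ch := range channels {
		out, ok := <-ch
		if !ok {
			return nil, errors.New("channel closed before producing a value")
		}
		if out.Err != nil {
			return nil, out.Err
		}
		results = append(results, out.Result)
	}
	return results, nil
}

// demo launches two functions concurrently and gathers both results.
func demo() ([]int, error) {
	channels := []<-chan outcome[int]{
		launch(func() (int, error) { return 1, nil }),
		launch(func() (int, error) { return 2, nil }),
	}
	return collect(channels)
}

func main() {
	fmt.Println(demo()) // prints [1 2] <nil>
}
```

Because results are read in launch order rather than completion order, the collected slice is the same on every run, which is the property a workflow needs when it gathers the outcomes of several concurrent steps.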
Select
func Select[R any](ctx DBOSContext, channels []<-chan StepOutcome[R]) (R, error)
Wait for and return the first result from multiple channels obtained from Go.
This is a durable alternative to Go's native select statement that checkpoints the selected result for deterministic replay.
Can only be called from within a workflow.
All channels must be of the same type R.
Parameters:
- ctx: The DBOSContext.
- channels: A slice of receive-only channels from Go calls.
Returns:
- The result value from the first channel to produce a value.
- An error if the selected step returned an error, if the context was cancelled, or if a channel was closed unexpectedly.
Behavior:
- If channels is empty, returns the zero value of type R with no error.
- If the context is cancelled while waiting, returns the context error.
- The selected channel index and value are checkpointed, so workflow recovery returns the same result.
Example Syntax:
func workflow(ctx dbos.DBOSContext, _ string) (string, error) {
ch1, _ := dbos.Go(ctx, func(ctx context.Context) (string, error) {
return queryServiceA(ctx)
})
ch2, _ := dbos.Go(ctx, func(ctx context.Context) (string, error) {
return queryServiceB(ctx)
})
// Wait for the first result
result, err := dbos.Select(ctx, []<-chan dbos.StepOutcome[string]{ch1, ch2})
if err != nil {
return "", err
}
return result, nil
}
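The documented behavior (empty input, first value wins, context cancellation, unexpectedly closed channel) can be approximated in plain Go with reflect.Select. This is a non-durable sketch that checkpoints nothing and is not the DBOS implementation; outcome, firstOutcome, and the demo helpers are hypothetical names for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

// outcome mirrors the shape of StepOutcome[R]; it is a local stand-in.
type outcome[R any] struct {
	Result R
	Err    error
}

// firstOutcome returns the first value produced by any channel.
func firstOutcome[R any](ctx context.Context, channels []<-chan outcome[R]) (R, error) {
	var zero R
	if len(channels) == 0 {
		return zero, nil // empty input: zero value, no error
	}
	cases := make([]reflect.SelectCase, 0, len(channels)+1)
	for _, ch := range channels {
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
	}
	// The last case watches for context cancellation.
	cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())})

	chosen, recv, ok := reflect.Select(cases)
	if chosen == len(channels) {
		return zero, ctx.Err()
	}
	if !ok {
		return zero, errors.New("channel closed before producing a value")
	}
	out := recv.Interface().(outcome[R])
	return out.Result, out.Err
}

// demoFirst has one channel that never produces and one that is already ready.
func demoFirst() (string, error) {
	slow := make(chan outcome[string], 1) // never filled in this demo
	fast := make(chan outcome[string], 1)
	fast <- outcome[string]{Result: "B"}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return firstOutcome(ctx, []<-chan outcome[string]{slow, fast})
}

// demoEmpty passes no channels at all.
func demoEmpty() (string, error) {
	return firstOutcome[string](context.Background(), nil)
}

// demoCancelled cancels the context before any channel produces a value.
func demoCancelled() (string, error) {
	pending := make(chan outcome[string], 1)
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return firstOutcome(ctx, []<-chan outcome[string]{pending})
}

func main() {
	fmt.Println(demoFirst())
	fmt.Println(demoEmpty())
	fmt.Println(demoCancelled())
}
```

Unlike this sketch, Select records which channel was chosen, so a recovered workflow observes the same winner even if the timing differs on re-execution.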
WorkflowHandle
type WorkflowHandle[R any] interface {
GetResult(opts ...GetResultOption) (R, error)
GetStatus() (WorkflowStatus, error)
GetWorkflowID() string
}
WorkflowHandle provides methods to interact with a running or completed workflow.
The type parameter R represents the expected return type of the workflow.
Handles can be used to wait for workflow completion, check status, and retrieve results.
WorkflowHandle.GetResult
WorkflowHandle.GetResult(opts ...GetResultOption) (R, error)
Wait for the workflow to complete and return its result.
WithHandleTimeout
func WithHandleTimeout(timeout time.Duration) GetResultOption
Specify a timeout for obtaining a workflow result.
WithHandlePollingInterval
func WithHandlePollingInterval(interval time.Duration) GetResultOption
Set the polling interval for checking workflow completion status in the database. Only positive interval values will be considered.
WorkflowHandle.GetStatus
WorkflowHandle.GetStatus() (WorkflowStatus, error)
Retrieve the WorkflowStatus of the workflow.
WorkflowHandle.GetWorkflowID
WorkflowHandle.GetWorkflowID() string
Retrieve the ID of the workflow.
Patching
Patch
func Patch(ctx DBOSContext, patchName string) (bool, error)
Insert a patch marker at the current point in workflow history. Returns true if the marker was successfully inserted, and false if a checkpoint is already present at this point in history, which indicates that the workflow should run unpatched.
Used to safely upgrade workflow code; see the patching tutorial for more detail.
Parameters:
- ctx: The DBOSContext.
- patchName: The name to give the patch marker that will be inserted into workflow history.
Patching must be enabled in your configuration by setting EnablePatching: true.
DeprecatePatch
func DeprecatePatch(ctx DBOSContext, patchName string) (bool, error)
Safely bypass a patch marker at the current point in workflow history if present.
Always returns true.
Used to safely deprecate patches; see the patching tutorial for more detail.
Parameters:
- ctx: The DBOSContext.
- patchName: The name of the patch marker to be bypassed.
Patching must be enabled in your configuration by setting EnablePatching: true.