DBOS Methods & Variables

DBOS Methods

GetEvent

func GetEvent[R any](ctx DBOSContext, targetWorkflowID, key string, timeout time.Duration) (R, error)

Retrieve the latest value of an event published by the workflow identified by targetWorkflowID under the key key. If the event does not yet exist, wait for it to be published, returning an error if the wait times out.

Parameters:

  • ctx: The DBOS context.
  • targetWorkflowID: The identifier of the workflow whose events to retrieve.
  • key: The key of the event to retrieve.
  • timeout: A timeout. If the wait times out, return an error.

SetEvent

func SetEvent[P any](ctx DBOSContext, key string, message P) error

Create an event with key key and value message, and associate it with this workflow. If the event already exists, update its value. Can only be called from within a workflow.

Parameters:

  • ctx: The DBOS context.
  • key: The key of the event.
  • message: The value of the event. Must be serializable.

Send

func Send[P any](ctx DBOSContext, destinationID string, message P, topic string) error

Send a message to the workflow identified by destinationID. Messages can optionally be associated with a topic.

Parameters:

  • ctx: The DBOS context.
  • destinationID: The workflow to which to send the message.
  • message: The message to send. Must be serializable.
  • topic: A topic with which to associate the message. Messages are enqueued per-topic on the receiver.

Recv

func Recv[R any](ctx DBOSContext, topic string, timeout time.Duration) (R, error)

Receive and return a message sent to this workflow. Can only be called from within a workflow. Messages are dequeued first-in, first-out from a queue associated with the topic. Calls to Recv wait for the next message in the queue, returning an error if the wait times out.

Parameters:

  • ctx: The DBOS context.
  • topic: A topic queue on which to wait.
  • timeout: A timeout, given as a time.Duration. If the wait times out, return an error.

Sleep

func Sleep(ctx DBOSContext, duration time.Duration) (time.Duration, error)

Sleep for the given duration. May only be called from within a workflow. This sleep is durable: it records its intended wake-up time in the database, so if it is interrupted and recovers, it still wakes up at the intended time.

Parameters:

  • ctx: The DBOS context.
  • duration: The duration to sleep.

RetrieveWorkflow

func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (*workflowPollingHandle[R], error)

Retrieve the handle of a workflow.

Parameters:

  • ctx: The DBOS context.
  • workflowID: The ID of the workflow whose handle to retrieve.

Workflow Management Methods

ListWorkflows

func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error)

Retrieve a list of WorkflowStatus of all workflows matching specified criteria.

Example usage:

// List all successful workflows from the last 24 hours
workflows, err := dbos.ListWorkflows(ctx,
    dbos.WithStatus([]dbos.WorkflowStatusType{dbos.WorkflowStatusSuccess}),
    dbos.WithStartTime(time.Now().Add(-24*time.Hour)),
    dbos.WithLimit(100))
if err != nil {
    log.Fatal(err)
}

// List workflows by specific IDs without loading input/output data
// (plain assignment, since workflows and err are already declared above)
workflows, err = dbos.ListWorkflows(ctx,
    dbos.WithWorkflowIDs([]string{"workflow1", "workflow2"}),
    dbos.WithLoadInput(false),
    dbos.WithLoadOutput(false))
if err != nil {
    log.Fatal(err)
}

WithAppVersion

func WithAppVersion(appVersion string) ListWorkflowsOption

Retrieve workflows tagged with this application version.

WithEndTime

func WithEndTime(endTime time.Time) ListWorkflowsOption

Retrieve workflows started before this timestamp.

WithLimit

func WithLimit(limit int) ListWorkflowsOption

Retrieve up to this many workflows.

WithLoadInput

func WithLoadInput(loadInput bool) ListWorkflowsOption

WithLoadInput controls whether to load workflow input data (default: true).

WithLoadOutput

func WithLoadOutput(loadOutput bool) ListWorkflowsOption

WithLoadOutput controls whether to load workflow output data (default: true).

WithName

func WithName(name string) ListWorkflowsOption

Filter workflows by the specified workflow function name.

WithOffset

func WithOffset(offset int) ListWorkflowsOption

Skip this many workflows from the results returned (for pagination).

WithSortDesc

func WithSortDesc(sortDesc bool) ListWorkflowsOption

Sort the results in descending (true) or ascending (false) order by workflow start time.

WithStartTime

func WithStartTime(startTime time.Time) ListWorkflowsOption

Retrieve workflows started after this timestamp.

WithStatus

func WithStatus(status []WorkflowStatusType) ListWorkflowsOption

Filter workflows by status. Multiple statuses can be specified.

WithUser

func WithUser(user string) ListWorkflowsOption

Filter workflows run by this authenticated user.

WithWorkflowIDs

func WithWorkflowIDs(workflowIDs []string) ListWorkflowsOption

Filter workflows by specific workflow IDs.

WithWorkflowIDPrefix

func WithWorkflowIDPrefix(prefix string) ListWorkflowsOption

Filter workflows whose IDs start with the specified prefix.

WithQueuesOnly

func WithQueuesOnly() ListWorkflowsOption

Return only workflows that are currently in a queue (queue name is not null, status is ENQUEUED or PENDING).
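The With... functions above follow Go's functional-options pattern: each returns a value that adjusts a query before it runs. The sketch below illustrates how a prefix filter, a limit, and an offset might compose to paginate results. All names in it (listOptions, option, withIDPrefix, withLimit, withOffset, list) are hypothetical stand-ins operating on an in-memory slice; the internals of ListWorkflowsOption are not shown in this reference.

```go
package main

import (
	"fmt"
	"strings"
)

// listOptions and option are hypothetical stand-ins for illustration only.
type listOptions struct {
	idPrefix string
	limit    int // 0 means no limit
	offset   int
}

type option func(*listOptions)

func withIDPrefix(p string) option { return func(o *listOptions) { o.idPrefix = p } }
func withLimit(n int) option       { return func(o *listOptions) { o.limit = n } }
func withOffset(n int) option      { return func(o *listOptions) { o.offset = n } }

// list filters ids by prefix, then applies offset and limit, in that order.
func list(ids []string, opts ...option) []string {
	var o listOptions
	for _, apply := range opts {
		apply(&o)
	}
	var out []string
	for _, id := range ids {
		if strings.HasPrefix(id, o.idPrefix) {
			out = append(out, id)
		}
	}
	if o.offset < 0 {
		o.offset = 0 // treat a negative offset as no offset
	}
	if o.offset >= len(out) {
		return nil
	}
	out = out[o.offset:]
	if o.limit > 0 && o.limit < len(out) {
		out = out[:o.limit]
	}
	return out
}

func main() {
	ids := []string{"order-1", "order-2", "order-3", "refund-1", "order-4"}
	// Page through the "order-" IDs two at a time until a page comes back empty.
	for offset := 0; ; offset += 2 {
		page := list(ids, withIDPrefix("order-"), withLimit(2), withOffset(offset))
		if len(page) == 0 {
			break
		}
		fmt.Println(page)
	}
}
```

Against the real API, the same loop shape would presumably combine WithWorkflowIDPrefix, WithLimit, and WithOffset in a call to ListWorkflows.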

GetWorkflowSteps

func GetWorkflowSteps(ctx DBOSContext, workflowID string) ([]StepInfo, error)

GetWorkflowSteps retrieves the execution steps of a workflow. This is a list of StepInfo objects, with the following structure:

type StepInfo struct {
    StepID          int    // The sequential ID of the step within the workflow
    StepName        string // The name of the step function
    Output          any    // The output returned by the step (if any)
    Error           error  // The error returned by the step (if any)
    ChildWorkflowID string // If the step starts or retrieves the result of a workflow, its ID
}

Parameters:

  • ctx: The DBOS context.
  • workflowID: The ID of the workflow whose steps to retrieve.

CancelWorkflow

func CancelWorkflow(ctx DBOSContext, workflowID string) error

Cancel a workflow. This sets its status to CANCELLED, removes it from its queue (if it is enqueued) and preempts its execution (interrupting it at the beginning of its next step).

Parameters:

  • ctx: The DBOS context.
  • workflowID: The ID of the workflow to cancel.

ResumeWorkflow

func ResumeWorkflow[R any](ctx DBOSContext, workflowID string) (*WorkflowHandle[R], error)

Resume a workflow. This immediately starts it from its last completed step. You can use this to resume workflows that are cancelled or have exceeded their maximum recovery attempts. You can also use this to start an enqueued workflow immediately, bypassing its queue.

Parameters:

  • ctx: The DBOS context.
  • workflowID: The ID of the workflow to resume.

ForkWorkflow

func ForkWorkflow[R any](ctx DBOSContext, input ForkWorkflowInput) (WorkflowHandle[R], error)

Start a new execution of a workflow from a specific step. The StartStep field of the input must match the ID of a step returned by workflow introspection (for example, the StepID values returned by GetWorkflowSteps). The new workflow starts from StartStep, so any step whose ID is less than StartStep is not re-executed.

Parameters:

  • ctx: The DBOS context.
  • input: A ForkWorkflowInput struct where OriginalWorkflowID is mandatory.
type ForkWorkflowInput struct {
    OriginalWorkflowID string // Required: The UUID of the original workflow to fork from
    ForkedWorkflowID   string // Optional: Custom workflow ID for the forked workflow (auto-generated if empty)
    StartStep          uint   // Optional: Step to start the forked workflow from (default: 0)
    ApplicationVersion string // Optional: Application version for the forked workflow (inherits from original if empty)
}
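The effect of StartStep can be sketched with a toy replay function. It assumes, for illustration only, that step IDs are zero-based and that a fork reuses the original workflow's recorded outputs for the skipped steps; the text above only states that those steps are not re-executed. The fork function here is hypothetical and unrelated to the ForkWorkflow implementation.

```go
package main

import "fmt"

// fork replays a workflow. Steps whose ID is below startStep reuse the
// recorded outputs (an assumption of this sketch); the rest run again.
// It returns every step's output and the IDs of the steps that executed.
func fork(recorded []string, steps []func() string, startStep uint) (outputs []string, executed []int) {
	for id, step := range steps {
		if uint(id) < startStep && id < len(recorded) {
			outputs = append(outputs, recorded[id])
			continue
		}
		outputs = append(outputs, step())
		executed = append(executed, id)
	}
	return outputs, executed
}

func main() {
	recorded := []string{"a0", "b0", "c0"} // outputs from the original run
	steps := []func() string{
		func() string { return "a1" },
		func() string { return "b1" },
		func() string { return "c1" },
	}
	outputs, executed := fork(recorded, steps, 1)
	fmt.Println(outputs, executed) // [a0 b1 c1] [1 2]
}
```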

Workflow Status

Some workflow introspection and management methods return a WorkflowStatus. This object has the following definition:

type WorkflowStatus struct {
    ID                 string             `json:"workflow_uuid"`       // Unique identifier for the workflow
    Status             WorkflowStatusType `json:"status"`              // Current execution status
    Name               string             `json:"name"`                // Function name of the workflow
    AuthenticatedUser  *string            `json:"authenticated_user"`  // User who initiated the workflow (if applicable)
    AssumedRole        *string            `json:"assumed_role"`        // Role assumed during execution (if applicable)
    AuthenticatedRoles *string            `json:"authenticated_roles"` // Roles available to the user (if applicable)
    Output             any                `json:"output"`              // Workflow output (available after completion)
    Error              error              `json:"error"`               // Error information (if status is ERROR)
    ExecutorID         string             `json:"executor_id"`         // ID of the executor running this workflow
    CreatedAt          time.Time          `json:"created_at"`          // When the workflow was created
    UpdatedAt          time.Time          `json:"updated_at"`          // When the workflow status was last updated
    ApplicationVersion string             `json:"application_version"` // Version of the application that created this workflow
    ApplicationID      string             `json:"application_id"`      // Application identifier
    Attempts           int                `json:"attempts"`            // Number of execution attempts
    QueueName          string             `json:"queue_name"`          // Queue name (if workflow was enqueued)
    Timeout            time.Duration      `json:"timeout"`             // Workflow timeout duration
    Deadline           time.Time          `json:"deadline"`            // Absolute deadline for workflow completion
    StartedAt          time.Time          `json:"started_at"`          // When the workflow execution actually started
    DeduplicationID    string             `json:"deduplication_id"`    // Deduplication identifier (if applicable)
    Input              any                `json:"input"`               // Input parameters passed to the workflow
    Priority           int                `json:"priority"`            // Execution priority (lower numbers have higher priority)
}

WorkflowStatusType

The WorkflowStatusType represents the execution status of a workflow:

type WorkflowStatusType string

const (
    WorkflowStatusPending                     WorkflowStatusType = "PENDING"                        // Workflow is running or ready to run
    WorkflowStatusEnqueued                    WorkflowStatusType = "ENQUEUED"                       // Workflow is queued and waiting for execution
    WorkflowStatusSuccess                     WorkflowStatusType = "SUCCESS"                        // Workflow completed successfully
    WorkflowStatusError                       WorkflowStatusType = "ERROR"                          // Workflow completed with an error
    WorkflowStatusCancelled                   WorkflowStatusType = "CANCELLED"                      // Workflow was cancelled (manually or due to timeout)
    WorkflowStatusMaxRecoveryAttemptsExceeded WorkflowStatusType = "MAX_RECOVERY_ATTEMPTS_EXCEEDED" // Workflow exceeded maximum retry attempts
)

DBOS Variables

GetWorkflowID

func GetWorkflowID(ctx DBOSContext) (string, error)

Return the ID of the current workflow, if in a workflow. Returns an error if not called from within a workflow context.

Parameters:

  • ctx: The DBOS context.

GetStepID

func GetStepID(ctx DBOSContext) (string, error)

Return the unique ID of the current step within a workflow. Returns an error if not called from within a step context.

Parameters:

  • ctx: The DBOS context.