DBOS Methods & Variables
DBOS Methods
GetEvent
func GetEvent[R any](ctx DBOSContext, targetWorkflowID, key string, timeout time.Duration) (R, error)
Retrieve the latest value of an event published by the workflow identified by targetWorkflowID to the key key.
If the event does not yet exist, wait for it to be published, returning an error if the wait times out.
Parameters:
- ctx: The DBOS context.
- targetWorkflowID: The identifier of the workflow whose events to retrieve.
- key: The key of the event to retrieve.
- timeout: A timeout. If the wait times out, return an error.
SetEvent
func SetEvent[P any](ctx DBOSContext, key string, message P) error
Create and associate with this workflow an event with key key and value message.
If the event already exists, update its value.
Can only be called from within a workflow.
Parameters:
- ctx: The DBOS context.
- key: The key of the event.
- message: The value of the event. Must be serializable.
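The semantics described for SetEvent and GetEvent (latest value wins, reads wait until the key is published or the timeout expires) can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: eventStore, set, get, and errTimeout are hypothetical names, values are plain strings, and nothing here is durable or reflects how DBOS stores events.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errTimeout is a hypothetical error used by this sketch only.
var errTimeout = errors.New("timed out waiting for event")

// eventKey identifies one event: the publishing workflow plus the key.
type eventKey struct{ workflowID, key string }

// eventStore is a hypothetical in-memory stand-in for durable event storage.
type eventStore struct {
	mu      sync.Mutex
	values  map[eventKey]string
	changed chan struct{} // closed and replaced whenever any event is set
}

func newEventStore() *eventStore {
	return &eventStore{values: map[eventKey]string{}, changed: make(chan struct{})}
}

// set creates the event or overwrites its value, then wakes all waiters.
func (s *eventStore) set(workflowID, key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.values[eventKey{workflowID, key}] = value
	close(s.changed)
	s.changed = make(chan struct{})
}

// get returns the latest value, waiting up to timeout for it to be published.
func (s *eventStore) get(workflowID, key string, timeout time.Duration) (string, error) {
	deadline := time.After(timeout)
	for {
		s.mu.Lock()
		v, ok := s.values[eventKey{workflowID, key}]
		changed := s.changed
		s.mu.Unlock()
		if ok {
			return v, nil
		}
		select {
		case <-changed: // something was published; check again
		case <-deadline:
			return "", errTimeout
		}
	}
}

// demoEvents exercises overwrite, wait-for-publish, and timeout.
func demoEvents() string {
	s := newEventStore()
	s.set("wf-1", "status", "started")
	s.set("wf-1", "status", "done") // an existing event is updated
	latest, _ := s.get("wf-1", "status", 10*time.Millisecond)

	go func() {
		time.Sleep(10 * time.Millisecond)
		s.set("wf-2", "status", "ready")
	}()
	waited, _ := s.get("wf-2", "status", time.Second)

	_, err := s.get("wf-3", "status", 10*time.Millisecond)
	return fmt.Sprintf("%s %s %v", latest, waited, errors.Is(err, errTimeout))
}

func main() {
	fmt.Println(demoEvents())
}
```

In the model, a reader that arrives before the writer simply blocks until the value appears, which is the behavior the timeout parameter bounds.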
Send
func Send[P any](ctx DBOSContext, destinationID string, message P, topic string) error
Send a message to the workflow identified by destinationID.
Messages can optionally be associated with a topic.
Parameters:
- ctx: The DBOS context.
- destinationID: The workflow to which to send the message.
- message: The message to send. Must be serializable.
- topic: A topic with which to associate the message. Messages are enqueued per-topic on the receiver.
Recv
func Recv[R any](ctx DBOSContext, topic string, timeout time.Duration) (R, error)
Receive and return a message sent to this workflow.
Can only be called from within a workflow.
Messages are dequeued first-in, first-out from a queue associated with the topic.
Calls to Recv wait for the next message in the queue, returning an error if the wait times out.
Parameters:
- ctx: The DBOS context.
- topic: A topic queue on which to wait.
- timeout: A timeout. If the wait times out, return an error.
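The per-topic, first-in first-out behavior of Send and Recv can be pictured with one buffered channel per topic. The sketch below is a toy model, not the DBOS implementation: mailbox, send, recv, and errRecvTimeout are hypothetical names, messages are strings, and the fixed buffer size of 16 is an arbitrary assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errRecvTimeout is a hypothetical error used by this sketch only.
var errRecvTimeout = errors.New("timed out waiting for message")

// mailbox is a hypothetical in-memory model of one workflow's inbox.
type mailbox struct {
	mu     sync.Mutex
	topics map[string]chan string // one FIFO queue per topic
}

// queue returns the queue for a topic, creating it on first use.
func (m *mailbox) queue(topic string) chan string {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.topics == nil {
		m.topics = map[string]chan string{}
	}
	q, ok := m.topics[topic]
	if !ok {
		q = make(chan string, 16) // small fixed buffer, enough for the sketch
		m.topics[topic] = q
	}
	return q
}

// send enqueues a message on the topic's queue.
func (m *mailbox) send(topic, msg string) {
	m.queue(topic) <- msg
}

// recv dequeues the oldest message on the topic or fails after timeout.
func (m *mailbox) recv(topic string, timeout time.Duration) (string, error) {
	select {
	case msg := <-m.queue(topic):
		return msg, nil
	case <-time.After(timeout):
		return "", errRecvTimeout
	}
}

// demoMailbox shows per-topic ordering and the timeout on an empty queue.
func demoMailbox() string {
	var m mailbox
	m.send("payments", "p1")
	m.send("alerts", "a1")
	m.send("payments", "p2")

	first, _ := m.recv("payments", 10*time.Millisecond)
	second, _ := m.recv("payments", 10*time.Millisecond)
	alert, _ := m.recv("alerts", 10*time.Millisecond)
	_, err := m.recv("payments", 10*time.Millisecond) // queue is now empty
	return fmt.Sprintf("%s %s %s %v", first, second, alert, errors.Is(err, errRecvTimeout))
}

func main() {
	fmt.Println(demoMailbox())
}
```

Messages on "payments" come back in the order they were sent, and the message on "alerts" does not interleave with them.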
Sleep
func Sleep(ctx DBOSContext, duration time.Duration) (time.Duration, error)
Sleep for the given duration. May only be called from within a workflow. This sleep is durable: it records its intended wake-up time in the database, so if it is interrupted and recovers, it still wakes up at the intended time.
Parameters:
- ctx: The DBOS context.
- duration: The duration to sleep.
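The idea behind a durable sleep is that the wake-up time is fixed once, so a recovered workflow sleeps only for whatever time remains. The arithmetic can be sketched as follows; remainingSleep is a hypothetical helper written for illustration, and the timestamps are made up.

```go
package main

import (
	"fmt"
	"time"
)

// remainingSleep returns how much longer to sleep, given the wake-up time
// recorded when the sleep was first requested. It never returns a negative
// duration: if the wake-up time has already passed, there is nothing to wait for.
func remainingSleep(wakeAt, now time.Time) time.Duration {
	if d := wakeAt.Sub(now); d > 0 {
		return d
	}
	return 0
}

// demoSleep computes the remaining sleep for two hypothetical recovery times.
func demoSleep() string {
	start := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	wakeAt := start.Add(60 * time.Second) // recorded once, at the first call

	afterCrash := remainingSleep(wakeAt, start.Add(45*time.Second)) // recovered mid-sleep
	tooLate := remainingSleep(wakeAt, start.Add(90*time.Second))    // recovered after wake-up
	return fmt.Sprintf("%v %v", afterCrash, tooLate)
}

func main() {
	fmt.Println(demoSleep())
}
```

A 60-second sleep interrupted at 45 seconds resumes for 15 seconds rather than starting over.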
RetrieveWorkflow
func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (*workflowPollingHandle[R], error)
Retrieve the handle of a workflow.
Parameters:
- ctx: The DBOS context.
- workflowID: The ID of the workflow whose handle to retrieve.
Workflow Management Methods
ListWorkflows
func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error)
Retrieve a list of WorkflowStatus of all workflows matching specified criteria.
Example usage:
// List all successful workflows from the last 24 hours
workflows, err := dbos.ListWorkflows(ctx,
    dbos.WithStatus([]dbos.WorkflowStatusType{dbos.WorkflowStatusSuccess}),
    dbos.WithStartTime(time.Now().Add(-24*time.Hour)),
    dbos.WithLimit(100))
if err != nil {
    log.Fatal(err)
}
// List workflows by specific IDs without loading input/output data.
// A new variable name avoids redeclaring workflows with := in the same scope.
selected, err := dbos.ListWorkflows(ctx,
    dbos.WithWorkflowIDs([]string{"workflow1", "workflow2"}),
    dbos.WithLoadInput(false),
    dbos.WithLoadOutput(false))
if err != nil {
    log.Fatal(err)
}
WithAppVersion
func WithAppVersion(appVersion string) ListWorkflowsOption
Retrieve workflows tagged with this application version.
WithEndTime
func WithEndTime(endTime time.Time) ListWorkflowsOption
Retrieve workflows started before this timestamp.
WithLimit
func WithLimit(limit int) ListWorkflowsOption
Retrieve up to this many workflows.
WithLoadInput
func WithLoadInput(loadInput bool) ListWorkflowsOption
WithLoadInput controls whether to load workflow input data (default: true).
WithLoadOutput
func WithLoadOutput(loadOutput bool) ListWorkflowsOption
WithLoadOutput controls whether to load workflow output data (default: true).
WithName
func WithName(name string) ListWorkflowsOption
Filter workflows by the specified workflow function name.
WithOffset
func WithOffset(offset int) ListWorkflowsOption
Skip this many workflows from the results returned (for pagination).
WithSortDesc
func WithSortDesc(sortDesc bool) ListWorkflowsOption
Sort the results in descending (true) or ascending (false) order by workflow start time.
WithStartTime
func WithStartTime(startTime time.Time) ListWorkflowsOption
Retrieve workflows started after this timestamp.
WithStatus
func WithStatus(status []WorkflowStatusType) ListWorkflowsOption
Filter workflows by status. Multiple statuses can be specified.
WithUser
func WithUser(user string) ListWorkflowsOption
Filter workflows run by this authenticated user.
WithWorkflowIDs
func WithWorkflowIDs(workflowIDs []string) ListWorkflowsOption
Filter workflows by specific workflow IDs.
WithWorkflowIDPrefix
func WithWorkflowIDPrefix(prefix string) ListWorkflowsOption
Filter workflows whose IDs start with the specified prefix.
WithQueuesOnly
func WithQueuesOnly() ListWorkflowsOption
Return only workflows that are currently in a queue (queue name is not null, status is ENQUEUED or PENDING).
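The options above follow Go's functional options pattern: each With... function returns a value that adjusts the query. How limit, offset, and sort order can combine for pagination is sketched below on a plain slice. All names here (listParams, listOption, withLimit, withOffset, withSortDesc, list) are hypothetical stand-ins, and the integers stand in for workflow start times; the real option type and query logic may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// listParams collects the settings that options can change.
type listParams struct {
	limit    int // -1 means "no limit" in this sketch
	offset   int
	sortDesc bool
}

// listOption is a hypothetical analogue of ListWorkflowsOption.
type listOption func(*listParams)

func withLimit(n int) listOption       { return func(p *listParams) { p.limit = n } }
func withOffset(n int) listOption      { return func(p *listParams) { p.offset = n } }
func withSortDesc(desc bool) listOption { return func(p *listParams) { p.sortDesc = desc } }

// list sorts the start times, skips offset entries, and returns up to limit.
func list(startTimes []int, opts ...listOption) []int {
	p := listParams{limit: -1}
	for _, opt := range opts {
		opt(&p)
	}
	out := append([]int(nil), startTimes...) // copy so the input is not reordered
	sort.Slice(out, func(i, j int) bool {
		if p.sortDesc {
			return out[i] > out[j]
		}
		return out[i] < out[j]
	})
	if p.offset < 0 {
		p.offset = 0
	}
	if p.offset > len(out) {
		p.offset = len(out)
	}
	out = out[p.offset:]
	if p.limit >= 0 && p.limit < len(out) {
		out = out[:p.limit]
	}
	return out
}

// demoPaging fetches two consecutive pages of size 2, newest first.
func demoPaging() string {
	times := []int{30, 10, 50, 20, 40}
	page1 := list(times, withSortDesc(true), withLimit(2))
	page2 := list(times, withSortDesc(true), withLimit(2), withOffset(2))
	return fmt.Sprint(page1, page2)
}

func main() {
	fmt.Println(demoPaging())
}
```

Keeping the sort order fixed between calls and advancing the offset by the page size is what makes consecutive pages line up.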
GetWorkflowSteps
func GetWorkflowSteps(ctx DBOSContext, workflowID string) ([]StepInfo, error)
GetWorkflowSteps retrieves the execution steps of a workflow.
This is a list of StepInfo objects, with the following structure:
type StepInfo struct {
    StepID int // The sequential ID of the step within the workflow
    StepName string // The name of the step function
    Output any // The output returned by the step (if any)
    Error error // The error returned by the step (if any)
    ChildWorkflowID string // If the step starts or retrieves the result of a workflow, its ID
}
Parameters:
- ctx: The DBOS context.
- workflowID: The ID of the workflow whose steps to retrieve.
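A common use of the step list is to locate the step that failed. The sketch below works on a local copy of the StepInfo structure so that it runs on its own; firstFailedStep is a hypothetical helper, and the step names and error are invented sample data rather than output from a real workflow.

```go
package main

import (
	"errors"
	"fmt"
)

// StepInfo mirrors the structure documented above.
type StepInfo struct {
	StepID          int
	StepName        string
	Output          any
	Error           error
	ChildWorkflowID string
}

// firstFailedStep returns the first step whose Error is non-nil.
// The boolean is false when every step succeeded.
func firstFailedStep(steps []StepInfo) (StepInfo, bool) {
	for _, s := range steps {
		if s.Error != nil {
			return s, true
		}
	}
	return StepInfo{}, false
}

// demoSteps inspects a made-up step list with one failure.
func demoSteps() string {
	steps := []StepInfo{
		{StepID: 0, StepName: "reserveInventory", Output: "ok"},
		{StepID: 1, StepName: "chargeCard", Error: errors.New("card declined")},
	}
	failed, ok := firstFailedStep(steps)
	return fmt.Sprintf("%v %d %s: %v", ok, failed.StepID, failed.StepName, failed.Error)
}

func main() {
	fmt.Println(demoSteps())
}
```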
CancelWorkflow
func CancelWorkflow(ctx DBOSContext, workflowID string) error
Cancel a workflow. This sets its status to CANCELLED, removes it from its queue (if it is enqueued) and preempts its execution (interrupting it at the beginning of its next step).
Parameters:
- ctx: The DBOS context.
- workflowID: The ID of the workflow to cancel.
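Preemption "at the beginning of its next step" means a step that is already running finishes, and the cancellation takes effect before the following step starts. A toy runner makes the timing concrete. The workflow type, run method, and errCancelled are hypothetical and exist only for this sketch; how DBOS detects cancellation internally is not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errCancelled is a hypothetical error used by this sketch only.
var errCancelled = errors.New("workflow cancelled")

// workflow is a toy model holding only a cancellation flag.
type workflow struct {
	cancelled atomic.Bool
}

// run executes steps in order, checking for cancellation before each one.
// It returns the number of steps that ran to completion.
func (w *workflow) run(steps []func()) (int, error) {
	completed := 0
	for _, step := range steps {
		if w.cancelled.Load() {
			return completed, errCancelled
		}
		step()
		completed++
	}
	return completed, nil
}

// demoCancel cancels the workflow while its second step is running.
func demoCancel() string {
	w := &workflow{}
	var log []string
	steps := []func(){
		func() { log = append(log, "step0") },
		func() {
			w.cancelled.Store(true) // cancellation arrives mid-step
			log = append(log, "step1")
		},
		func() { log = append(log, "step2") },
	}
	completed, err := w.run(steps)
	return fmt.Sprintf("%v %d %v", log, completed, errors.Is(err, errCancelled))
}

func main() {
	fmt.Println(demoCancel())
}
```

The second step completes even though the flag was set while it was running; the third step never starts.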
ResumeWorkflow
func ResumeWorkflow[R any](ctx DBOSContext, workflowID string) (*WorkflowHandle[R], error)
Resume a workflow. This immediately starts it from its last completed step. You can use this to resume workflows that are cancelled or have exceeded their maximum recovery attempts. You can also use this to start an enqueued workflow immediately, bypassing its queue.
Parameters:
- ctx: The DBOS context.
- workflowID: The ID of the workflow to resume.
ForkWorkflow
func ForkWorkflow[R any](ctx DBOSContext, input ForkWorkflowInput) (WorkflowHandle[R], error)
Start a new execution of a workflow from a specific step. The input step ID (StartStep) must match the step number of the step returned by workflow introspection. The specified StartStep is the step from which the new workflow will start, so any steps whose ID is less than StartStep will not be re-executed.
Parameters:
- ctx: The DBOS context.
- input: A ForkWorkflowInput struct where OriginalWorkflowID is mandatory.
type ForkWorkflowInput struct {
    OriginalWorkflowID string // Required: The UUID of the original workflow to fork from
    ForkedWorkflowID string // Optional: Custom workflow ID for the forked workflow (auto-generated if empty)
    StartStep uint // Optional: Step to start the forked workflow from (default: 0)
    ApplicationVersion string // Optional: Application version for the forked workflow (inherits from original if empty)
}
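The rule that steps with an ID below StartStep are not re-executed can be modeled as a replay: earlier steps reuse their recorded outputs, later steps run again. The sketch below is an illustration only. The step type and forkFrom function are hypothetical, step IDs are assumed to be zero-based slice indices, and outputs are plain strings.

```go
package main

import "fmt"

// step is a hypothetical workflow step: a name and a function producing output.
type step struct {
	name string
	run  func() string
}

// forkFrom replays a workflow from startStep. Steps with an index below
// startStep reuse the output recorded by the original run; the remaining
// steps are executed again. It returns all outputs and the names of the
// steps that were actually executed.
func forkFrom(steps []step, recorded []string, startStep uint) (outputs []string, executed []string) {
	for i, s := range steps {
		if uint(i) < startStep && i < len(recorded) {
			outputs = append(outputs, recorded[i])
			continue
		}
		outputs = append(outputs, s.run())
		executed = append(executed, s.name)
	}
	return outputs, executed
}

// demoFork forks a three-step workflow from its last step.
func demoFork() string {
	steps := []step{
		{"fetch", func() string { return "fetched-v2" }},
		{"transform", func() string { return "transformed-v2" }},
		{"store", func() string { return "stored-v2" }},
	}
	recorded := []string{"fetched-v1", "transformed-v1", "stored-v1"}
	outputs, executed := forkFrom(steps, recorded, 2)
	return fmt.Sprint(outputs, executed)
}

func main() {
	fmt.Println(demoFork())
}
```

Only the last step runs again, so the fork sees the original outputs of the first two steps alongside a fresh result for the third.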
Workflow Status
Some workflow introspection and management methods return a WorkflowStatus.
This object has the following definition:
type WorkflowStatus struct {
    ID string `json:"workflow_uuid"` // Unique identifier for the workflow
    Status WorkflowStatusType `json:"status"` // Current execution status
    Name string `json:"name"` // Function name of the workflow
    AuthenticatedUser *string `json:"authenticated_user"` // User who initiated the workflow (if applicable)
    AssumedRole *string `json:"assumed_role"` // Role assumed during execution (if applicable)
    AuthenticatedRoles *string `json:"authenticated_roles"` // Roles available to the user (if applicable)
    Output any `json:"output"` // Workflow output (available after completion)
    Error error `json:"error"` // Error information (if status is ERROR)
    ExecutorID string `json:"executor_id"` // ID of the executor running this workflow
    CreatedAt time.Time `json:"created_at"` // When the workflow was created
    UpdatedAt time.Time `json:"updated_at"` // When the workflow status was last updated
    ApplicationVersion string `json:"application_version"` // Version of the application that created this workflow
    ApplicationID string `json:"application_id"` // Application identifier
    Attempts int `json:"attempts"` // Number of execution attempts
    QueueName string `json:"queue_name"` // Queue name (if workflow was enqueued)
    Timeout time.Duration `json:"timeout"` // Workflow timeout duration
    Deadline time.Time `json:"deadline"` // Absolute deadline for workflow completion
    StartedAt time.Time `json:"started_at"` // When the workflow execution actually started
    DeduplicationID string `json:"deduplication_id"` // Deduplication identifier (if applicable)
    Input any `json:"input"` // Input parameters passed to the workflow
    Priority int `json:"priority"` // Execution priority (lower numbers have higher priority)
}
WorkflowStatusType
The WorkflowStatusType represents the execution status of a workflow:
type WorkflowStatusType string
const (
    WorkflowStatusPending WorkflowStatusType = "PENDING" // Workflow is running or ready to run
    WorkflowStatusEnqueued WorkflowStatusType = "ENQUEUED" // Workflow is queued and waiting for execution
    WorkflowStatusSuccess WorkflowStatusType = "SUCCESS" // Workflow completed successfully
    WorkflowStatusError WorkflowStatusType = "ERROR" // Workflow completed with an error
    WorkflowStatusCancelled WorkflowStatusType = "CANCELLED" // Workflow was cancelled (manually or due to timeout)
    WorkflowStatusMaxRecoveryAttemptsExceeded WorkflowStatusType = "MAX_RECOVERY_ATTEMPTS_EXCEEDED" // Workflow exceeded maximum retry attempts
)
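When summarizing a list of workflows, it can help to separate those that are still running or queued from the rest. The sketch below redeclares the status constants locally so that it runs on its own. isActive is a hypothetical helper, and treating only PENDING and ENQUEUED as active is an interpretation of the comments above, not a rule stated by the library.

```go
package main

import "fmt"

// WorkflowStatusType and its constants mirror the definitions documented above.
type WorkflowStatusType string

const (
	WorkflowStatusPending                     WorkflowStatusType = "PENDING"
	WorkflowStatusEnqueued                    WorkflowStatusType = "ENQUEUED"
	WorkflowStatusSuccess                     WorkflowStatusType = "SUCCESS"
	WorkflowStatusError                       WorkflowStatusType = "ERROR"
	WorkflowStatusCancelled                   WorkflowStatusType = "CANCELLED"
	WorkflowStatusMaxRecoveryAttemptsExceeded WorkflowStatusType = "MAX_RECOVERY_ATTEMPTS_EXCEEDED"
)

// isActive reports whether a workflow is running, ready to run, or queued.
// This grouping is an assumption made for the sketch.
func isActive(s WorkflowStatusType) bool {
	switch s {
	case WorkflowStatusPending, WorkflowStatusEnqueued:
		return true
	default:
		return false
	}
}

// demoStatuses counts active and inactive entries in a made-up status list.
func demoStatuses() string {
	statuses := []WorkflowStatusType{
		WorkflowStatusPending,
		WorkflowStatusEnqueued,
		WorkflowStatusSuccess,
		WorkflowStatusError,
		WorkflowStatusCancelled,
		WorkflowStatusPending,
	}
	active := 0
	for _, s := range statuses {
		if isActive(s) {
			active++
		}
	}
	return fmt.Sprintf("active=%d inactive=%d", active, len(statuses)-active)
}

func main() {
	fmt.Println(demoStatuses())
}
```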
DBOS Variables
GetWorkflowID
func GetWorkflowID(ctx DBOSContext) (string, error)
Return the ID of the current workflow, if in a workflow. Returns an error if not called from within a workflow context.
Parameters:
- ctx: The DBOS context.
GetStepID
func GetStepID(ctx DBOSContext) (string, error)
Return the unique ID of the current step within a workflow. Returns an error if not called from within a step context.
Parameters:
- ctx: The DBOS context.