DBOS Client
Client provides a programmatic way to interact with your DBOS application from external code.
Client includes methods similar to DBOSContext that can be used outside of a DBOS application.
Client is included in the dbos package, the same package that is used by DBOS applications.
Where DBOS applications use the DBOSContext methods,
external applications use Client methods instead.
type Client interface {
	Enqueue(queueName, workflowName string, input any, opts ...EnqueueOption) (WorkflowHandle[any], error)
	ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error)
	Send(destinationID string, message any, topic string) error
	GetEvent(targetWorkflowID, key string, timeout time.Duration) (any, error)
	RetrieveWorkflow(workflowID string) (WorkflowHandle[any], error)
	CancelWorkflow(workflowID string) error
	CancelWorkflows(workflowIDs []string) error
	SetWorkflowDelay(workflowID string, opts ...SetWorkflowDelayOption) error
	ResumeWorkflow(workflowID string, opts ...ResumeWorkflowOption) (WorkflowHandle[any], error)
	ResumeWorkflows(workflowIDs []string, opts ...ResumeWorkflowOption) ([]WorkflowHandle[any], error)
	ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], error)
	ListApplicationVersions() ([]VersionInfo, error)
	GetLatestApplicationVersion() (*VersionInfo, error)
	SetLatestApplicationVersion(versionName string) error
	GetWorkflowSteps(workflowID string) ([]StepInfo, error)
	ClientReadStream(workflowID string, key string) ([]any, bool, error)
	ClientReadStreamAsync(workflowID string, key string) (<-chan StreamValue[any], error)
	CreateSchedule(input ClientScheduleInput) error
	ApplySchedules(schedules []ClientScheduleInput) error
	GetSchedule(scheduleName string) (*WorkflowSchedule, error)
	ListSchedules(opts ...ListSchedulesOption) ([]WorkflowSchedule, error)
	PauseSchedule(scheduleName string) error
	ResumeSchedule(scheduleName string) error
	DeleteSchedule(scheduleName string) error
	BackfillSchedule(scheduleName string, start, end time.Time) ([]string, error)
	TriggerSchedule(scheduleName string) (WorkflowHandle[any], error)
	Shutdown(timeout time.Duration)
}
Constructor
func NewClient(ctx context.Context, config ClientConfig) (Client, error)
Parameters:
- ctx: A context for initialization operations
- config: A ClientConfig object with connection and application settings
type ClientConfig struct {
	DatabaseURL    string        // Connection string to your system database. May be a PostgreSQL (postgres://...) or SQLite (sqlite:...) URL. Exactly one of DatabaseURL, SystemDBPool, or SqliteSystemDB is required.
	SystemDBPool   *pgxpool.Pool // A custom Postgres/CockroachDB connection pool. Optional; takes precedence over DatabaseURL. Mutually exclusive with SqliteSystemDB.
	SqliteSystemDB *sql.DB       // A custom SQLite handle (e.g. from modernc.org/sqlite). Optional; takes precedence over DatabaseURL. Mutually exclusive with SystemDBPool.
	DatabaseSchema string        // Database schema name (defaults to "dbos"; Postgres only)
	Logger         *slog.Logger  // Optional custom logger
}
Returns:
- A new Client instance, or an error if initialization fails
Example syntax:
This DBOS client connects to the system database specified in the configuration:
config := dbos.ClientConfig{
	DatabaseURL: os.Getenv("DBOS_SYSTEM_DATABASE_URL"),
}
client, err := dbos.NewClient(context.Background(), config)
if err != nil {
	log.Fatal(err)
}
defer client.Shutdown(5 * time.Second)
A client manages a connection pool to the DBOS system database. Calling Shutdown on a client will release the connection pool.
Shutdown
Shutdown(timeout time.Duration)
Gracefully shuts down the client and releases the system database connection pool.
Parameters:
- timeout: Maximum time to wait for graceful shutdown
Workflow Interaction Methods
Enqueue
func Enqueue[P any, R any](
	c Client,
	queueName string,
	workflowName string,
	input P,
	opts ...EnqueueOption,
) (WorkflowHandle[R], error)
Enqueue a workflow for processing and return a handle to it, similar to RunWorkflow with the WithQueue option. Returns a WorkflowHandle.
When enqueuing a workflow from the DBOS client, you must specify the name of the workflow to enqueue as a string, rather than passing a workflow function as with RunWorkflow.
Required parameters:
- c: The DBOS client instance
- queueName: The name of the queue on which to enqueue the workflow
- workflowName: The name of the workflow function being enqueued
- input: The input to pass to the workflow
Optional configuration via EnqueueOption:
- WithEnqueueWorkflowID(id string): The unique ID for the enqueued workflow. If left undefined, DBOS Client will generate a UUID. Please see Workflow IDs and Idempotency for more information.
- WithEnqueueApplicationVersion(version string): The version of your application that should process this workflow. If left undefined, it will use the current application version.
- WithEnqueueTimeout(timeout time.Duration): Set a timeout for the enqueued workflow. When the timeout expires, the workflow and all its children are cancelled (except if the child's context has been made uncancellable using WithoutCancel). The timeout does not begin until the workflow is dequeued and starts execution.
- WithEnqueueDeduplicationID(id string): At any given time, only one workflow with a specific deduplication ID can be enqueued in the specified queue. If a workflow with a deduplication ID is currently enqueued or actively executing (status ENQUEUED or PENDING), subsequent workflow enqueue attempts with the same deduplication ID in the same queue will fail.
- WithEnqueueDeduplicationPolicy(policy DeduplicationPolicy): Set how a colliding deduplication ID is handled. Requires WithEnqueueDeduplicationID. With the default DeduplicationPolicyReject, a colliding enqueue fails with a QueueDeduplicated error; with DeduplicationPolicyReturnExisting, it instead returns a handle to the existing workflow. See WithDeduplicationPolicy.
- WithEnqueuePriority(priority uint): The priority of the enqueued workflow in the specified queue. Workflows with the same priority are dequeued in FIFO (first in, first out) order. Priority values can range from 1 to 2,147,483,647, where a low number indicates a higher priority. Workflows without assigned priorities have the highest priority and are dequeued before workflows with assigned priorities.
- WithEnqueueClassName(className string): The class/namespace name for the target workflow. Required when enqueuing to Python, TypeScript, or Java targets, which dispatch workflows by (class_name, workflow_name) pair.
- WithEnqueueConfigName(configName string): The config/instance name for the target workflow. Required when enqueuing to a workflow registered on a configured instance: a Go workflow registered with WithInstance, or a Python, TypeScript, or Java class instance workflow (e.g., Python's DBOSConfiguredInstance, TypeScript's ConfiguredInstance). The value must match the instance name used by the target application.
- WithEnqueueDelay(delay time.Duration): Delay execution of the enqueued workflow by the specified duration. The workflow is initially placed in DELAYED status and transitions to ENQUEUED after the delay expires. The delay can later be updated via SetWorkflowDelay.
- WithEnqueueQueuePartitionKey(partitionKey string): The partition key to enqueue under when the target queue is a partitioned queue. Each partition has its own concurrency limits.
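The ordering rules described for WithEnqueuePriority can be illustrated with a small standalone sketch. It does not use the dbos package and is not how DBOS implements its queues. The queued type, the use of 0 to mean "no priority assigned", and the Seq field are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"sort"
)

// queued is a hypothetical stand-in for an enqueued workflow (not a DBOS type).
type queued struct {
	ID       string
	Priority uint // 0 means "no priority assigned" in this sketch only
	Seq      int  // enqueue order; lower was enqueued earlier
}

// dequeueOrder returns a sorted copy: unassigned priority first, then lower
// priority numbers, with FIFO order among equal priorities.
func dequeueOrder(items []queued) []queued {
	out := append([]queued(nil), items...)
	sort.SliceStable(out, func(i, j int) bool {
		if out[i].Priority != out[j].Priority {
			return out[i].Priority < out[j].Priority
		}
		return out[i].Seq < out[j].Seq
	})
	return out
}

func main() {
	items := []queued{
		{"a", 5, 0}, {"b", 0, 1}, {"c", 1, 2}, {"d", 5, 3}, {"e", 0, 4},
	}
	for _, q := range dequeueOrder(items) {
		fmt.Print(q.ID, " ")
	}
	fmt.Println()
}
```

Here b and e (no priority) come first, then c (priority 1), then a and d (priority 5) in the order they were enqueued.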
To enqueue a workflow on a target application written in another language, pass a PortableWorkflowArgs as the input.
This automatically uses portable JSON serialization.
See Cross-Language Interaction for details.
Example syntax:
type ProcessInput struct {
	TaskID string
	Data   string
}

type ProcessOutput struct {
	Result string
	Status string
}

handle, err := dbos.Enqueue[ProcessInput, ProcessOutput](
	client,
	"process_queue",
	"ProcessWorkflow",
	ProcessInput{TaskID: "task-123", Data: "data"},
	dbos.WithEnqueueTimeout(30 * time.Minute),
	dbos.WithEnqueuePriority(5),
)
if err != nil {
	log.Fatal(err)
}

result, err := handle.GetResult()
if err != nil {
	log.Printf("Workflow failed: %v", err)
} else {
	log.Printf("Result: %+v", result)
}
RetrieveWorkflow
RetrieveWorkflow(workflowID string) (WorkflowHandle[any], error)
Retrieve the handle of the workflow whose ID is workflowID.
Similar to RetrieveWorkflow.
Parameters:
- workflowID: The identifier of the workflow whose handle to retrieve
Returns:
- The WorkflowHandle of the workflow whose ID is workflowID
Send
Send(destinationID string, message any, topic string) error
Sends a message to a specified workflow. Similar to Send.
Parameters:
- destinationID: The workflow to which to send the message
- message: The message to send. Must be serializable
- topic: A topic with which to associate the message. Messages are enqueued per-topic on the receiver
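As a sketch of how Send might be wrapped, the helper below sends one message to several workflows and collects failures. It depends only on a narrow local interface whose single method is copied from the Send signature above. The notifyAll helper, the fakeSender type, and the workflow IDs are hypothetical and exist only so the example runs without a database.

```go
package main

import (
	"errors"
	"fmt"
)

// sender is the one Client method this sketch needs.
type sender interface {
	Send(destinationID string, message any, topic string) error
}

// notifyAll sends the same message to every workflow ID on one topic and
// joins any failures instead of stopping at the first one.
func notifyAll(c sender, workflowIDs []string, message any, topic string) error {
	var errs []error
	for _, id := range workflowIDs {
		if err := c.Send(id, message, topic); err != nil {
			errs = append(errs, fmt.Errorf("send to %s: %w", id, err))
		}
	}
	return errors.Join(errs...) // nil when every send succeeded
}

// fakeSender is a hypothetical in-memory stand-in used only to exercise notifyAll.
type fakeSender struct {
	sent map[string][]any // keyed by destinationID + "/" + topic
	fail map[string]bool  // destination IDs that should return an error
}

func (f *fakeSender) Send(destinationID string, message any, topic string) error {
	if f.fail[destinationID] {
		return errors.New("simulated send failure")
	}
	key := destinationID + "/" + topic
	f.sent[key] = append(f.sent[key], message)
	return nil
}

func main() {
	f := &fakeSender{sent: map[string][]any{}, fail: map[string]bool{"wf-2": true}}
	err := notifyAll(f, []string{"wf-1", "wf-2", "wf-3"}, "approved", "reviews")
	fmt.Println("delivered:", len(f.sent), "error:", err)
}
```

Because sender declares only Send, a value implementing the Client interface shown earlier should be usable in place of the fake.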
GetEvent
GetEvent(targetWorkflowID, key string, timeout time.Duration) (any, error)
Retrieve the latest value of an event published by the workflow identified by targetWorkflowID to the key key.
If the event does not yet exist, wait for it to be published, returning an error if the wait times out.
Similar to GetEvent.
Parameters:
- targetWorkflowID: The identifier of the workflow whose events to retrieve
- key: The key of the event to retrieve
- timeout: A timeout duration. If the wait times out, return an error
Returns:
- The value of the event published by targetWorkflowID with name key, or an error if the wait times out
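Because GetEvent returns a value of type any, callers usually need a type assertion. The following is a minimal sketch of a typed wrapper, assuming only the GetEvent signature above. The eventAs helper and the fakeEvents type are hypothetical. The dynamic type a real client returns depends on how DBOS deserializes the stored value, which this page does not specify, so treat the assertion as something to verify against your own data.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// eventGetter is the one Client method this sketch needs.
type eventGetter interface {
	GetEvent(targetWorkflowID, key string, timeout time.Duration) (any, error)
}

// eventAs waits for an event and asserts it to T, reporting a type mismatch
// as an error rather than panicking.
func eventAs[T any](c eventGetter, workflowID, key string, timeout time.Duration) (T, error) {
	var zero T
	v, err := c.GetEvent(workflowID, key, timeout)
	if err != nil {
		return zero, fmt.Errorf("get event %q from %s: %w", key, workflowID, err)
	}
	typed, ok := v.(T)
	if !ok {
		return zero, fmt.Errorf("event %q has type %T, want %T", key, v, zero)
	}
	return typed, nil
}

// fakeEvents is a hypothetical in-memory stand-in keyed by workflowID + "/" + key.
type fakeEvents map[string]any

func (f fakeEvents) GetEvent(targetWorkflowID, key string, timeout time.Duration) (any, error) {
	v, ok := f[targetWorkflowID+"/"+key]
	if !ok {
		return nil, errors.New("simulated timeout waiting for event")
	}
	return v, nil
}

func main() {
	f := fakeEvents{"wf-1/status": "done"}
	s, err := eventAs[string](f, "wf-1", "status", 5*time.Second)
	fmt.Println(s, err)
	_, err = eventAs[int](f, "wf-1", "status", 5*time.Second)
	fmt.Println(err)
}
```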
Workflow Management Methods
ListWorkflows
ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error)
Retrieve a list of WorkflowStatus of all workflows matching specified criteria.
Similar to ListWorkflows.
Options:
Options are provided via ListWorkflowsOption functions. See ListWorkflows for available options.
The client ListWorkflows method does not include workflow inputs and outputs in its results.
GetWorkflowSteps
GetWorkflowSteps(workflowID string) ([]StepInfo, error)
List the steps of a given workflow. Returned entries do not include step outputs.
CancelWorkflow
CancelWorkflow(workflowID string) error
Cancel a workflow.
This sets its status to CANCELLED, removes it from its queue (if it is enqueued) and preempts its execution (interrupting it at the beginning of its next step).
Similar to CancelWorkflow.
CancelWorkflows
CancelWorkflows(workflowIDs []string) error
Cancel multiple workflows in a single database round-trip.
Missing or already-terminal IDs are silently skipped.
Similar to CancelWorkflows.
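When the ID list is very large, a caller may prefer to bound the size of each call. The sketch below splits the list into batches and issues one CancelWorkflows call per batch. Whether batching is needed at all is an assumption that depends on your workload, and the cancelInBatches helper, the batch size, and the fakeCanceller type are hypothetical.

```go
package main

import "fmt"

// canceller is the one Client method this sketch needs.
type canceller interface {
	CancelWorkflows(workflowIDs []string) error
}

// cancelInBatches cancels ids in groups of at most batchSize,
// making one CancelWorkflows call per group.
func cancelInBatches(c canceller, ids []string, batchSize int) error {
	if batchSize <= 0 {
		return fmt.Errorf("batchSize must be positive, got %d", batchSize)
	}
	for start := 0; start < len(ids); start += batchSize {
		end := start + batchSize
		if end > len(ids) {
			end = len(ids)
		}
		if err := c.CancelWorkflows(ids[start:end]); err != nil {
			return fmt.Errorf("cancel batch starting at index %d: %w", start, err)
		}
	}
	return nil
}

// fakeCanceller is a hypothetical stand-in that records the size of each call.
type fakeCanceller struct{ batches []int }

func (f *fakeCanceller) CancelWorkflows(workflowIDs []string) error {
	f.batches = append(f.batches, len(workflowIDs))
	return nil
}

func main() {
	f := &fakeCanceller{}
	err := cancelInBatches(f, []string{"a", "b", "c", "d", "e"}, 2)
	fmt.Println(f.batches, err)
}
```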
SetWorkflowDelay
SetWorkflowDelay(workflowID string, opts ...SetWorkflowDelayOption) error
Set or update the delay on a DELAYED workflow.
Provide exactly one of WithDelayDuration (relative) or WithDelayUntil (absolute).
Similar to SetWorkflowDelay.
ResumeWorkflow
ResumeWorkflow(workflowID string, opts ...ResumeWorkflowOption) (WorkflowHandle[any], error)
Resume a workflow.
This immediately starts it from its last completed step.
You can use this to resume workflows that are cancelled or have exceeded their maximum recovery attempts.
You can also use this to start an enqueued workflow immediately, bypassing its queue.
Pass WithResumeQueue to re-enqueue the resumed workflow on a named queue instead of starting it immediately.
Similar to ResumeWorkflow.
ResumeWorkflows
ResumeWorkflows(workflowIDs []string, opts ...ResumeWorkflowOption) ([]WorkflowHandle[any], error)
Resume multiple workflows in a single database round-trip.
Accepts the same options as ResumeWorkflow.
Similar to ResumeWorkflows.
ForkWorkflow
ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], error)
Start a new execution of a workflow from a specific step.
Set QueueName on the input to enqueue the forked workflow on a named queue instead of starting it immediately.
Similar to ForkWorkflow.
Debouncer
NewDebouncerClient
func NewDebouncerClient[P any, R any](workflowName string, client Client, opts ...DebouncerOption) *DebouncerClient[P, R]
Create a new debouncer client for use from outside a DBOS application.
Similar to NewDebouncer but uses a Client instead of a DBOSContext and takes a workflow name string instead of a function reference.
Parameters:
- workflowName: The name of the workflow to debounce.
- client: The DBOS client to use for operations.
- opts: Optional configuration, documented below.
WithDebouncerTimeout
func WithDebouncerTimeout(timeout time.Duration) DebouncerOption
Set the maximum time before starting the workflow, measured from the first debounce call for a given key. If the timeout is zero (the default), there is no maximum time limit and calling the workflow can be pushed back indefinitely.
WithDebouncerConfigName
func WithDebouncerConfigName(configName string) DebouncerOption
Target the workflow registration bound to the configured instance with the given config name (see WithInstance).
Required when the debounced workflow is a method of a configured instance.
Use with NewDebouncerClient, where the instance object itself is not available.
dc := dbos.NewDebouncerClient[string, string]("Send", client,
	dbos.WithDebouncerConfigName("slack"))
DebouncerClient.Debounce
func (dc *DebouncerClient[P, R]) Debounce(key string, delay time.Duration, input P, opts ...WorkflowOption) (WorkflowHandle[R], error)
Debounce a workflow invocation from outside a DBOS application.
Behaves the same as Debouncer.Debounce but does not require a DBOSContext.
Parameters:
- key: A unique key to group debounce calls.
- delay: Time by which to delay workflow execution.
- input: Input parameters to pass to the workflow.
- opts: Optional workflow options.
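The interaction between the per-call delay and WithDebouncerTimeout can be worked through with a small timing model. This is one reading of the descriptions above, not the library's implementation: it assumes every call for a key uses the same delay, that each call before the workflow starts pushes the start to that call's time plus the delay, and that a non-zero timeout caps the start at the first call plus the timeout. The startOffset function is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// startOffset returns when the debounced workflow would start under the
// assumptions stated above. calls are the times of each Debounce call for one
// key, in ascending order, expressed as offsets from an arbitrary origin.
func startOffset(delay, timeout time.Duration, calls ...time.Duration) time.Duration {
	if len(calls) == 0 {
		return 0
	}
	// limit applies the timeout cap, measured from the first call.
	limit := func(t time.Duration) time.Duration {
		if timeout > 0 && t > calls[0]+timeout {
			return calls[0] + timeout
		}
		return t
	}
	start := limit(calls[0] + delay)
	for _, call := range calls[1:] {
		if call >= start {
			break // the workflow has already started; this call belongs to a new window
		}
		start = limit(call + delay)
	}
	return start
}

func main() {
	s := time.Second
	fmt.Println(startOffset(5*s, 0, 0, 2*s, 4*s))   // 9s: no timeout, last call at 4s plus 5s delay
	fmt.Println(startOffset(5*s, 6*s, 0, 2*s, 4*s)) // 6s: capped by the 6s timeout
	fmt.Println(startOffset(5*s, 0, 0, 2*s, 10*s))  // 7s: the call at 10s arrives after the start
}
```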
Schedule Management
Client exposes the same workflow scheduling operations as the DBOS context, with the important difference that workflows are identified by name (string) rather than by function reference.
See the scheduled workflows tutorial for an overview.
ClientScheduleInput
type ClientScheduleInput struct {
	ScheduleName      string // Required: unique schedule name
	WorkflowName      string // Required: target workflow name (FQN or custom name)
	WorkflowClassName string // Optional: class/namespace name (required for Python/TS/Java targets that dispatch by class)
	Schedule          string // Required: cron expression
	Context           any    // Optional: user-defined context (JSON-serialized)
	AutomaticBackfill bool   // Optional
	CronTimezone      string // Optional: IANA timezone name
	QueueName         string // Optional: target queue
}
Client.CreateSchedule
CreateSchedule(input ClientScheduleInput) error
Create a new schedule. Similar to CreateSchedule.
Example:
err := client.CreateSchedule(dbos.ClientScheduleInput{
	ScheduleName: "my-schedule",
	WorkflowName: "myPeriodicTask",
	Schedule:     "*/5 * * * *",
	Context:      "my context",
})
Client.ApplySchedules
ApplySchedules(schedules []ClientScheduleInput) error
Atomically create or replace the given schedules. Similar to ApplySchedules.
Client.GetSchedule
GetSchedule(scheduleName string) (*WorkflowSchedule, error)
Retrieve a WorkflowSchedule by name. Returns (nil, nil) if not found.
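Since a missing schedule is reported as (nil, nil) rather than as an error, callers need a nil check before dereferencing the result. The sketch below shows one way to turn that case into an explicit error. To stay runnable without the dbos package it uses stand-in types: workflowSchedule, scheduleGetter, requireSchedule, and fakeSchedules are all hypothetical, and with the real package you would use Client and WorkflowSchedule instead.

```go
package main

import (
	"errors"
	"fmt"
)

// workflowSchedule is a hypothetical stand-in for the WorkflowSchedule type.
type workflowSchedule struct{ ScheduleName string }

// scheduleGetter mirrors the shape of GetSchedule using the stand-in type.
type scheduleGetter interface {
	GetSchedule(scheduleName string) (*workflowSchedule, error)
}

var errScheduleNotFound = errors.New("schedule not found")

// requireSchedule converts the (nil, nil) "not found" result into an error
// so callers cannot dereference a nil schedule by accident.
func requireSchedule(c scheduleGetter, name string) (*workflowSchedule, error) {
	s, err := c.GetSchedule(name)
	if err != nil {
		return nil, fmt.Errorf("get schedule %q: %w", name, err)
	}
	if s == nil {
		return nil, fmt.Errorf("get schedule %q: %w", name, errScheduleNotFound)
	}
	return s, nil
}

// fakeSchedules is a hypothetical in-memory stand-in.
type fakeSchedules map[string]*workflowSchedule

func (f fakeSchedules) GetSchedule(scheduleName string) (*workflowSchedule, error) {
	return f[scheduleName], nil // a missing key yields (nil, nil)
}

func main() {
	f := fakeSchedules{"my-schedule": {ScheduleName: "my-schedule"}}
	s, err := requireSchedule(f, "my-schedule")
	fmt.Println(s.ScheduleName, err)
	_, err = requireSchedule(f, "missing")
	fmt.Println(errors.Is(err, errScheduleNotFound))
}
```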
Client.ListSchedules
ListSchedules(opts ...ListSchedulesOption) ([]WorkflowSchedule, error)
List schedules, optionally filtered. Accepts the same options as ListSchedules.
Client.PauseSchedule
PauseSchedule(scheduleName string) error
Pause a schedule so it stops firing.
Client.ResumeSchedule
ResumeSchedule(scheduleName string) error
Resume a paused schedule.
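PauseSchedule and ResumeSchedule are often used as a pair around maintenance work. The following sketch wraps a function call so the schedule is resumed even if the work fails. It relies only on the two signatures above. The withSchedulePaused helper and the fakeScheduler type are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// schedulePauser lists the two Client methods this sketch needs.
type schedulePauser interface {
	PauseSchedule(scheduleName string) error
	ResumeSchedule(scheduleName string) error
}

// withSchedulePaused pauses the schedule, runs fn, and resumes the schedule
// afterwards even if fn returns an error. A resume failure is joined with
// fn's error.
func withSchedulePaused(c schedulePauser, name string, fn func() error) (err error) {
	if perr := c.PauseSchedule(name); perr != nil {
		return fmt.Errorf("pause %s: %w", name, perr)
	}
	defer func() {
		if rerr := c.ResumeSchedule(name); rerr != nil {
			err = errors.Join(err, fmt.Errorf("resume %s: %w", name, rerr))
		}
	}()
	return fn()
}

// fakeScheduler is a hypothetical stand-in that records the calls it receives.
type fakeScheduler struct{ calls []string }

func (f *fakeScheduler) PauseSchedule(scheduleName string) error {
	f.calls = append(f.calls, "pause:"+scheduleName)
	return nil
}

func (f *fakeScheduler) ResumeSchedule(scheduleName string) error {
	f.calls = append(f.calls, "resume:"+scheduleName)
	return nil
}

func main() {
	f := &fakeScheduler{}
	err := withSchedulePaused(f, "my-schedule", func() error {
		return errors.New("maintenance failed")
	})
	fmt.Println(f.calls, err)
}
```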
Client.DeleteSchedule
DeleteSchedule(scheduleName string) error
Delete a schedule.
Client.BackfillSchedule
BackfillSchedule(scheduleName string, start, end time.Time) ([]string, error)
Backfill missed executions for the range [start, end], returning the IDs of the enqueued workflows. Similar to BackfillSchedule.
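As a sketch of a common use, the helper below backfills a trailing window ending at a given time by computing start and end for BackfillSchedule. It relies only on the signature above. The backfillLast helper, the fakeBackfiller type, the exampleNow value, and the returned workflow ID are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// backfiller is the one Client method this sketch needs.
type backfiller interface {
	BackfillSchedule(scheduleName string, start, end time.Time) ([]string, error)
}

// backfillLast backfills the range [now-window, now] for the named schedule.
func backfillLast(c backfiller, name string, window time.Duration, now time.Time) ([]string, error) {
	if window <= 0 {
		return nil, fmt.Errorf("window must be positive, got %s", window)
	}
	return c.BackfillSchedule(name, now.Add(-window), now)
}

// fakeBackfiller is a hypothetical stand-in that records the requested range.
type fakeBackfiller struct{ start, end time.Time }

func (f *fakeBackfiller) BackfillSchedule(scheduleName string, start, end time.Time) ([]string, error) {
	f.start, f.end = start, end
	return []string{"hypothetical-workflow-id"}, nil
}

// exampleNow is an arbitrary fixed time so the example is deterministic.
var exampleNow = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

func main() {
	f := &fakeBackfiller{}
	ids, err := backfillLast(f, "my-schedule", 15*time.Minute, exampleNow)
	fmt.Println(ids, err)
	fmt.Println(f.start.Format(time.RFC3339), f.end.Format(time.RFC3339))
}
```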
Client.TriggerSchedule
TriggerSchedule(scheduleName string) (WorkflowHandle[any], error)
Trigger a schedule to fire immediately, returning a handle for the enqueued workflow.