Queues

Workflow queues allow you to ensure that workflow functions will be run, without starting them immediately. Queues are useful for controlling the number of workflows run in parallel, or the rate at which they are started.

All queues should be created before DBOS is launched.

NewWorkflowQueue

func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...queueOption) WorkflowQueue

NewWorkflowQueue creates a new workflow queue with the specified name and configuration options. Queues must be created before DBOS is launched. You can enqueue a workflow using the WithQueue parameter of RunWorkflow.

Parameters:

  • dbosCtx: The DBOSContext.
  • name: The name of the queue. Must be unique among all queues in the application.
  • options: Functional options for the queue, documented below.

Example Syntax:

queue := dbos.NewWorkflowQueue(ctx, "email-queue",
	dbos.WithWorkerConcurrency(5),
	dbos.WithRateLimiter(&dbos.RateLimiter{
		Limit:  100,
		Period: 60 * time.Second, // 100 workflows per minute
	}),
	dbos.WithPriorityEnabled(),
)

// Enqueue workflows to this queue:
handle, err := dbos.RunWorkflow(ctx, SendEmailWorkflow, emailData, dbos.WithQueue("email-queue"))

WithWorkerConcurrency

func WithWorkerConcurrency(concurrency int) queueOption

Set the maximum number of workflows from this queue that may run concurrently within a single DBOS process.
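To make the effect of a per-process cap concrete, here is a minimal standalone sketch of the idea using a buffered channel as a counting semaphore. It is only a model of the behavior described above, not how DBOS enforces the limit internally; `runLimited` is a hypothetical helper and it assumes `limit` is positive.

```go
package main

import "sync"

// runLimited is a hypothetical helper (not part of DBOS). It runs every task
// while letting at most `limit` of them execute at the same time, and returns
// the highest number of tasks that were ever running concurrently.
// Assumption: limit > 0.
func runLimited(limit int, tasks []func()) int {
	sem := make(chan struct{}, limit) // buffered channel used as a counting semaphore
	var (
		mu      sync.Mutex
		running int
		peak    int
		wg      sync.WaitGroup
	)
	for _, task := range tasks {
		wg.Add(1)
		sem <- struct{}{} // blocks while `limit` tasks already hold a slot
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot once the task is done

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			task()

			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	return peak
}
```

With `limit` set to 5, the returned peak never exceeds 5 regardless of how many tasks are submitted, which mirrors the guarantee `WithWorkerConcurrency(5)` describes for a single process.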

WithGlobalConcurrency

func WithGlobalConcurrency(concurrency int) queueOption

Set the maximum number of workflows from this queue that may run concurrently. Defaults to 0 (no limit). This concurrency limit is global across all DBOS processes using this queue.
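When a queue has both a worker limit and a global limit, one way to reason about them is that a process can only start as many workflows as the tighter of the two limits allows. The arithmetic can be sketched as below. `availableSlots` is a hypothetical helper, not a DBOS function; treating 0 as "no limit" follows the text for the global limit and is an assumption for the worker limit.

```go
package main

import "math"

// availableSlots is a hypothetical helper (not part of DBOS) that models how
// many more workflows one process could start from a queue.
//   workerLimit   - per-process cap; 0 is ASSUMED to mean "no limit"
//   globalLimit   - cap across all processes; 0 means "no limit"
//   localRunning  - workflows from this queue running in this process
//   globalRunning - workflows from this queue running in all processes
func availableSlots(workerLimit, globalLimit, localRunning, globalRunning int) int {
	slots := math.MaxInt // start from "unlimited"
	if workerLimit > 0 && workerLimit-localRunning < slots {
		slots = workerLimit - localRunning
	}
	if globalLimit > 0 && globalLimit-globalRunning < slots {
		slots = globalLimit - globalRunning
	}
	if slots < 0 {
		return 0 // already at or over a limit
	}
	return slots
}
```

For example, with a worker limit of 5, a global limit of 8, 2 workflows running locally, and 7 running globally, `availableSlots(5, 8, 2, 7)` returns 1: the process has 3 local slots free, but only 1 global slot remains.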

WithMaxTasksPerIteration

func WithMaxTasksPerIteration(maxTasks int) queueOption

Set the maximum number of workflows that can be dequeued from this queue in a single iteration. This caps the batch size of each queue-processing pass.
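The batching behavior can be pictured with a small standalone sketch. `dequeueBatch` is a hypothetical helper that operates on a plain slice of workflow IDs; it illustrates the cap only and makes no claim about how DBOS stores or selects queued workflows.

```go
package main

// dequeueBatch is a hypothetical helper (not part of DBOS). It splits the
// pending workflow IDs into the batch taken in this iteration (at most
// maxTasks items) and the remainder that stays queued for later iterations.
// A negative maxTasks is treated as 0 here, which is an assumption of this
// sketch.
func dequeueBatch(pending []string, maxTasks int) (batch, rest []string) {
	if maxTasks < 0 {
		maxTasks = 0
	}
	if maxTasks > len(pending) {
		maxTasks = len(pending) // fewer items are waiting than the cap allows
	}
	return pending[:maxTasks], pending[maxTasks:]
}
```

With five pending workflows and `maxTasks` set to 2, the first iteration takes two workflows and leaves three, so draining the queue takes three iterations.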

WithPriorityEnabled

func WithPriorityEnabled() queueOption

Enable setting priority for workflows on this queue.

WithRateLimiter

func WithRateLimiter(limiter *RateLimiter) queueOption

type RateLimiter struct {
	Limit  int           // Maximum number of workflows to start within the period
	Period time.Duration // Time period for the rate limit
}

Set a limit on the maximum number of workflows from this queue that may be started in a given period.
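As a rough illustration of what `Limit` and `Period` express together, the sketch below counts how many more workflows could start right now given how long ago earlier ones started. The source does not say how DBOS measures the period, so the sliding window used here is an assumption, and `remainingStarts` is a hypothetical helper. The `RateLimiter` type is redeclared locally only so the sketch compiles on its own.

```go
package main

import "time"

// RateLimiter mirrors the struct documented above so this sketch is
// self-contained; it is a local copy, not the DBOS type.
type RateLimiter struct {
	Limit  int           // maximum number of workflows to start within the period
	Period time.Duration // time period for the rate limit
}

// remainingStarts is a hypothetical helper (not part of DBOS). Each entry in
// startAges says how long ago an earlier workflow was started. Starts younger
// than rl.Period count against the limit (ASSUMED sliding window); older ones
// are ignored. The result is how many more workflows could start now.
func remainingStarts(rl RateLimiter, startAges ...time.Duration) int {
	used := 0
	for _, age := range startAges {
		if age >= 0 && age < rl.Period {
			used++
		}
	}
	if used >= rl.Limit {
		return 0
	}
	return rl.Limit - used
}
```

Under this model, a limiter with `Limit: 3` and `Period: 60 * time.Second` that has seen starts 30 and 15 seconds ago, plus one 2 minutes ago, still allows one more start: only the two recent starts fall inside the window.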