Kafka & SQS Integration

In this guide, you'll learn how to use DBOS workflows to process Kafka or Simple Queue Service (SQS) messages with exactly-once semantics.

Installation

First, install an event receiver library:

npm i @dbos-inc/kafkajs-receive

Creating a Receiver

The DBOS event receiver classes connect their underlying client libraries to workflows. First, construct a DBOS event receiver instance, which requires an underlying client library object or configuration:

import { Kafka, KafkaConfig } from 'kafkajs';
import { KafkaReceiver } from '@dbos-inc/kafkajs-receive';

const kafkaConfig: KafkaConfig = { brokers: ['localhost:9092'] };
const kafkaReceiver = new KafkaReceiver(kafkaConfig);

The KafkaReceiver constructor takes a KafkaJS configuration as its argument.

Registering Workflow Functions

Once a receiver object is created, it can be used to connect specific incoming messages to DBOS workflow functions.

The KafkaJS receiver can be used in two ways. The @consumer decorator connects a static class workflow method to the receiver:

@kafkaReceiver.consumer('my-topic')
@DBOS.workflow()
static async stringTopic(topic: string, partition: number, message: KafkaMessage) {
//...
}

Alternatively, the registerConsumer function on the receiver will connect a workflow function to the receiver.

async function myWorkflowFunction(topic: string, partition: number, message: KafkaMessage) { ... }
kafkaReceiver.registerConsumer(DBOS.registerWorkflow(myWorkflowFunction), 'my-topic');

Note that the function signatures should match those above, as these are the arguments provided by the event receiver.

Deduplicating Messages

DBOS event receivers use a deterministic workflow ID, computed from each message, to ensure that every message is processed exactly once.

The message topic, partition, and offset uniquely identify a Kafka message, and are used to ensure that only one DBOS workflow is executed per message.

Rate-Limiting Message Processing

By default, event receivers start new workflows immediately upon message receipt. If message processing should be rate-limited, DBOS queues can be used. Generally, the queue name is provided as a parameter; see configuration for details.
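For example, message processing can be routed through a DBOS workflow queue by passing the queue's name in the consumer options. A sketch, assuming a queue named kafka_queue with an illustrative concurrency limit (see the DBOS queue documentation for available options):

```typescript
import { DBOS, WorkflowQueue } from '@dbos-inc/dbos-sdk';
import { KafkaReceiver } from '@dbos-inc/kafkajs-receive';
import { KafkaMessage } from 'kafkajs';

// Hypothetical queue: at most 5 message-processing workflows run concurrently.
const queue = new WorkflowQueue('kafka_queue', { concurrency: 5 });

const kafkaReceiver = new KafkaReceiver({ brokers: ['localhost:9092'] });

class MessageProcessor {
  // Messages on 'my-topic' are enqueued on kafka_queue instead of
  // starting a workflow immediately on receipt.
  @kafkaReceiver.consumer('my-topic', { queueName: queue.name })
  @DBOS.workflow()
  static async process(topic: string, partition: number, message: KafkaMessage) {
    // ...
  }
}
```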

Sending Messages

The DBOS libraries for Kafka and SQS do not include code for sending messages. Messages should be sent using the underlying messaging library, but wrapped in DBOS steps.

// Setup ...
const kafka = new Kafka(kafkaConfig);
const producer = kafka.producer();
await producer.connect();

// ... produce messages during workflow processing, wrapped in a step
await DBOS.runStep(async () => {
  await producer.send({ topic, messages: [{ value: message }] });
});

// ... shutdown
await producer.disconnect();

Configuration Reference

DBOS receivers consume Kafka messages from topics and initiate workflows. The topic(s) may be specified as a string, a regular expression, or an array of strings and regular expressions.

export type ConsumerTopics = string | RegExp | Array<string | RegExp>;

Options for the decorator and registerConsumer are the same:

  • queueName: If specified, workflows for processing messages will be enqueued
  • config: Configuration, as specified by the underlying kafka library
registerConsumer<This, Return>(
  func: (this: This, ...args: KafkaArgs) => Promise<Return>,
  topics: ConsumerTopics,
  options: {
    queueName?: string;
    config?: ConsumerConfig;
  } = {},
);

consumer(
  topics: ConsumerTopics,
  options: {
    queueName?: string;
    config?: ConsumerConfig;
  },
);