Kafka & SQS Integration
In this guide, you'll learn how to use DBOS workflows to process Kafka or Simple Queue Service (SQS) messages with exactly-once semantics.
Installation
First, install an event receiver library:
- KafkaJS
- Confluent Kafka
- SQS
npm i @dbos-inc/kafkajs-receive
npm i @dbos-inc/confluent-kafka-receive
npm i @dbos-inc/sqs-receive
Creating a Receiver
The DBOS event receiver classes connect their underlying client libraries to workflows. First, construct a DBOS event receiver instance, which requires an underlying library object or configuration:
- KafkaJS
- Confluent Kafka
- SQS
import { Kafka } from 'kafkajs';
import { KafkaReceiver } from '@dbos-inc/kafkajs-receive';
const kafkaReceiver = new KafkaReceiver(kafkaConfig);
The KafkaReceiver constructor takes a KafkaJS configuration as its argument.
import { ConfluentKafkaReceiver } from '@dbos-inc/confluent-kafka-receive';
import { KafkaJS as ConfluentKafkaJS } from '@confluentinc/kafka-javascript';
const kafkaReceiver = new ConfluentKafkaReceiver(kafkaConfig);
The ConfluentKafkaReceiver constructor takes a Confluent Kafka configuration as its argument.
import { SQSClient } from '@aws-sdk/client-sqs';
import { SQSReceiver } from '@dbos-inc/sqs-receive';
function createSQS() {
return new SQSClient({ /*configuration per SQS library...*/ });
}
const sqsReceiver = new SQSReceiver({
client: createSQS,
});
The SQSReceiver constructor takes either an SQSClient instance, or a function that provides one. See the configuration reference below.
Registering Workflow Functions
Once a receiver object is created, it can be used to connect specific incoming messages to DBOS workflow functions:
- KafkaJS
- Confluent Kafka
- SQS
The KafkaJS receiver can be used in two ways. The @consumer decorator connects a static class workflow method to the receiver:
@kafkaReceiver.consumer('my-topic')
@DBOS.workflow()
static async stringTopic(topic: string, partition: number, message: KafkaMessage) {
//...
}
Alternatively, the registerConsumer function on the receiver will connect a workflow function to the receiver:
async function myWorkflowFunction(topic: string, partition: number, message: KafkaMessage) { ... }
kafkaReceiver.registerConsumer(DBOS.registerWorkflow(myWorkflowFunction), 'my-topic');
The ConfluentKafkaReceiver instance can be used in two ways. The @consumer decorator connects a static class workflow method to the receiver:
@kafkaReceiver.consumer('my-topic')
@DBOS.workflow()
static async stringTopic(topic: string, partition: number, message: ConfluentKafkaJS.Message) {
//...
}
Alternatively, the registerConsumer function on the receiver will connect a workflow function to the receiver:
async function myWorkflowFunction(topic: string, partition: number, message: ConfluentKafkaJS.Message) { ... }
kafkaReceiver.registerConsumer(DBOS.registerWorkflow(myWorkflowFunction), 'my-topic');
The SQSReceiver instance provides a decorator for connecting static class workflow methods to message receipt:
@sqsReceiver.messageConsumer({ queueUrl: process.env['SQS_QUEUE_URL']})
@DBOS.workflow()
static async recvMessage(msg: Message) {
//...
}
Note that the messageConsumer configuration can override all configuration provided to the receiver instance, including the client.
The workflow function signatures should match those shown above, as these are the arguments provided by the event receivers.
Deduplicating Messages
DBOS event receivers assign each message a workflow ID to ensure that it is processed exactly once. This ID is computed from the message.
- KafkaJS
- Confluent Kafka
- SQS
The message topic, partition, and offset uniquely identify a Kafka message, and are used to ensure that only one DBOS workflow is executed per message.
The message topic, partition, and offset uniquely identify a Kafka message, and are used to ensure that only one DBOS workflow is executed per message.
AWS SQS messages have unique IDs assigned, which are used by default to create workflow IDs. However, SQS messages may be sent more than once by the sender, so an option is provided to generate IDs from the message contents.
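For example, when a producer may re-send the same logical message under a new SQS message ID, the deduplication key can be derived from the message contents instead. A minimal sketch, assuming a content hash is an acceptable key (the workflowKeyFromBody helper is hypothetical, not part of the DBOS library):

```typescript
import { createHash } from 'crypto';

// Hypothetical helper: hash the message body so that identical contents
// always map to the same workflow ID, even across re-sends.
function workflowKeyFromBody(body: string): string {
  return createHash('sha256').update(body).digest('hex');
}

// Identical bodies yield identical keys; different bodies yield different keys.
const a = workflowKeyFromBody('{"orderId": 42}');
const b = workflowKeyFromBody('{"orderId": 42}');
const c = workflowKeyFromBody('{"orderId": 43}');
```

Such a helper would be supplied to the receiver via its getWorkflowKey option, e.g. getWorkflowKey: (m) => workflowKeyFromBody(m.Body ?? '').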
Rate-Limiting Message Processing
By default, event receivers start new workflows immediately upon message receipt. If message processing should be rate-limited, DBOS queues can be used. Generally, the queue name is provided as a parameter; see configuration for details.
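As a sketch of how this fits together, assuming the DBOS SDK's WorkflowQueue and its rate-limit options (the queue name and limits below are illustrative):

```typescript
import { WorkflowQueue } from '@dbos-inc/dbos-sdk';

// Illustrative queue: process at most 10 messages per 60-second period.
const messageQueue = new WorkflowQueue('kafka-message-queue', {
  rateLimit: { limitPerPeriod: 10, periodSec: 60 },
});

// Passing the queue name makes the receiver enqueue a workflow per message
// rather than starting it immediately:
// kafkaReceiver.registerConsumer(myWorkflow, 'my-topic', {
//   queueName: 'kafka-message-queue',
// });
```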
Sending Messages
The DBOS libraries for Kafka and SQS do not include code for sending messages. Messages should be sent using the underlying messaging library, wrapped in DBOS steps so that sends are not repeated if a workflow is recovered.
- KafkaJS
- Confluent Kafka
- SQS
// Setup ...
const kafka = new Kafka(kafkaConfig);
const producer = kafka.producer();
await producer.connect();
// ... produce messages during workflow processing
await DBOS.runStep(async () => {
await producer.send({ topic, messages: [{ value: message }] });
});
// ... shutdown
await producer?.disconnect();
// Setup ...
const kafka = new Kafka(kafkaConfig);
const producer = kafka.producer();
await producer.connect();
// ... produce messages during workflow processing
await DBOS.runStep(async () => {
await producer.send({ topic, messages: [{ value: message }] });
});
// ... shutdown
await producer?.disconnect();
// Setup ...
const sqs = new SQSClient(sqsConfig);
// ... produce messages during workflow processing
await DBOS.runStep(async () => {
await sqs.send(new SendMessageCommand(message));
});
// SQS client - no cleanup
Configuration Reference
- KafkaJS
- Confluent Kafka
- SQS
DBOS receivers consume Kafka messages from topics and initiate workflows. The topic(s) may be specified as a string, regular expression, or an array of strings and regular expressions.
export type ConsumerTopics = string | RegExp | Array<string | RegExp>;
Options for the decorator and registerConsumer are the same:
- queueName: If specified, workflows for processing messages will be enqueued
- config: Configuration, as specified by the underlying Kafka library
registerConsumer<This, Return>(
func: (this: This, ...args: KafkaArgs) => Promise<Return>,
topics: ConsumerTopics,
options: {
queueName?: string;
config?: ConsumerConfig;
} = {},
);
consumer(
topics: ConsumerTopics,
options: {
queueName?: string;
config?: ConsumerConfig
}
);
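For instance, topics and options might be combined as in this sketch (the topic names, queue name, and group ID are illustrative):

```typescript
// Subscribe to one literal topic plus any topic matching a pattern,
// enqueueing each message's workflow on a DBOS queue.
kafkaReceiver.registerConsumer(
  DBOS.registerWorkflow(myWorkflowFunction),
  ['orders', /^audit-.+/],
  {
    queueName: 'kafka-processing',
    config: { groupId: 'order-processors' }, // KafkaJS ConsumerConfig
  },
);
```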
DBOS receivers consume Kafka messages from topics and initiate workflows. The topic(s) may be specified as a string, regular expression, or an array of strings and regular expressions.
export type ConsumerTopics = string | RegExp | Array<string | RegExp>;
Options for the decorator and registerConsumer are the same:
- queueName: If specified, workflows for processing messages will be enqueued
- config: Configuration, as specified by the underlying Kafka library
registerConsumer<This, Return>(
func: (this: This, ...args: KafkaArgs) => Promise<Return>,
topics: ConsumerTopics,
options: {
queueName?: string;
config?: KafkaJS.ConsumerConstructorConfig;
} = {},
)
consumer(
topics: ConsumerTopics,
options: {
queueName?: string;
config?: KafkaJS.ConsumerConstructorConfig
}
);
SQS message receipt can be configured at the receiver, class, or method level, with method-level configuration items overriding the class- or receiver-level defaults.
Configuration items are:
- client: Fully configured SQS client, or a function to get it
- workflowQueueName: If specified, workflows for processing messages will be enqueued to the named queue
- queueUrl: SQS queue URL (or part) for receiving messages
- getWorkflowKey: Optional function to calculate a workflow key from a message; if not specified, the message ID will be used
interface SQSConfig {
client?: SQSClient | (() => SQSClient);
queueUrl?: string;
getWorkflowKey?: (m: Message) => string;
workflowQueueName?: string;
}
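Putting these items together, a fully configured receiver might look like the following sketch (the region, environment variable, and queue name are illustrative):

```typescript
const sqsReceiver = new SQSReceiver({
  // Construct the client lazily via a factory function.
  client: () => new SQSClient({ region: 'us-east-1' }),
  queueUrl: process.env['SQS_QUEUE_URL'],
  // Enqueue message-processing workflows instead of running them immediately.
  workflowQueueName: 'sqs-processing',
  // Deduplicate by message contents rather than the SQS-assigned message ID.
  getWorkflowKey: (m) => m.Body ?? '',
});
```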