Integrating with Kafka
In this guide, you'll learn how to use DBOS workflows to process Kafka messages exactly-once.
DBOS supports two popular Kafka clients: KafkaJS and Confluent Kafka.
KafkaJS Integration Tutorial
First, install the DBOS integration for KafkaJS in your application:
npm install @dbos-inc/dbos-kafkajs
Then, define your workflow. It must take in the Kafka topic, partition, and message as inputs:
import { DBOS } from '@dbos-inc/dbos-sdk';
import { KafkaMessage } from "kafkajs";

export class KafkaExample {
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: KafkaMessage) {
    DBOS.logger.info(`Message received: ${message.value?.toString()}`);
  }
}
Then, annotate your method with a @KafkaConsume decorator specifying which topic to consume from. Additionally, annotate your class with a @Kafka decorator defining which brokers to connect to.
DBOS invokes your method exactly-once for each message sent to the topic.
import { DBOS } from "@dbos-inc/dbos-sdk";
import { KafkaConfig, KafkaMessage } from "kafkajs";
import { Kafka, KafkaConsume } from "@dbos-inc/dbos-kafkajs";

const kafkaConfig: KafkaConfig = {
  brokers: ['localhost:9092'],
};

@Kafka(kafkaConfig)
export class KafkaExample {
  @KafkaConsume("example-topic")
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: KafkaMessage) {
    DBOS.logger.info(`Message received: ${message.value?.toString()}`);
  }
}
If you need more control, you can pass detailed configurations to both the @Kafka and @KafkaConsume decorators. The @Kafka decorator takes in a KafkaJS configuration object used to configure Kafka for all methods in its class. The @KafkaConsume decorator takes in a KafkaJS consumer configuration as an optional second argument.
For example, you can specify a custom consumer group ID:
@KafkaConsume("example-topic", { groupId: "custom-group-id" })
@DBOS.workflow()
static async kafkaWorkflow(topic: string, partition: number, message: KafkaMessage) {
  DBOS.logger.info(`Message received: ${message.value?.toString()}`);
}
Under the hood, DBOS constructs an idempotency key for each Kafka message from its topic, partition, and offset and passes it into your workflow. This combination is guaranteed to be unique within a Kafka cluster. Thus, even if a message is delivered multiple times (e.g., due to transient network failures or application interruptions), your workflow processes it exactly once.
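For illustration only, you can think of the derived key as a string like the one sketched below. The exact format DBOS uses internally is an implementation detail, not a public API:

// Conceptual sketch: why (topic, partition, offset) uniquely identifies a message.
// The real key format used by DBOS is internal and may differ.
function messageIdempotencyKey(topic: string, partition: number, offset: string): string {
  return `${topic}-${partition}-${offset}`;
}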
KafkaJS Integration Reference
@Kafka
@Kafka(kafkaConfig: KafkaConfig)
Class-level decorator defining a Kafka configuration to use in all class methods. Takes in a KafkaJS configuration object.
@KafkaConsume
@KafkaConsume(topic: string | RegExp | Array<string | RegExp>, consumerConfig?: ConsumerConfig, queueName?: string)
Runs a workflow or transaction exactly-once for each message received on the specified topic(s).
Takes in a Kafka topic or list of Kafka topics (required), a KafkaJS consumer configuration (optional), and a queue name (optional; see Concurrency and Rate Limiting below).
Requires the class to be decorated with @Kafka.
The decorated method must take as input a Kafka topic, partition, and message as in the example below:
import { DBOS } from "@dbos-inc/dbos-sdk";
import { KafkaConfig, KafkaMessage } from "kafkajs";
import { Kafka, KafkaConsume } from "@dbos-inc/dbos-kafkajs";

const kafkaConfig: KafkaConfig = {
  brokers: ['localhost:9092'],
};

@Kafka(kafkaConfig)
class KafkaExample {
  @KafkaConsume("example-topic")
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: KafkaMessage) {
    // This workflow executes exactly once for each message sent to "example-topic".
    // All methods annotated with Kafka decorators must take in the topic, partition, and message as inputs, just like this method.
  }
}
Concurrency and Rate Limiting
By default, @KafkaConsume workflows are started immediately upon receiving Kafka messages. If queueName is provided to the @KafkaConsume decorator, then the workflows are enqueued in a workflow queue and subject to its rate limits.
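For example, here is a minimal sketch of pairing @KafkaConsume with a rate-limited queue. The queue name and limits are illustrative choices, not defaults:

import { DBOS, WorkflowQueue } from "@dbos-inc/dbos-sdk";
import { Kafka, KafkaConsume } from "@dbos-inc/dbos-kafkajs";
import { KafkaConfig, KafkaMessage } from "kafkajs";

// Admit at most 50 workflow starts per 30-second period.
const kafkaQueue = new WorkflowQueue("kafka-queue", { rateLimit: { limitPerPeriod: 50, periodSec: 30 } });

const kafkaConfig: KafkaConfig = {
  brokers: ['localhost:9092'],
};

@Kafka(kafkaConfig)
class QueuedKafkaExample {
  // Passing the queue's name as the third argument enqueues each message's
  // workflow on "kafka-queue" instead of starting it immediately.
  @KafkaConsume("example-topic", undefined, "kafka-queue")
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: KafkaMessage) {
    DBOS.logger.info(`Message received: ${message.value?.toString()}`);
  }
}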
Confluent Kafka Integration Tutorial
First, install the DBOS integration for Confluent's JavaScript Client for Apache Kafka in your application:
npm install @dbos-inc/dbos-confluent-kafka
Then, define your workflow. It must take in the Kafka topic, partition, and message as inputs:
import { DBOS } from '@dbos-inc/dbos-sdk';
import { Message } from "@dbos-inc/dbos-confluent-kafka";

export class CKafkaExample {
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: Message) {
    DBOS.logger.info(`Message received: ${message.value?.toString()}`);
  }
}
Then, annotate your method with a @CKafkaConsume decorator specifying which topic to consume from. Additionally, annotate your class with a @CKafka decorator defining which brokers to connect to.
DBOS invokes your method exactly-once for each message sent to the topic.
import { DBOS } from "@dbos-inc/dbos-sdk";
import { KafkaConfig, Message, CKafka, CKafkaConsume } from "@dbos-inc/dbos-confluent-kafka";

const kafkaConfig: KafkaConfig = {
  brokers: ['localhost:9092'],
};

@CKafka(kafkaConfig)
export class CKafkaExample {
  @CKafkaConsume("example-topic")
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: Message) {
    DBOS.logger.info(`Message received: ${message.value?.toString()}`);
  }
}
If you need more control, you can pass detailed configurations to both the @CKafka and @CKafkaConsume decorators. The @CKafka decorator takes in a configuration object used to configure Kafka for all methods in its class, and the @CKafkaConsume decorator takes in a consumer configuration as an optional second argument. You can also use KafkaJS-style configuration options.
For example, you can specify a custom consumer group ID:
@CKafkaConsume("example-topic", { groupId: "custom-group-id" })
@DBOS.workflow()
static async kafkaWorkflow(topic: string, partition: number, message: Message) {
  DBOS.logger.info(`Message received: ${message.value?.toString()}`);
}
Under the hood, DBOS constructs an idempotency key for each Kafka message from its topic, partition, and offset and passes it into your workflow. This combination is guaranteed to be unique within a Kafka cluster. Thus, even if a message is delivered multiple times (e.g., due to transient network failures or application interruptions), your workflow processes it exactly once.
Confluent Kafka Integration Reference
@CKafka
@CKafka(kafkaConfig: KafkaConfig)
Class-level decorator defining a Kafka configuration to use in all class methods. Takes in a KafkaJS configuration object or rdkafka configuration object.
@CKafkaConsume
@CKafkaConsume(topic: string | RegExp | Array<string | RegExp>, consumerConfig?: ConsumerConfig, queueName?: string)
Runs a workflow or transaction exactly-once for each message received on the specified topic(s).
Takes in a Kafka topic or list of Kafka topics (required), a consumer configuration (optional), and a queue name (optional; see Concurrency and Rate Limiting below).
Requires the class to be decorated with @CKafka.
The decorated method must take as input a Kafka topic, partition, and message as in the example below:
import { DBOS } from "@dbos-inc/dbos-sdk";
import { KafkaConfig, Message, CKafka, CKafkaConsume } from "@dbos-inc/dbos-confluent-kafka";

const kafkaConfig: KafkaConfig = {
  brokers: ['localhost:9092'],
};

@CKafka(kafkaConfig)
class CKafkaExample {
  @CKafkaConsume("example-topic")
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: Message) {
    // This workflow executes exactly once for each message sent to "example-topic".
    // All methods annotated with CKafka decorators must take in the topic, partition, and message as inputs, just like this method.
  }
}
Concurrency and Rate Limiting
By default, @CKafkaConsume workflows are started immediately upon receiving Kafka messages. If queueName is provided to the @CKafkaConsume decorator, then the workflows are enqueued in a workflow queue and subject to its rate limits.
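For example, reusing a queue declared with WorkflowQueue (as sketched in the KafkaJS section above; the queue name here is illustrative):

// Enqueue each message's workflow on the previously declared "kafka-queue"
// instead of starting it immediately.
@CKafkaConsume("example-topic", undefined, "kafka-queue")
@DBOS.workflow()
static async kafkaWorkflow(topic: string, partition: number, message: Message) {
  DBOS.logger.info(`Message received: ${message.value?.toString()}`);
}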