In this guide, you'll learn how to use DBOS transactions and workflows to process Kafka messages with exactly-once semantics.

As there is more than one Kafka client for the JavaScript ecosystem, DBOS supports pluggable libraries.

KafkaJS

First, install the DBOS library for KafkaJS in your application:

npm install @dbos-inc/dbos-kafkajs

Then, define your transaction or workflow. It must take in the Kafka topic, partition, and message as inputs:

import { DBOS } from '@dbos-inc/dbos-sdk';
import { KafkaMessage } from "kafkajs";

export class KafkaExample {
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: KafkaMessage) {
    DBOS.logger.info(`Message received: ${message.value?.toString()}`);
  }
}

Then, annotate your method with a @KafkaConsume decorator specifying which topic to consume from. Additionally, annotate your class with a @Kafka decorator defining which brokers to connect to. DBOS invokes your method exactly once for each message sent to the topic.

import { DBOS } from "@dbos-inc/dbos-sdk";
import { KafkaConfig, KafkaMessage } from "kafkajs";
import { Kafka, KafkaConsume } from "@dbos-inc/dbos-kafkajs";

const kafkaConfig: KafkaConfig = {
  brokers: ['localhost:9092']
};

@Kafka(kafkaConfig)
export class KafkaExample {
  @KafkaConsume("example-topic")
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: KafkaMessage) {
    DBOS.logger.info(`Message received: ${message.value?.toString()}`);
  }
}

If you need more control, you can pass detailed configurations to both the @Kafka and @KafkaConsume decorators. The @Kafka decorator takes in a KafkaJS configuration object used to configure Kafka for all methods in its class. The @KafkaConsume decorator takes in a KafkaJS consumer configuration as an optional second argument. For example, you can specify a custom consumer group ID:

@KafkaConsume("example-topic", { groupId: "custom-group-id" })
@DBOS.workflow()
static async kafkaWorkflow(topic: string, partition: number, message: KafkaMessage) {
  DBOS.logger.info(`Message received: ${message.value?.toString()}`);
}

Under the hood, DBOS constructs an idempotency key for each Kafka message from its topic, partition, and offset and passes it into your workflow or transaction. This combination is guaranteed to be unique within a Kafka cluster. Thus, even if a message is delivered multiple times (e.g., due to transient network failures or application interruptions), your transaction or workflow processes it exactly once.
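The key derivation can be sketched as a pure function. This is illustrative only: the exact key format DBOS constructs internally may differ, but the principle is the same.

```typescript
// Sketch: derive a per-message idempotency key from the coordinates that
// uniquely identify a Kafka message within a cluster.
// Illustrative only -- the exact format DBOS uses internally may differ.
function kafkaIdempotencyKey(topic: string, partition: number, offset: string): string {
  return `${topic}-${partition}-${offset}`;
}

// A redelivered message carries the same topic, partition, and offset, so it
// produces the same key and cannot trigger a second workflow execution.
const key = kafkaIdempotencyKey("example-topic", 0, "42"); // "example-topic-0-42"
```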

KafkaJS Integration Decorators

@Kafka(kafkaConfig: KafkaConfig)

Class-level decorator defining a Kafka configuration to use in all class methods. Takes in a KafkaJS configuration object.

@KafkaConsume(topic: string | RegExp | Array<string | RegExp>, consumerConfig?: ConsumerConfig, queueName?: string)

Runs a transaction or workflow exactly once for each message received on the specified topic(s). Takes in a Kafka topic or list of Kafka topics (required), a KafkaJS consumer configuration (optional), and the name of a workflow queue on which to enqueue executions (optional). Requires the class to be decorated with @Kafka. The decorated method must take a Kafka topic, partition, and message as inputs, as in the example below:

import { DBOS } from "@dbos-inc/dbos-sdk";
import { KafkaConfig, KafkaMessage } from "kafkajs";
import { Kafka, KafkaConsume } from "@dbos-inc/dbos-kafkajs";

const kafkaConfig: KafkaConfig = {
  brokers: ['localhost:9092']
};

@Kafka(kafkaConfig)
class KafkaExample {
  @KafkaConsume("example-topic")
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: KafkaMessage) {
    // This workflow executes exactly once for each message sent to "example-topic".
    // All methods annotated with Kafka decorators must take in the topic, partition,
    // and message as inputs, just like this method.
  }
}

Concurrency and Rate Limiting

By default, @KafkaConsume workflows start immediately upon receipt of a Kafka message. If a queueName is provided to the @KafkaConsume decorator, the workflows are instead enqueued on that workflow queue and are subject to its concurrency and rate limits.
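For example, you could declare a rate-limited queue and pass its name as the third argument to @KafkaConsume. This is a sketch assuming the WorkflowQueue class from the DBOS SDK; the queue name and rate-limit values are illustrative:

```typescript
import { DBOS, WorkflowQueue } from "@dbos-inc/dbos-sdk";
import { KafkaConfig, KafkaMessage } from "kafkajs";
import { Kafka, KafkaConsume } from "@dbos-inc/dbos-kafkajs";

// Illustrative rate limit: start at most 50 workflows per 30-second period.
const queue = new WorkflowQueue("kafka-queue", {
  rateLimit: { limitPerPeriod: 50, periodSec: 30 },
});

const kafkaConfig: KafkaConfig = {
  brokers: ['localhost:9092']
};

@Kafka(kafkaConfig)
export class QueuedKafkaExample {
  // Passing the queue's name as the third decorator argument enqueues each
  // message's workflow on "kafka-queue" instead of starting it immediately.
  @KafkaConsume("example-topic", undefined, "kafka-queue")
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: KafkaMessage) {
    DBOS.logger.info(`Message received: ${message.value?.toString()}`);
  }
}
```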

Confluent's JavaScript Client for Apache Kafka

First, install the DBOS library for Confluent's JavaScript Client for Apache Kafka in your application:

npm install @dbos-inc/dbos-confluent-kafka

Then, define your transaction or workflow. It must take in the Kafka topic, partition, and message as inputs:

import { DBOS } from '@dbos-inc/dbos-sdk';
import { Message } from "@dbos-inc/dbos-confluent-kafka";

export class CKafkaExample {
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: Message) {
    DBOS.logger.info(`Message received: ${message.value?.toString()}`);
  }
}

Then, annotate your method with a @CKafkaConsume decorator specifying which topic to consume from. Additionally, annotate your class with a @CKafka decorator defining which brokers to connect to. DBOS invokes your method exactly once for each message sent to the topic.

import { DBOS } from "@dbos-inc/dbos-sdk";
import { KafkaConfig, Message, CKafka, CKafkaConsume } from "@dbos-inc/dbos-confluent-kafka";

const kafkaConfig: KafkaConfig = {
  brokers: ['localhost:9092']
};

@CKafka(kafkaConfig)
export class CKafkaExample {
  @CKafkaConsume("example-topic")
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: Message) {
    DBOS.logger.info(`Message received: ${message.value?.toString()}`);
  }
}

If you need more control, you can pass detailed configurations to both the @CKafka and @CKafkaConsume decorators. The @CKafka decorator takes in a configuration object used to configure Kafka for all methods in its class. The @CKafkaConsume decorator takes in a consumer configuration as an optional second argument. In both cases, you can use KafkaJS-style configuration options.

For example, you can specify a custom consumer group ID:

@CKafkaConsume("example-topic", { groupId: "custom-group-id" })
@DBOS.workflow()
static async kafkaWorkflow(topic: string, partition: number, message: Message) {
  DBOS.logger.info(`Message received: ${message.value?.toString()}`);
}

Under the hood, DBOS constructs an idempotency key for each Kafka message from its topic, partition, and offset and passes it into your workflow or transaction. This combination is guaranteed to be unique within a Kafka cluster. Thus, even if a message is delivered multiple times (e.g., due to transient network failures or application interruptions), your transaction or workflow processes it exactly once.

Confluent Kafka Integration Decorators

@CKafka(kafkaConfig: KafkaConfig)

Class-level decorator defining a Kafka configuration to use in all class methods. Takes in a KafkaJS-style configuration object or an rdkafka configuration object.

@CKafkaConsume(topic: string | RegExp | Array<string | RegExp>, consumerConfig?: ConsumerConfig, queueName?: string)

Runs a transaction or workflow exactly once for each message received on the specified topic(s). Takes in a Kafka topic or list of Kafka topics (required), a consumer configuration (optional), and the name of a workflow queue on which to enqueue executions (optional). Requires the class to be decorated with @CKafka. The decorated method must take a Kafka topic, partition, and message as inputs, as in the example below:

import { DBOS } from "@dbos-inc/dbos-sdk";
import { KafkaConfig, Message, CKafka, CKafkaConsume } from "@dbos-inc/dbos-confluent-kafka";

const kafkaConfig: KafkaConfig = {
  brokers: ['localhost:9092']
};

@CKafka(kafkaConfig)
class CKafkaExample {
  @CKafkaConsume("example-topic")
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: Message) {
    // This workflow executes exactly once for each message sent to "example-topic".
    // All methods annotated with CKafka decorators must take in the topic, partition,
    // and message as inputs, just like this method.
  }
}

Concurrency and Rate Limiting

By default, @CKafkaConsume workflows start immediately upon receipt of a Kafka message. If a queueName is provided to the @CKafkaConsume decorator, the workflows are instead enqueued on that workflow queue and are subject to its concurrency and rate limits.
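As with the KafkaJS integration, you could declare a rate-limited queue and pass its name as the third argument to @CKafkaConsume. This is a sketch assuming the WorkflowQueue class from the DBOS SDK; the queue name and rate-limit values are illustrative:

```typescript
import { DBOS, WorkflowQueue } from "@dbos-inc/dbos-sdk";
import { KafkaConfig, Message, CKafka, CKafkaConsume } from "@dbos-inc/dbos-confluent-kafka";

// Illustrative rate limit: start at most 50 workflows per 30-second period.
const queue = new WorkflowQueue("ckafka-queue", {
  rateLimit: { limitPerPeriod: 50, periodSec: 30 },
});

const kafkaConfig: KafkaConfig = {
  brokers: ['localhost:9092']
};

@CKafka(kafkaConfig)
export class QueuedCKafkaExample {
  // Passing the queue's name as the third decorator argument enqueues each
  // message's workflow on "ckafka-queue" instead of starting it immediately.
  @CKafkaConsume("example-topic", undefined, "ckafka-queue")
  @DBOS.workflow()
  static async kafkaWorkflow(topic: string, partition: number, message: Message) {
    DBOS.logger.info(`Message received: ${message.value?.toString()}`);
  }
}
```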