Integrating with Kafka

In this guide, you'll learn how to use DBOS transactions and workflows to process Kafka messages with exactly-once semantics.

First, install KafkaJS in your application:

npm install kafkajs

Then, define your transaction or workflow. It must take in the Kafka topic, partition, and message as inputs:

import { KafkaMessage } from "kafkajs";
import { Workflow, WorkflowContext } from '@dbos-inc/dbos-sdk';

export class KafkaExample {
  @Workflow()
  static async kafkaWorkflow(ctxt: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
    // message.value may be null, hence the optional chaining before toString()
    ctxt.logger.info(`Message received: ${message.value?.toString()}`);
  }
}

Then, annotate your method with a @KafkaConsume decorator specifying which topic to consume from. Additionally, annotate your class with a @Kafka decorator defining which brokers to connect to. DBOS invokes your method exactly once for each message sent to the topic.

import { KafkaConfig, KafkaMessage } from "kafkajs";
import { Workflow, WorkflowContext, Kafka, KafkaConsume } from '@dbos-inc/dbos-sdk';

const kafkaConfig: KafkaConfig = {
  brokers: ['localhost:9092']
};

@Kafka(kafkaConfig)
export class KafkaExample {

  @KafkaConsume("example-topic")
  @Workflow()
  static async kafkaWorkflow(ctxt: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
    ctxt.logger.info(`Message received: ${message.value?.toString()}`);
  }
}
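
The handler above logs message.value directly. In KafkaJS that field is a Buffer or null, so a real handler usually decodes it first. The following is a minimal sketch of one way to do that, assuming the payloads are UTF-8 encoded JSON (the guide does not specify a payload format). The helper name decodeJsonValue is hypothetical and is not part of DBOS or KafkaJS.

```typescript
// Hypothetical helper (not a DBOS or KafkaJS API): decode a Kafka message value.
// Assumption: payloads are UTF-8 encoded JSON.
function decodeJsonValue(value: Buffer | null): unknown {
  if (value === null) {
    // No payload (for example, a tombstone message).
    return undefined;
  }
  try {
    return JSON.parse(value.toString("utf8"));
  } catch {
    // Not valid JSON; the caller decides how to handle it.
    return undefined;
  }
}

// Example: decode a payload the way a handler would receive it.
const decoded = decodeJsonValue(Buffer.from('{"orderId": 42}'));
console.log(decoded);
```

Inside a workflow, such a helper would be called as decodeJsonValue(message.value) before acting on the payload.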

If you need more control, you can pass detailed configurations to both the @Kafka and @KafkaConsume decorators. The @Kafka decorator takes in a KafkaJS configuration object used to configure Kafka for all methods in its class. The @KafkaConsume decorator takes in a KafkaJS consumer configuration as an optional second argument. For example, you can specify a custom consumer group ID:

  @KafkaConsume("example-topic", { groupId: "custom-group-id" })
  @Workflow()
  static async kafkaWorkflow(ctxt: WorkflowContext, topic: string, partition: number, message: KafkaMessage) {
    ctxt.logger.info(`Message received: ${message.value?.toString()}`);
  }

Under the hood, DBOS constructs an idempotency key for each Kafka message from its topic, partition, and offset and passes it into your workflow or transaction. This combination is guaranteed to be unique within a Kafka cluster. Thus, even if a message is delivered multiple times (e.g., due to transient network failures or application interruptions), your transaction or workflow processes it exactly once.
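
To make the deduplication idea concrete, here is a small standalone sketch. It does not show DBOS internals: the key format, the function names kafkaIdempotencyKey and handleOnce, and the in-memory Set are all assumptions made for illustration. DBOS records processed keys durably, whereas this sketch keeps them only in memory.

```typescript
// Hypothetical key format. The guide says only that the key is built from
// topic, partition, and offset; the exact format DBOS uses is not shown here.
// KafkaJS represents offsets as strings, so offset is typed as string.
function kafkaIdempotencyKey(topic: string, partition: number, offset: string): string {
  return `${topic}:${partition}:${offset}`;
}

// In-memory stand-in for a durable record of already-processed keys.
const processedKeys = new Set<string>();
const handledPayloads: string[] = [];

// Runs the "handler" only the first time a given message is seen.
// Returns true if the message was processed, false if it was a duplicate.
function handleOnce(topic: string, partition: number, offset: string, payload: string): boolean {
  const key = kafkaIdempotencyKey(topic, partition, offset);
  if (processedKeys.has(key)) {
    return false; // redelivery of a message that was already processed
  }
  processedKeys.add(key);
  handledPayloads.push(payload);
  return true;
}

const first = handleOnce("example-topic", 0, "17", "hello");
const redelivered = handleOnce("example-topic", 0, "17", "hello"); // same topic, partition, offset
const otherPartition = handleOnce("example-topic", 1, "17", "world"); // same offset, different partition
console.log(first, redelivered, otherPartition, handledPayloads.length);
```

The second call is skipped because its key matches the first, while the third is processed because the partition differs even though the offset is the same.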