Integrating with Kafka

In this guide, you'll learn how to use DBOS transactions and workflows to process Kafka messages with exactly-once semantics.

First, install Confluent Kafka in your application:

pip install confluent-kafka

Then, define your transaction or workflow. It must accept a Kafka message as its input parameter:

from dbos import DBOS, KafkaMessage

@DBOS.workflow()
def test_kafka_workflow(msg: KafkaMessage):
    DBOS.logger.info(f"Message received: {msg.value.decode()}")

Then, annotate your function with a @DBOS.kafka_consumer decorator specifying which brokers to connect to and which topics to consume from. Configuration setting details are available from the Confluent Kafka API docs and the official Kafka documentation. At a minimum, you must specify bootstrap.servers and group.id configuration settings.

from dbos import DBOS, KafkaMessage

@DBOS.kafka_consumer(
    config={
        "bootstrap.servers": "localhost:9092",
        "group.id": "dbos-kafka-group",
    },
    topics=["example-topic"],
)
@DBOS.workflow()
def test_kafka_workflow(msg: KafkaMessage):
    DBOS.logger.info(f"Message received: {msg.value.decode()}")
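Because the config argument is a plain Python dictionary, it can be assembled and checked before it reaches the decorator. The following is a minimal sketch, not part of the DBOS API: the helper names build_kafka_config and check_required and the environment variable names KAFKA_BOOTSTRAP_SERVERS and KAFKA_GROUP_ID are assumptions made for illustration. It only enforces the two settings named above as the minimum.

```python
import os

# The two settings the guide names as the required minimum.
REQUIRED_KEYS = ("bootstrap.servers", "group.id")


def build_kafka_config(env=None):
    """Build a consumer config dict from environment variables.

    KAFKA_BOOTSTRAP_SERVERS and KAFKA_GROUP_ID are hypothetical names
    chosen for this sketch; the fallbacks match the example above.
    """
    env = os.environ if env is None else env
    return {
        "bootstrap.servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "group.id": env.get("KAFKA_GROUP_ID", "dbos-kafka-group"),
    }


def check_required(config):
    """Raise ValueError if a required setting is missing or empty."""
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError(f"Missing Kafka settings: {', '.join(missing)}")
    return config
```

The resulting dictionary could then be passed as config=check_required(build_kafka_config()) in the decorator, so a missing setting fails at import time rather than when the consumer first connects.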

Under the hood, DBOS constructs an idempotency key for each Kafka message from its topic, partition, and offset, and passes that key to your workflow or transaction. This combination is guaranteed to be unique within a Kafka cluster. Thus, even if a message is delivered multiple times (e.g., due to transient network failures or application interruptions), your transaction or workflow processes it exactly once.
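To make the deduplication idea concrete, here is a minimal sketch of how a key derived from topic, partition, and offset lets a redelivered message be skipped. It is an illustration only, not the DBOS implementation: the key format, the MessageCoordinates type, and the in-memory set are assumptions, and the set stands in for the durable record that a real system would keep in a database.

```python
from typing import Callable, NamedTuple, Set


class MessageCoordinates(NamedTuple):
    """Where a message lives in a Kafka cluster (hypothetical helper type)."""
    topic: str
    partition: int
    offset: int


def idempotency_key(coords: MessageCoordinates) -> str:
    # ":" cannot appear in a Kafka topic name, so the three parts
    # can never run together ambiguously. The format is an assumption.
    return f"kafka:{coords.topic}:{coords.partition}:{coords.offset}"


def process_once(
    coords: MessageCoordinates,
    seen: Set[str],
    handler: Callable[[MessageCoordinates], None],
) -> bool:
    """Run handler unless this message's key was already recorded.

    Returns True if the handler ran, False if the delivery was a duplicate.
    """
    key = idempotency_key(coords)
    if key in seen:
        return False
    handler(coords)
    seen.add(key)
    return True
```

Calling process_once twice with the same coordinates runs the handler only the first time, which is the behavior a repeated delivery of the same message relies on.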

In-Order Processing

You can process Kafka messages in order by setting in_order=True in the @DBOS.kafka_consumer decorator. When this is set, messages are processed sequentially, ordered by offset. In other words, processing of Message #4 does not begin until Message #3 is fully processed. For example:

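from dbos import DBOS, KafkaMessage

# Consumer settings; bootstrap.servers and group.id are required
config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "dbos-kafka-group",
}

@DBOS.kafka_consumer(
    config=config,
    topics=["example-topic"],
    in_order=True,
)
@DBOS.workflow()
def process_messages_in_order(msg: KafkaMessage):
    DBOS.logger.info("Messages are processed sequentially in offset order")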
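The sequential guarantee can be pictured with a single-worker executor: each task starts only after the previous one has finished. This is a conceptual sketch of that ordering, not how DBOS implements in_order; the run_in_order helper and the event log are assumptions introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Tuple


def run_in_order(
    offsets: Iterable[int],
    handler: Callable[[int], None],
) -> List[Tuple[str, int]]:
    """Process offsets one at a time, lowest first, and log start/end events."""
    events: List[Tuple[str, int]] = []

    def task(offset: int) -> None:
        events.append(("start", offset))
        handler(offset)
        events.append(("end", offset))

    # A single worker means a task cannot start until the previous one ends.
    with ThreadPoolExecutor(max_workers=1) as pool:
        futures = [pool.submit(task, offset) for offset in sorted(offsets)]
        for future in futures:
            future.result()  # re-raise any handler exception
    return events
```

In the returned event log, every "end" entry for one offset precedes the "start" entry for the next, mirroring the statement that Message #4 does not begin until Message #3 is fully processed.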