1. 概述

在本教程中,我们将探讨 Kafka 是如何通过新引入的 Transactional API 来实现生产者与消费者之间的 Exactly Once(恰好一次) 消息语义的。

此外,我们还会使用这个 API 实现事务型的生产者和消费者,以完成一个 WordCount 示例,展示如何在 Kafka 中实现端到端的 Exactly Once 处理。

2. Kafka 中的消息传递语义

由于各种故障的存在,消息系统无法保证生产者和消费者之间一定能够成功传递消息。根据客户端与系统的交互方式,消息传递语义主要有以下三种:

  • At-most-once:消息系统不会重复发送消息,但可能会丢失一些消息;
  • At-least-once:消息系统不会丢失消息,但可能会重复发送;
  • Exactly-once:既不会丢失也不会重复,每条消息只被处理一次;

最初,Kafka 只支持 At-most-once 和 At-least-once 两种语义。

但自从引入了 Kafka Broker 与客户端之间的事务机制 后,Kafka 才真正实现了 Exactly Once 语义。为了更好地理解这一点,我们先来快速了解一下事务型客户端 API。

3. Maven 依赖

要使用 Kafka 的事务 API,我们需要在 pom.xml 中添加 Kafka Java 客户端依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.0</version>
</dependency>

4. 事务型消费-转换-生产循环

在我们的示例中,我们将从一个名为 sentences 的输入 Topic 中消费消息,对每条消息中的单词进行计数,并将结果发送到一个名为 counts 的输出 Topic。

我们假设 sentences 这个 Topic 中已经存在事务型数据。

4.1. 事务感知的 Producer

首先,我们创建一个典型的 Kafka Producer:

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");

但要支持事务,我们还需要设置 transactional.id 并启用幂等性:

producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "prod-1");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

启用幂等性后,Kafka 会使用 transactional.id 来识别 Producer,并确保消息不会重复发送。

⚠️ 注意:每个 Producer 的 transactional.id 必须唯一,但重启后要保持一致。

4.2. 初始化事务支持

准备好后,我们还需要调用 initTransactions() 来让 Producer 支持事务操作:

producer.initTransactions();

这一步会将 Producer 注册到 Broker,Broker 会根据 transactional.id 和 epoch(版本号)来维护事务状态。

Broker 会将事务操作记录到事务日志中,如果发现相同 ID 但旧 epoch 的事务,会自动清理。

4.3. 事务感知的 Consumer

消费端我们需要设置 isolation.level=read_committed,这样消费者将只读取已提交的事务消息:

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group-id");
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(singleton("sentences"));

✅ 设置 read_committed 确保消费端不会读取未提交的事务消息。

默认值是 read_uncommitted,即会读取所有消息,包括未提交的。

4.4. 消费并转换数据

现在我们已经配置好了事务型 Producer 和 Consumer,可以开始消费消息并进行转换:

ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
Map<String, Integer> wordCountMap =
  records.records(new TopicPartition("input", 0))
    .stream()
    .flatMap(record -> Stream.of(record.value().split(" ")))
    .map(word -> Tuple.of(word, 1))
    .collect(Collectors.toMap(
      tuple -> tuple.getKey(),
      t1 -> t1.getValue(),
      (v1, v2) -> v1 + v2
    ));

⚠️ 上面这段代码本身不是事务的,但由于我们设置了 read_committed,它只会读取已提交的消息。

4.5. 发送事务消息

要将结果以事务方式发送出去,我们需要调用 beginTransaction()

producer.beginTransaction();

然后将单词计数发送到 counts Topic:

wordCountMap.forEach((key, value) ->
    producer.send(new ProducerRecord<String, String>("counts", key, value.toString())));

✅ 注意:事务中的消息可以跨多个分区,Kafka 会记录所有涉及的分区。

4.6. 提交消费偏移量

我们还需要提交消费的偏移量,并将其纳入事务:

Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
    long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
    offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
}

⚠️ 注意:我们提交的是下一条消息的偏移量,因此要 +1

然后提交偏移量到事务中:

producer.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata("my-group-id"));

4.7. 提交或中止事务

最后,提交事务:

producer.commitTransaction();

这会将所有缓冲的消息写入分区,并对消费者可见。

如果处理过程中出错,我们可以中止事务:

try {
  // ... 消费、转换、发送
  producer.commitTransaction();
} catch (Exception e) {
  producer.abortTransaction();
}

⚠️ 如果在 max.transaction.timeout.ms 时间内既没有提交也没有中止,Kafka Broker 会自动中止事务。默认超时时间为 15 分钟(900000 毫秒)。

5. 其他消费-转换-生产模式

我们上面介绍的是读写同一 Kafka 集群的情况。

如果应用需要跨集群操作(即从一个集群消费,向另一个集群生产),则需要使用传统的 commitSync()commitAsync() API,并将消费偏移量存储在外部状态系统中,以维护事务一致性。

6. 总结

对于数据一致性要求极高的应用,端到端 Exactly Once 处理是不可或缺的

在本教程中,我们展示了如何使用 Kafka 的事务机制来实现 Exactly Once,并通过一个 WordCount 示例演示了其基本用法。

✅ Kafka 的事务 API 提供了强大的能力,但使用时要特别注意配置和异常处理,避免事务卡住或数据丢失。


原始标题:Exactly Once Processing in Kafka with Java | Baeldung