1. 概述
在本教程中,我们将探讨 Kafka 是如何通过新引入的 Transactional API 来实现生产者与消费者之间的 Exactly Once(恰好一次) 消息语义的。
此外,我们还会使用这个 API 实现事务型的生产者和消费者,以完成一个 WordCount 示例,展示如何在 Kafka 中实现端到端的 Exactly Once 处理。
2. Kafka 中的消息传递语义
由于各种故障的存在,消息系统无法保证生产者和消费者之间一定能够成功传递消息。根据客户端与系统的交互方式,消息传递语义主要有以下三种:
- ✅ At-most-once:消息系统不会重复发送消息,但可能会丢失一些消息;
- ✅ At-least-once:消息系统不会丢失消息,但可能会重复发送;
- ✅ Exactly-once:既不会丢失也不会重复,每条消息只被处理一次;
最初,Kafka 只支持 At-most-once 和 At-least-once 两种语义。
但自从引入了 Kafka Broker 与客户端之间的事务机制 后,Kafka 才真正实现了 Exactly Once 语义。为了更好地理解这一点,我们先来快速了解一下事务型客户端 API。
3. Maven 依赖
要使用 Kafka 的事务 API,我们需要在 pom.xml
中添加 Kafka Java 客户端依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.0</version>
</dependency>
4. 事务型消费-转换-生产循环
在我们的示例中,我们将从一个名为 sentences
的输入 Topic 中消费消息,对每条消息中的单词进行计数,并将结果发送到一个名为 counts
的输出 Topic。
我们假设 sentences
这个 Topic 中已经存在事务型数据。
4.1. 事务感知的 Producer
首先,我们创建一个典型的 Kafka Producer:
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
但要支持事务,我们还需要设置 transactional.id
并启用幂等性:
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "prod-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
启用幂等性后,Kafka 会使用 transactional.id
来识别 Producer,并确保消息不会重复发送。
⚠️ 注意:每个 Producer 的 transactional.id
必须唯一,但重启后要保持一致。
4.2. 初始化事务支持
准备好后,我们还需要调用 initTransactions()
来让 Producer 支持事务操作:
producer.initTransactions();
这一步会将 Producer 注册到 Broker,Broker 会根据 transactional.id
和 epoch(版本号)来维护事务状态。
Broker 会将事务操作记录到事务日志中,如果发现相同 ID 但旧 epoch 的事务,会自动清理。
4.3. 事务感知的 Consumer
消费端我们需要设置 isolation.level=read_committed
,这样消费者将只读取已提交的事务消息:
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group-id");
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(singleton("sentences"));
✅ 设置 read_committed
确保消费端不会读取未提交的事务消息。
默认值是 read_uncommitted
,即会读取所有消息,包括未提交的。
4.4. 消费并转换数据
现在我们已经配置好了事务型 Producer 和 Consumer,可以开始消费消息并进行转换:
ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
Map<String, Integer> wordCountMap =
records.records(new TopicPartition("input", 0))
.stream()
.flatMap(record -> Stream.of(record.value().split(" ")))
.map(word -> Tuple.of(word, 1))
.collect(Collectors.toMap(
tuple -> tuple.getKey(),
t1 -> t1.getValue(),
(v1, v2) -> v1 + v2
));
⚠️ 上面这段代码本身不是事务的,但由于我们设置了 read_committed
,它只会读取已提交的消息。
4.5. 发送事务消息
要将结果以事务方式发送出去,我们需要调用 beginTransaction()
:
producer.beginTransaction();
然后将单词计数发送到 counts
Topic:
wordCountMap.forEach((key, value) ->
producer.send(new ProducerRecord<String, String>("counts", key, value.toString())));
✅ 注意:事务中的消息可以跨多个分区,Kafka 会记录所有涉及的分区。
4.6. 提交消费偏移量
我们还需要提交消费的偏移量,并将其纳入事务:
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
}
⚠️ 注意:我们提交的是下一条消息的偏移量,因此要 +1
。
然后提交偏移量到事务中:
producer.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata("my-group-id"));
4.7. 提交或中止事务
最后,提交事务:
producer.commitTransaction();
这会将所有缓冲的消息写入分区,并对消费者可见。
如果处理过程中出错,我们可以中止事务:
try {
// ... 消费、转换、发送
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
⚠️ 如果在 max.transaction.timeout.ms
时间内既没有提交也没有中止,Kafka Broker 会自动中止事务。默认超时时间为 15 分钟(900000 毫秒)。
5. 其他消费-转换-生产模式
我们上面介绍的是读写同一 Kafka 集群的情况。
如果应用需要跨集群操作(即从一个集群消费,向另一个集群生产),则需要使用传统的 commitSync()
或 commitAsync()
API,并将消费偏移量存储在外部状态系统中,以维护事务一致性。
6. 总结
对于数据一致性要求极高的应用,端到端 Exactly Once 处理是不可或缺的。
在本教程中,我们展示了如何使用 Kafka 的事务机制来实现 Exactly Once,并通过一个 WordCount 示例演示了其基本用法。
✅ Kafka 的事务 API 提供了强大的能力,但使用时要特别注意配置和异常处理,避免事务卡住或数据丢失。