1. 概述
本文将深入探讨Apache Kafka中消息顺序保障的挑战与解决方案。在分布式系统中,确保消息按正确顺序处理对维护数据完整性和一致性至关重要。虽然Kafka提供了维持消息顺序的机制,但在分布式环境中实现这一目标仍面临诸多复杂性。
2. 分区内顺序及其挑战
Kafka通过为每条消息分配唯一偏移量来保证单个分区内的顺序。这确保了消息在该分区内的顺序追加。然而,当我们扩展到使用多个分区时,维持全局顺序变得复杂。不同分区以不同速率接收消息,使得跨分区的严格顺序难以保证。
2.1 生产者与消费者时序问题
Kafka处理消息顺序时存在一个关键差异:生产者发送消息的顺序与消费者接收的顺序可能不同。当仅使用单个分区时,我们按消息到达Broker的顺序处理它们。然而,这个顺序可能与原始发送顺序不一致。这种混乱可能由网络延迟或消息重传导致。要解决这个问题,我们可以实现带确认和重试机制的生产者,确保消息不仅到达Kafka,而且顺序正确。
2.2 多分区挑战
虽然分区分布有利于可扩展性和容错性,但带来了实现全局消息顺序的复杂性。例如,我们按顺序发送两条消息M1和M2。Kafka按发送顺序接收它们,但将它们放入不同分区。关键问题在于:M1先发送并不意味着它会被先处理。在金融交易等需要严格处理顺序的场景中,这可能带来严重问题。
2.3 单分区消息顺序
我们创建两个主题:单分区主题single_partition_topic
和五分区主题multi_partition_topic
。以下是单分区主题的生产者示例:
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
producer = new KafkaProducer<>(producerProperties);
for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setGlobalSequenceNumber(sequenceNumber);
userEvent.setEventNanoTime(System.nanoTime());
ProducerRecord<Long, UserEvent> producerRecord = new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, userEvent);
Future<RecordMetadata> future = producer.send(producerRecord);
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
UserEvent
是一个实现了Comparable
接口的POJO类,通过globalSequenceNumber
(外部序列号)排序。由于生产者发送POJO消息对象,我们实现了自定义的Jackson序列化器和反序列化器。
所有用户事件都进入分区0,事件ID序列如下:
841e593a-bca0-4dd7-9f32-35792ffc522e
9ef7b0c0-6272-4f9a-940d-37ef93c59646
0b09faef-2939-46f9-9c0a-637192e242c5
4158457a-73cc-4e65-957a-bf3f647d490a
fcf531b7-c427-4e80-90fd-b0a10bc096ca
23ed595c-2477-4475-a4f4-62f6cbb05c41
3a36fb33-0850-424c-81b1-dafe4dc3bb18
10bca2be-3f0e-40ef-bafc-eaf055b4ee26
d48dcd66-799c-4645-a977-fa68414ca7c9
7a70bfde-f659-4c34-ba75-9b43a5045d39
在Kafka中,每个消费者组都是独立实体。如果两个消费者属于不同组,它们都会接收主题的所有消息。这是因为Kafka将每个消费者组视为独立订阅者。
如果两个消费者属于同一组并订阅多分区主题,Kafka会确保每个消费者从不同分区集合读取。这允许消息的并发处理。
Kafka确保同一消费者组内不会有多个消费者读取同一条消息,因此每条消息在每个组中只被处理一次。
以下是消费同一主题消息的消费者代码:
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Collections.singletonList(Config.SINGLE_PARTITION_TOPIC));
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
UserEvent userEvent = record.value();
receivedUserEventList.add(userEvent);
logger.info("User Event ID: " + userEvent.getUserEventId());
});
输出显示消费者按相同顺序消费消息,事件ID序列如下:
841e593a-bca0-4dd7-9f32-35792ffc522e
9ef7b0c0-6272-4f9a-940d-37ef93c59646
0b09faef-2939-46f9-9c0a-637192e242c5
4158457a-73cc-4e65-957a-bf3f647d490a
fcf531b7-c427-4e80-90fd-b0a10bc096ca
23ed595c-2477-4475-a4f4-62f6cbb05c41
3a36fb33-0850-424c-81b1-dafe4dc3bb18
10bca2be-3f0e-40ef-bafc-eaf055b4ee26
d48dcd66-799c-4645-a977-fa68414ca7c9
7a70bfde-f659-4c34-ba75-9b43a5045d39
2.4 多分区消息顺序
对于多分区主题,消费者和生产者配置相同。唯一区别是消息发送的主题和分区,生产者向multi_partition_topic
发送消息:
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
消费者从同一主题消费消息:
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
UserEvent userEvent = record.value();
receivedUserEventList.add(userEvent);
logger.info("User Event ID: " + userEvent.getUserEventId());
});
生产者输出显示事件ID及其对应分区:
939c1760-140e-4d0c-baa6-3b1dd9833a7d, 0
47fdbe4b-e8c9-4b30-8efd-b9e97294bb74, 4
4566a4ec-cae9-4991-a8a2-d7c5a1b3864f, 4
4b061609-aae8-415f-94d7-ae20d4ef1ca9, 3
eb830eb9-e5e9-498f-8618-fb5d9fc519e4, 2
9f2a048f-eec1-4c68-bc33-c307eec7cace, 1
c300f25f-c85f-413c-836e-b9dcfbb444c1, 0
c82efad1-6287-42c6-8309-ae1d60e13f5e, 4
461380eb-dd6-455c-9c92-ae58b0913954, 4
43bbe38a-5c9e-452b-be43-ebb26d58e782, 3
消费者输出显示消息消费顺序与发送顺序不一致:
939c1760-140e-4d0c-baa6-3b1dd9833a7d
47fdbe4b-e8c9-4b30-8efd-b9e97294bb74
4566a4ec-cae9-4991-a8a2-d7c5a1b3864f
c82efad1-6287-42c6-8309-ae1d60e13f5e
461380eb-dd6-455c-9c92-ae58b0913954
eb830eb9-e5e9-498f-8618-fb5d9fc519e4
4b061609-aae8-415f-94d7-ae20d4ef1ca9
43bbe38a-5c9e-452b-be43-ebb26d58e782
c300f25f-c85f-413c-836e-b9dcfbb444c1
9f2a048f-eec1-4c68-bc33-c307eec7cace
3. 消息顺序策略
3.1 使用单分区
我们可以使用Kafka的单分区(如前例中的single_partition_topic
)确保消息顺序。但这种方法存在权衡:
- 吞吐量限制:想象一个繁忙的披萨店。如果只有一个厨师(生产者)和一个服务员(消费者)在一张桌子(分区)工作,他们能处理的订单有限。在高消息量场景下,单分区就像这个单桌场景,会成为瓶颈。消息处理速率受限,因为同一时间只有一个生产者和一个消费者能操作单个分区。
- 并行度降低:如果有多个厨师(生产者)和服务员(消费者)在多张桌子(分区)工作,完成的订单数量会增加。Kafka的优势在于跨分区的并行处理。单分区丧失了这一优势,导致顺序处理并进一步限制消息流。
本质上,单分区保证顺序,但代价是降低吞吐量。
3.2 基于时间窗口缓冲的外部排序
这种方法中,生产者为每条消息附加全局序列号。多个消费者实例从不同分区并发消费消息,并使用这些序列号重新排序消息,确保全局顺序。
在多生产者的实际场景中,我们需要通过跨所有生产者进程可访问的共享资源管理全局序列号,如数据库序列或分布式计数器。这确保序列号在所有消息中唯一且有序:
for (long sequenceNumber = 1; sequenceNumber <= 10 ; sequenceNumber++) {
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
userEvent.setEventNanoTime(System.nanoTime());
userEvent.setGlobalSequenceNumber(sequenceNumber);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
sentUserEventList.add(userEvent);
RecordMetadata metadata = future.get();
logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}
在消费者端,我们将消息分组到时间窗口中并按顺序处理。在特定时间窗口内到达的消息被批量处理,窗口结束后处理批次。这确保窗口内的消息按顺序处理,即使它们在窗口内不同时间到达。消费者缓冲消息并根据序列号重新排序后再处理。我们需要确保消息按正确顺序处理,因此消费者应有一个缓冲期,多次轮询消息后再处理缓冲消息。缓冲期应足够长以应对潜在的消息顺序问题:
consumer.subscribe(Collections.singletonList(Config.MULTI_PARTITION_TOPIC));
List<UserEvent> buffer = new ArrayList<>();
long lastProcessedTime = System.nanoTime();
ConsumerRecords<Long, UserEvent> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
buffer.add(record.value());
});
while (!buffer.isEmpty()) {
if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_NS) {
processBuffer(buffer, receivedUserEventList);
lastProcessedTime = System.nanoTime();
}
records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
records.forEach(record -> {
buffer.add(record.value());
});
}
void processBuffer(List buffer, List receivedUserEventList) {
Collections.sort(buffer);
buffer.forEach(userEvent -> {
receivedUserEventList.add(userEvent);
logger.info("Processing message with Global Sequence number: " + userEvent.getGlobalSequenceNumber() + ", User Event Id: " + userEvent.getUserEventId());
});
buffer.clear();
}
生产者输出显示事件ID及其对应分区:
d6ef910f-2e65-410d-8b86-fa0fc69f2333, 0
4d6bfe60-7aad-4d1b-a536-cc735f649e1a, 4
9b68dcfe-a6c8-4cca-874d-cfdda6a93a8f, 4
84bd88f5-9609-4342-a7e5-d124679fa55a, 3
55c00440-84e0-4234-b8df-d474536e9357, 2
8fee6cac-7b8f-4da0-a317-ad38cc531a68, 1
d04c1268-25c1-41c8-9690-fec56397225d, 0
11ba8121-5809-9abf-9d9c-aa180330ac27, 4
8e00173c-b8e1-4cf7-ae8c-8a9e28cfa6b2, 4
e1acd392-db07-4325-8966-0f7c7a48e3d3, 3
消费者输出显示全局序列号和事件ID:
1, d6ef910f-2e65-410d-8b86-fa0fc69f2333
2, 4d6bfe60-7aad-4d1b-a536-cc735f649e1a
3, 9b68dcfe-a6c8-4cca-874d-cfdda6a93a8f
4, 84bd88f5-9609-4342-a7e5-d124679fa55a
5, 55c00440-84e0-4234-b8df-d474536e9357
6, 8fee6cac-7b8f-4da0-a317-ad38cc531a68
7, d04c1268-25c1-41c8-9690-fec56397225d
8, 11ba8121-5809-9abf-9d9c-aa180330ac27
9, 8e00173c-b8e1-4cf7-ae8c-8a9e28cfa6b2
10, e1acd392-db07-4325-8966-0f7c7a48e3d3
3.3 外部排序与缓冲的注意事项
这种方法中,每个消费者实例缓冲消息并按序列号顺序处理。但需考虑以下问题:
- 缓冲区大小:缓冲区大小可能随消息量增加而增长。在严格按序列号排序的实现中,如果消息传递延迟,缓冲区可能显著增长。例如,如果每分钟处理100条消息但突然因延迟收到200条,缓冲区会意外膨胀。必须有效管理缓冲区大小,并准备超出预期限制时的策略。
- 延迟:缓冲消息本质上是让它们等待处理(引入延迟)。一方面有助于保持顺序,另一方面会减慢整体处理速度。需要在维持顺序和最小化延迟之间找到平衡。
- 故障处理:如果消费者故障,可能丢失缓冲消息。为防止此问题,可能需要定期保存缓冲区状态。
- 延迟消息:在窗口处理后到达的消息将乱序。根据用例,可能需要处理或丢弃此类消息的策略。
- 状态管理:如果处理涉及有状态操作,需要跨窗口管理和持久化状态的机制。
- 资源利用:在缓冲区中保存大量消息需要内存。需确保有足够资源处理,特别是消息在缓冲区停留时间较长时。
3.4 幂等生产者
Kafka的幂等生产者功能旨在精确一次传递消息,防止重复。这在生产者因网络错误或其他瞬时故障重试发送消息的场景中至关重要。虽然幂等性的主要目标是防止消息重复,但它间接影响消息顺序。Kafka通过生产者ID(PID)和序列号实现幂等性,序列号作为幂等键,在特定分区内唯一。
- 序列号:Kafka为生产者发送的每条消息分配序列号。这些序列号在每个分区内唯一,确保生产者按特定顺序发送的消息在Kafka接收后以相同顺序写入分区。序列号保证单个分区内顺序。但向多个分区生产消息时,不保证跨分区的全局顺序。例如,如果生产者向分区P1、P2、P3分别发送消息M1、M2、M3,每条消息在其分区内获得唯一序列号,但不保证这些分区的相对消费顺序。
- 生产者ID(PID):启用幂等性时,Broker为每个生产者分配唯一PID。该PID与序列号结合,使Kafka能识别并丢弃生产者重试导致的重复消息。
Kafka通过序列号保证消息按生产顺序写入分区,并通过PID和幂等功能防止重复。要启用幂等生产者,需在生产者配置中设置enable.idempotence
为true:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
4. 生产者与消费者的关键配置
Kafka生产者和消费者的关键配置会影响消息顺序和吞吐量。
4.1 生产者配置
-
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
:决定发送多少消息而不等待确认。如果未启用幂等性且设置大于1,重试时可能打乱消息顺序。启用幂等性后,即使批量发送也能保持顺序。对于严格顺序(确保每条消息在下条发送前被读取),应设为1。若优先速度而非完美顺序,可设为5(但可能引入顺序问题)。 -
BATCH_SIZE_CONFIG
和LINGER_MS_CONFIG
:控制默认批处理大小(字节),旨在将同一分区的记录分组到更少请求中以提升性能。设置过低会导致发送大量小分组,降低性能;设置过高则可能浪费内存。如果分组未满,Kafka会等待一段时间(由LINGER_MS_CONFIG
控制)。如果新消息快速到达填满限制,立即发送;否则时间到期后发送现有消息。这是在速度和效率间的平衡。
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
4.2 消费者配置
-
MAX_POLL_RECORDS_CONFIG
:限制每次轮询获取的记录数。设置高可提升吞吐量,但可能增加保持顺序的难度。需在效率和可管理性间找到平衡点。 -
FETCH_MIN_BYTES_CONFIG
:设置高时,Kafka等待累积足够数据才发送,减少请求次数提升效率。若需快速获取数据,可设置较低。例如,消费者应用资源密集或需严格保持消息顺序(尤其多线程时),较小批次可能更有利。 -
FETCH_MAX_WAIT_MS_CONFIG
:决定消费者等待Kafka累积足够数据(满足FETCH_MIN_BYTES_CONFIG
)的最长时间。设置高则消费者愿意等待更久,可能一次获取更多数据;设置低则更快获取数据(即使量较少)。这是在等待更多数据和快速处理间的权衡。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
5. 结论
本文深入探讨了Kafka中消息顺序保障的复杂性。我们分析了相关挑战并提出了应对策略。无论是通过单分区、基于时间窗口缓冲的外部排序,还是幂等生产者,Kafka都提供了满足消息顺序需求的定制化解决方案。
示例代码可在GitHub获取。