1. 概述
Apache Kafka 是一个分布式流处理平台,通过基于分区的架构实现高吞吐量数据传输。当消息发送到Kafka主题时,它们会被分发到多个分区以实现并行处理。这种设计使Kafka能够在保持性能的同时实现水平扩展。
理解Kafka在多分区场景下的消息传递机制对于构建可靠的分布式系统至关重要。分区策略直接影响消息顺序、消费者扩展性和整体系统行为。本文将深入探讨当主题包含多个分区时,Kafka如何传递消息,重点关注路由策略、顺序保证和消费者协调机制。
2. 消息分区路由
Kafka根据消息是否包含键,采用两种主要策略来确定消息的目标分区。这个决策从根本上影响了消息的分布和处理方式。
2.1 基于键的分区策略
当发送带有键的消息时,Kafka使用确定性哈希函数(通常是Murmur2哈希)将消息路由到固定分区,确保相关消息始终位于同一分区:
public void sendMessagesWithKey() {
String key = "user-123";
for (int i = 0; i <= 5; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("user-events", key, "Event " + i);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
logger.info("Key: {}, Partition: {}, Offset: {}", key, metadata.partition(),
metadata.offset());
}
});
}
producer.flush();
}
Kafka对键应用MurmurHash2算法,并通过模运算(基于分区数量)选择目标分区。所有键为"user-123"的消息都会被路由到同一分区,确保它们按顺序处理。这在需要维护特定实体状态或顺序的场景中特别有用。
2.2 无键消息的粘性分区策略
无键消息采用粘性分区策略进行分发,这种策略通过高效批处理提升吞吐量:
public Map<Integer, Integer> sendMessagesWithoutKey() {
Map<Integer, Integer> partitionCounts = new HashMap<>();
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("events", null, // no key
"Message " + i);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
synchronized (partitionCounts) {
partitionCounts.merge(metadata.partition(), 1, Integer::sum);
}
}
});
}
producer.flush();
logger.info("Distribution across partitions: {}", partitionCounts);
return partitionCounts;
}
与纯轮询相比,粘性分区策略会先填满一个分区的批次再切换到下一个分区,减少网络请求并提高压缩率。这种粘性行为会持续到批次满或linger时间到期。
3. 跨分区顺序保证
Kafka的顺序保证完全依赖于分区结构。理解这些保证对于正确处理顺序操作的系统设计至关重要。
3.1 分区内顺序保证
每个分区通过顺序偏移量分配维护严格顺序:消息按追加顺序写入分区日志,并严格按此顺序消费:
public void demonstratePartitionOrdering() throws InterruptedException {
String orderId = "order-789";
String[] events = { "created", "validated", "paid", "shipped", "delivered" };
for (String event : events) {
ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderId, event);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
logger.info("Event: {} -> Partition: {}, Offset: {}", event, metadata.partition(),
metadata.offset());
}
});
// small delay to demonstrate sequential processing
Thread.sleep(100);
}
producer.flush();
}
由于所有消息共享相同键,它们被路由到同一分区并在消费时保持顺序。这种保证在分区内是绝对的:消费者将始终按生产顺序读取这些事件。
3.2 跨分区无顺序保证
不同键的消息可能落入不同分区,Kafka不提供跨分区的顺序保证:
public void demonstrateCrossPartitionBehavior() {
long startTime = System.currentTimeMillis();
// these will likely go to different partitions
producer.send(new ProducerRecord<>("events", "key-A", "First at " + (System.currentTimeMillis() - startTime) + "ms"));
producer.send(new ProducerRecord<>("events", "key-B", "Second at " + (System.currentTimeMillis() - startTime) + "ms"));
producer.send(new ProducerRecord<>("events", "key-C", "Third at " + (System.currentTimeMillis() - startTime) + "ms"));
producer.flush();
}
即使我们顺序发送这些消息,消费者也可能乱序处理,因为它们位于不同分区。由于消费者负载或网络条件差异,某些分区的处理速度可能快于其他分区。
4. 消费者组协调
Kafka通过在消费者组内分配分区实现水平扩展。这种协调机制是Kafka可扩展性模型的基础。
4.1 组内分区分配
当多个消费者加入同一组时,Kafka将每个分区精确分配给一个消费者,避免组内重复处理:
public void createConsumerGroup() {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", "order-processors");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
int recordCount = 0;
while (recordCount < 10) { // process limited records for demo
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
logger.info("Consumer: {}, Partition: {}, Offset: {}, Value: {}", Thread.currentThread()
.getName(), record.partition(), record.offset(), record.value());
recordCount++;
}
consumer.commitSync();
}
consumer.close();
}
如果组中有6个分区和3个消费者,每个消费者通常处理2个分区。这种分配确保负载均衡且组内无消息重复。Kafka的组协调器自动管理这些分配。
4.2 多组实现消息广播
不同消费者组可以独立处理相同消息,使多个应用能响应同一事件:
public void startMultipleGroups() {
String[] groupIds = { "analytics-group", "audit-group", "notification-group" };
CountDownLatch latch = new CountDownLatch(groupIds.length);
for (String groupId : groupIds) {
startConsumerGroup(groupId, latch);
}
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
}
}
private void startConsumerGroup(String groupId, CountDownLatch latch) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
// other properties
new Thread(() -> {
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Arrays.asList("orders"));
int recordCount = 0;
while (recordCount < 5) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
recordCount += processRecordsForGroup(groupId, records);
}
} finally {
latch.countDown();
}
}).start();
}
每个组维护独立的偏移量跟踪,允许不同服务按自身节奏处理消息。这种模式支持事件驱动架构,多个系统可对同一业务事件做出反应。
5. 处理消费者重平衡
当消费者加入或离开组时,Kafka会重平衡分区分配。此过程确保持续运行但可能导致短暂中断。我们可以使用协作式重平衡最小化影响:
public void configureCooperativeRebalancing() {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", "cooperative-group");
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection partitions) {
logger.info("Revoked partitions: {}", partitions);
// complete processing of current records
}
@Override
public void onPartitionsAssigned(Collection partitions) {
logger.info("Assigned partitions: {}", partitions);
// initialize any partition-specific state
}
});
// process a few records to demonstrate
int recordCount = 0;
while (recordCount < 5) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
recordCount += records.count();
}
consumer.close();
}
协作式重平衡允许未受影响的消费者继续处理,仅重新分配必要分区,显著减少扩展操作的影响。
6. 处理保证机制
为可靠处理消息,我们通常通过手动控制偏移量提交实现至少一次投递:
public void processWithManualCommit() {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", "manual-commit-group");
props.put("enable.auto.commit", "false");
props.put("max.poll.records", "10");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
int totalProcessed = 0;
while (totalProcessed < 10) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processOrder(record);
totalProcessed++;
} catch (Exception e) {
logger.error("Processing failed for offset: {}", record.offset(), e);
break;
}
}
if (!records.isEmpty()) {
consumer.commitSync();
logger.info("Committed {} records", records.count());
}
}
consumer.close();
}
这种方法确保处理失败时消息不会丢失,但需要设计处理逻辑以优雅处理潜在重复。
7. 综合应用
当生产者向Kafka发送消息时,流程始于分区选择:
- ✅ 带键消息使用一致性哈希确保相关数据位于同一分区
- ✅ 无键消息使用粘性分区提高批处理效率
- ✅ 每个分区内通过顺序偏移量维护严格顺序
- ❌ 跨分区无全局顺序保证
每个分区在每个消费者组中分配给一个消费者,实现无重复的并行处理。不同消费者组独立消费相同消息,各自维护偏移量跟踪。当消费者加入或离开时,Kafka使用协作策略重平衡分区分配以减少中断。这种设计使Kafka能在保持分区顺序的同时实现水平扩展。
8. 总结
本文深入探讨了Kafka基于分区的架构如何在关键场景下处理消息传递并维护顺序保证。我们看到Kafka优先考虑可扩展性和吞吐量而非全局顺序,提供符合大多数实际需求的分区级保证。
关键结论是:分区是Kafka中并行性和顺序性的基本单元。通过围绕这些约束设计应用,我们可以构建高效处理数百万消息的可扩展系统。本文相关代码可在GitHub获取。