1. 引言
本教程将深入探讨 Kafka 的主题(Topic)和分区(Partition)概念,以及它们之间的内在关联。
2. Kafka主题是什么
主题本质上是一个事件序列的存储机制。它相当于一个持久化的日志文件,严格按照事件发生的时间顺序存储数据。新事件总是追加到日志末尾,且事件一旦写入就不可变(immutable)。
举个典型场景:记录房间温度测量序列。当某个温度值(如下午5:02的25°C)被记录后,它就不可更改——因为事件已经发生。同时,下午5:06的温度值不可能出现在5:02记录之前。因此,将每个温度测量视为事件,用Kafka主题存储这种时序数据再合适不过。
3. Kafka分区是什么
Kafka通过分区机制提升系统扩展性。分区会将主题拆分成多个片段,并分散存储在分布式系统的不同节点上。分区数量可由开发者或集群默认配置决定。
Kafka保证同一分区内的事件严格有序,但默认不保证跨分区的全局顺序。
例如,为提升性能,我们将主题分为两个分区:
- 消费者读取同一分区内的事件时,顺序与生产顺序一致
- 但若两个事件被分发到不同分区,消费顺序可能与生产顺序不同
⚠️ 要改善事件顺序保证,可通过设置事件键(event key)实现:相同键的事件会被路由到同一有序分区,从而确保这些事件在消费者端保持生产顺序。
4. 消费者组
消费者组(Consumer Group)是读取同一主题的消费者集合。Kafka将主题的所有分区分配给组内消费者,每个分区只能被组内一个消费者消费(但一个消费者可能处理多个分区)。
假设一个主题有3个分区,消费者组包含2个消费者:
- 可能的分配方案:消费者A处理分区1和2,消费者B仅处理分区3
在 KIP-500更新 中,Kafka引入了基于KRaft的新共识算法。当组内消费者增减时,KRaft会自动在剩余消费者间重新均衡分区,确保所有分区都有消费者处理。
5. 配置应用程序
本节将创建主题、消费者和生产者的配置类。
5.1. 主题配置
首先创建主题配置类:
@Configuration
public class KafkaTopicConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
public NewTopic celciusTopic() {
return TopicBuilder.name("celcius-scale-topic")
.partitions(2)
.build();
}
}
KafkaTopicConfig
注入了两个Spring Bean:
KafkaAdmin
:初始化Kafka集群并指定网络地址NewTopic
:创建名为celcius-scale-topic
的主题(含2个分区)
5.2. 消费者和生产者配置
创建生产者配置类:
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Double> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Double> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaProducerConfig
注入两个Bean:
ProducerFactory
:指定事件序列化方式和目标服务器KafkaTemplate
:在服务类中用于发送事件
5.3. Kafka生产者服务
创建核心生产者服务:
public class ThermostatService {
private final KafkaTemplate<String, Double> kafkaTemplate;
public ThermostatService(KafkaTemplate<String, Double> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void measureCelsiusAndPublish(int numMeasurements) {
new Random().doubles(25, 35)
.limit(numMeasurements)
.forEach(tmp -> {
kafkaTemplate.send("celcius-scale-topic", tmp);
});
}
}
ThermostatService
提供核心方法 measureCelsiusAndPublish
:
- 生成[25,35)范围内的随机温度值
- 通过
kafkaTemplate.send()
将事件发布到celsius-scale-topic
6. 生产和消费事件
本节演示如何配置消费者读取主题事件(使用嵌入式Kafka)。
6.1. 创建消费者服务
创建消费者类:
@Service
public class TemperatureConsumer {
Map<String, Set<String>> consumedRecords = new ConcurrentHashMap<>();
@KafkaListener(topics = "celcius-scale-topic", groupId = "group-1")
public void consumer1(ConsumerRecord<?, ?> consumerRecord) {
trackConsumedPartitions("consumer-1", consumerRecord.partition());
}
private void trackConsumedPartitions(String consumerName, int partitionNumber) {
consumedRecords.computeIfAbsent(consumerName, k -> new HashSet<>());
consumedRecords.computeIfPresent(consumerName, (k, v) -> {
v.add(String.valueOf(partitionNumber));
return v;
});
}
}
关键点说明:
@KafkaListener
注解创建消费者topics
:指定监听主题groupId
:标识所属消费者组
- 使用
ConcurrentHashMap
跟踪消费记录:- Key:消费者名称
- Value:消费的分区集合
6.2. 创建测试类
创建集成测试:
@SpringBootTest(classes = ThermostatApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaTopicsAndPartitionsIntegrationTest {
@ClassRule
public static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, true, "multitype");
@Autowired
private ThermostatService service;
@Autowired
private TemperatureConsumer consumer;
@Test
public void givenTopic_andConsumerGroup_whenConsumersListenToEvents_thenConsumeItCorrectly() throws Exception {
service.measureCelsiusAndPublish(10000);
Thread.sleep(1000);
System.out.println(consumer.consumedRecords);
}
}
测试要点:
✅ 使用 @EmbeddedKafka
启动嵌入式Kafka
✅ 通过JUnit规则管理Broker生命周期
✅ 生产10,000个事件后等待1秒(确保消费者就绪)
✅ 输出分区消费记录
测试输出示例:
{consumer-1=[0, 1]}
这表明单个消费者处理了所有分区(0和1)。若增加不同消费者组的消费者,结果会变化。
7. 总结
本文系统阐述了Kafka主题与分区的核心概念及其关系,并通过嵌入式Kafka演示了消费者处理多分区的实际场景。掌握这些基础对构建高吞吐、可扩展的实时数据管道至关重要。
完整示例代码见 GitHub仓库。