1. 引言

本教程将深入探讨 Kafka 的主题(Topic)和分区(Partition)概念,以及它们之间的内在关联。

2. Kafka主题是什么

主题本质上是一个事件序列的存储机制。它相当于一个持久化的日志文件,严格按照事件发生的时间顺序存储数据。新事件总是追加到日志末尾,且事件一旦写入就不可变(immutable)。

举个典型场景:记录房间温度测量序列。当某个温度值(如下午5:02的25°C)被记录后,它就不可更改——因为事件已经发生。同时,下午5:06的温度值不可能出现在5:02记录之前。因此,将每个温度测量视为事件,用Kafka主题存储这种时序数据再合适不过。

3. Kafka分区是什么

Kafka通过分区机制提升系统扩展性。分区会将主题拆分成多个片段,并分散存储在分布式系统的不同节点上。分区数量可由开发者或集群默认配置决定。

Kafka保证同一分区内的事件严格有序,但默认不保证跨分区的全局顺序。

例如,为提升性能,我们将主题分为两个分区:

  • 消费者读取同一分区内的事件时,顺序与生产顺序一致
  • 但若两个事件被分发到不同分区,消费顺序可能与生产顺序不同

⚠️ 要改善事件顺序保证,可通过设置事件键(event key)实现:相同键的事件会被路由到同一有序分区,从而确保这些事件在消费者端保持生产顺序。

4. 消费者组

消费者组(Consumer Group)是读取同一主题的消费者集合。Kafka将主题的所有分区分配给组内消费者,每个分区只能被组内一个消费者消费(但一个消费者可能处理多个分区)。

假设一个主题有3个分区,消费者组包含2个消费者:

  • 可能的分配方案:消费者A处理分区1和2,消费者B仅处理分区3

KIP-500更新 中,Kafka引入了基于KRaft的新共识算法。当组内消费者增减时,KRaft会自动在剩余消费者间重新均衡分区,确保所有分区都有消费者处理

5. 配置应用程序

本节将创建主题、消费者和生产者的配置类。

5.1. 主题配置

首先创建主题配置类:

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    public NewTopic celciusTopic() {
        return TopicBuilder.name("celcius-scale-topic")
                .partitions(2)
                .build();
    }
}

KafkaTopicConfig 注入了两个Spring Bean:

  • KafkaAdmin:初始化Kafka集群并指定网络地址
  • NewTopic:创建名为celcius-scale-topic的主题(含2个分区)

5.2. 消费者和生产者配置

创建生产者配置类:

public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Double> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Double> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

KafkaProducerConfig 注入两个Bean:

  • ProducerFactory:指定事件序列化方式和目标服务器
  • KafkaTemplate:在服务类中用于发送事件

5.3. Kafka生产者服务

创建核心生产者服务:

public class ThermostatService {

    private final KafkaTemplate<String, Double> kafkaTemplate;

    public ThermostatService(KafkaTemplate<String, Double> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void measureCelsiusAndPublish(int numMeasurements) {
        new Random().doubles(25, 35)
                .limit(numMeasurements)
                .forEach(tmp -> {
                    kafkaTemplate.send("celcius-scale-topic", tmp);
                });
    }
}

ThermostatService 提供核心方法 measureCelsiusAndPublish

  • 生成[25,35)范围内的随机温度值
  • 通过 kafkaTemplate.send() 将事件发布到 celsius-scale-topic

6. 生产和消费事件

本节演示如何配置消费者读取主题事件(使用嵌入式Kafka)。

6.1. 创建消费者服务

创建消费者类:

@Service
public class TemperatureConsumer {
    Map<String, Set<String>> consumedRecords = new ConcurrentHashMap<>();

    @KafkaListener(topics = "celcius-scale-topic", groupId = "group-1")
    public void consumer1(ConsumerRecord<?, ?> consumerRecord) {
        trackConsumedPartitions("consumer-1", consumerRecord.partition());
    }

    private void trackConsumedPartitions(String consumerName, int partitionNumber) {
        consumedRecords.computeIfAbsent(consumerName, k -> new HashSet<>());
        consumedRecords.computeIfPresent(consumerName, (k, v) -> {
            v.add(String.valueOf(partitionNumber));
            return v;
        });
    }
}

关键点说明:

  • @KafkaListener 注解创建消费者
    • topics:指定监听主题
    • groupId:标识所属消费者组
  • 使用 ConcurrentHashMap 跟踪消费记录:
    • Key:消费者名称
    • Value:消费的分区集合

6.2. 创建测试类

创建集成测试:

@SpringBootTest(classes = ThermostatApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class KafkaTopicsAndPartitionsIntegrationTest {
    @ClassRule
    public static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, true, "multitype");

    @Autowired
    private ThermostatService service;

    @Autowired
    private TemperatureConsumer consumer;

    @Test
    public void givenTopic_andConsumerGroup_whenConsumersListenToEvents_thenConsumeItCorrectly() throws Exception {
        service.measureCelsiusAndPublish(10000);
        Thread.sleep(1000);
        System.out.println(consumer.consumedRecords);
    }
}

测试要点: ✅ 使用 @EmbeddedKafka 启动嵌入式Kafka ✅ 通过JUnit规则管理Broker生命周期 ✅ 生产10,000个事件后等待1秒(确保消费者就绪) ✅ 输出分区消费记录

测试输出示例:

{consumer-1=[0, 1]}

这表明单个消费者处理了所有分区(0和1)。若增加不同消费者组的消费者,结果会变化。

7. 总结

本文系统阐述了Kafka主题与分区的核心概念及其关系,并通过嵌入式Kafka演示了消费者处理多分区的实际场景。掌握这些基础对构建高吞吐、可扩展的实时数据管道至关重要。

完整示例代码见 GitHub仓库


原始标题:Understanding Kafka Topics and Partitions