1. 简介

Apache Kafka 是一个强大的分布式流处理平台,擅长处理海量实时数据流。Kafka 将数据组织到 主题(topics) 中,并进一步将主题划分为多个分区。每个分区都是独立的通道,支持并行处理和容错

本教程将深入探讨在 Kafka 中向特定分区发送数据的技巧。我们将分析这种方案的优势、实现方法以及潜在挑战。

2. 理解 Kafka 分区

2.1 什么是 Kafka 分区

当生产者向 Kafka 主题发送消息时,Kafka 会根据指定的分区策略将消息组织到不同分区中。分区是 Kafka 的基本单元,代表一个线性、有序的消息序列。消息一旦被生产,就会根据分区策略分配到特定分区,随后追加到该分区的日志末尾

2.2 并行处理与消费者组

一个 Kafka 主题可划分为多个分区,而 消费者组 可以被分配到这些分区的子集。组内每个消费者独立处理分配给它的分区消息。这种并行处理机制显著提升了整体吞吐量和可扩展性,使 Kafka 能高效处理海量数据。

2.3 顺序性与处理保证

在单个分区内,Kafka 保证消息按接收顺序处理。这对依赖 消息顺序 的应用(如金融交易或事件日志)至关重要。但需注意,由于网络延迟等因素,消息接收顺序可能与原始发送顺序不同。

跨分区时,Kafka 不保证消息顺序。不同分区的消息可能被并发处理,导致事件顺序变化。设计依赖严格消息顺序的应用时必须考虑这一点。

2.4 容错与高可用性

分区也是 Kafka 实现卓越容错性的关键。每个分区可跨多个 Broker 复制。当某个 Broker 故障时,副本分区仍可访问,确保数据连续可用。

Kafka 集群能自动将消费者重定向到健康的 Broker,维持数据可用性和系统高可靠性。

3. 为何要向特定分区发送数据

3.1 数据亲和性

数据亲和性指将相关数据有意分配到同一分区。通过将相关数据发送到特定分区,确保它们被一起处理,提升处理效率。

例如,在订单跟踪和分析场景中,我们可能希望将同一客户的所有订单放在同一分区。这简化了跟踪和分析流程。

3.2 负载均衡

均匀分布数据到各分区可优化资源利用率。通过基于负载考虑将数据发送到不同分区,能避免资源瓶颈,确保每个分区承受均衡的工作负载。

3.3 优先级处理

某些场景下,数据优先级不同。Kafka 的分区能力支持将关键数据定向到专用分区以加速处理。这确保高优先级消息比低优先级消息获得更快处理。

4. 向特定分区发送的方法

Kafka 提供多种分区分配策略,实现灵活的数据分布和处理。以下是向特定分区发送消息的常用方法:

4.1 粘性分区器

Kafka 2.4+ 版本中,粘性分区器尝试将无键消息保持在同一分区。但此行为并非绝对,会受 batch.sizelinger.ms 等批处理设置影响。

为优化消息传递,Kafka 在发送前将消息分组为批次。batch.size(默认 16,384 字节)控制最大批次大小,影响无键消息在粘性分区器下的停留时长。

linger.ms(默认 0 毫秒)在发送批次前引入延迟,可能延长无键消息的粘性行为。

以下测试用例假设使用默认批处理配置。我们发送三条无显式键的消息,预期它们最初被分配到同一分区:

kafkaProducer.send("default-topic", "message1");
kafkaProducer.send("default-topic", "message2");
kafkaProducer.send("default-topic", "message3");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 3);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();

Set<Integer> uniquePartitions = records.stream()
  .map(ReceivedMessage::getPartition)
  .collect(Collectors.toSet());

Assert.assertEquals(1, uniquePartitions.size());

4.2 基于键的方法

基于键的方法中,Kafka 将相同键的消息路由到同一分区,优化相关数据处理。这通过哈希函数实现,确保消息键到分区的确定性映射。

以下测试用例中,相同键 partitionA 的消息应始终落在同一分区:

kafkaProducer.send("order-topic", "partitionA", "critical data");
kafkaProducer.send("order-topic", "partitionA", "more critical data");
kafkaProducer.send("order-topic", "partitionB", "another critical message");
kafkaProducer.send("order-topic", "partitionA", "another more critical data");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 4);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
Map<String, List<ReceivedMessage>> messagesByKey = groupMessagesByKey(records);

messagesByKey.forEach((key, messages) -> {
    int expectedPartition = messages.get(0)
      .getPartition();
    for (ReceivedMessage message : messages) {
        assertEquals("Messages with key '" + key + "' should be in the same partition", message.getPartition(), expectedPartition);
    }
});

此外,基于键的方法保证相同键的消息在分区内按生产顺序接收。这对相关消息的顺序处理至关重要。

以下测试用例验证相同键消息在分区内按生产顺序接收:

kafkaProducer.send("order-topic", "partitionA", "message1");
kafkaProducer.send("order-topic", "partitionA", "message3");
kafkaProducer.send("order-topic", "partitionA", "message4");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 3);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();

StringBuilder resultMessage = new StringBuilder();
records.forEach(record -> resultMessage.append(record.getMessage()));
String expectedMessage = "message1message3message4";

assertEquals("Messages with the same key should be received in the order they were produced within a partition", 
  expectedMessage, resultMessage.toString());

4.3 自定义分区器

Kafka 允许通过实现 Partitioner 接口定义自定义分区器,实现细粒度控制。可根据消息内容、元数据等因素编写逻辑确定目标分区。

本节我们将创建基于客户类型的自定义分区逻辑:将高级客户订单定向到一个分区,普通客户订单定向到另一个分区。

首先创建 CustomPartitioner 类,继承 Kafka 的 Partitioner 接口。重写 partition() 方法实现自定义逻辑:

public class CustomPartitioner implements Partitioner {
    private static final int PREMIUM_PARTITION = 0;
    private static final int NORMAL_PARTITION = 1;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String customerType = extractCustomerType(key.toString());
        return "premium".equalsIgnoreCase(customerType) ? PREMIUM_PARTITION : NORMAL_PARTITION;
    }

    private String extractCustomerType(String key) {
        String[] parts = key.split("_");
        return parts.length > 1 ? parts[1] : "normal";
    }
   
    // more methods
}

接下来,在生产者配置中设置 PARTITIONER_CLASS_CONFIG 属性以应用自定义分区器:

private KafkaTemplate<String, String> setProducerToUseCustomPartitioner() {
    Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
    DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);

    return new KafkaTemplate<>(producerFactory);
}

然后构造测试用例验证自定义分区逻辑:

KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();

kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 2);

consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0)));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    assertEquals("Premium order message should be in partition 0", 0, record.partition());
    assertEquals("123_premium", record.key());
}

4.4 直接分区分配

在主题间手动迁移数据或调整分区数据分布时,直接分区分配可控制消息位置。Kafka 提供通过 ProducerRecord 构造函数指定分区号发送消息的能力。通过指定分区号,可显式控制每条消息的目标分区

以下测试用例在 send() 方法中指定分区号作为第二个参数:

kafkaProducer.send("order-topic", 0, "123_premium", "Premium order message");
kafkaProducer.send("order-topic", 1, "456_normal", "Normal order message");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 2);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();

for (ReceivedMessage record : records) {
    if ("123_premium".equals(record.getKey())) {
        assertEquals("Premium order message should be in partition 0", 0, record.getPartition());
    } else if ("456_normal".equals(record.getKey())) {
        assertEquals("Normal order message should be in partition 1", 1, record.getPartition());
    }
}

5. 从特定分区消费

在消费者端,可使用 KafkaConsumer.assign() 方法指定要消费的分区。这提供了细粒度的消费控制,但需要手动管理分区偏移量。

以下示例使用 assign() 方法从特定分区消费消息:

KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();

kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 2);

consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0)));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    assertEquals("Premium order message should be in partition 0", 0, record.partition());
    assertEquals("123_premium", record.key());
}

6. 潜在挑战与注意事项

向特定分区发送消息可能导致分区间负载不均。当分区逻辑未均匀分布消息时就会发生此问题。此外,扩展 Kafka 集群(增删 Broker)会触发分区重分配。重分配期间,Broker 可能迁移分区,导致消息顺序中断或临时不可用。

因此,应使用 Kafka 工具或指标定期监控各分区负载。例如 Kafka Admin Client 和 Micrometer 可帮助洞察分区健康状态和性能。Admin Client 可获取主题、分区及其当前状态信息,Micrometer 用于指标监控。

此外,需主动调整分区策略或横向扩展 Kafka 集群,以有效管理特定分区的负载增长。可考虑增加分区数量或调整键范围以实现更均匀的分布。

7. 总结

总之,在 Apache Kafka 中向特定分区发送消息的能力,为优化数据处理和提升系统效率提供了强大可能性。

本教程我们探讨了多种向特定分区发送消息的方法,包括基于键的方法、自定义分区器和直接分区分配。每种方法各有优势,可根据应用需求灵活选择。

一如既往,示例源代码可在 GitHub 获取。


原始标题:Sending Data to a Specific Partition in Kafka | Baeldung