1. 概述

在 Kafka 中,定位(Seek)操作类似于在磁盘读取数据前找到数据存储的位置。消费者必须先定位到分区的正确位置,才能读取数据。

Kafka 中的 offset 是一个唯一且持续递增的数字,用于标记事件记录在分区中的位置。消费组中的每个消费者都会为每个分区维护自己的 offset,以跟踪消费进度。

消费者可能需要在分区的不同位置处理消息,典型场景包括:

  • 重放历史事件
  • 跳过特定消息
  • 直接消费最新消息

本教程将探讨如何使用 Spring Kafka API 实现分区中不同位置的定位操作。

2. 使用 Java API 定位

默认情况下,消费者从分区开头读取消息并持续监听新消息。但实际开发中,我们常需要从特定位置、时间点或相对位置开始读取。下面介绍一个提供多种定位方式的 API 实现。

2.1. 按 Offset 定位

Spring Kafka 提供了 seek() 方法,用于将消费者直接定位到分区的指定 offset。

@GetMapping("partition/{partition}/offset/{offset}")
public ResponseEntity<Response> getOneByPartitionAndOffset(@PathVariable("partition") int partition,
  @PathVariable("offset") int offset) {
    try (KafkaConsumer<String, String> consumer = 
      (KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
          TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
          consumer.assign(Collections.singletonList(topicPartition));
          consumer.seek(topicPartition, offset);
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
          Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
          if (recordIterator.hasNext()) {
              ConsumerRecord<String, String> consumerRecord = recordIterator.next();
              Response response = new Response(consumerRecord.partition(),
                consumerRecord.offset(), consumerRecord.value());
              return new ResponseEntity<>(response, HttpStatus.OK);
        }
    }
    return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}

这个 API 通过接口 partition/{partition}/offset/{offset} 接收分区号和 offset,调用 seek() 定位后返回指定位置的消息。响应模型包含分区、offset 和消息内容:

public record Response(int partition, long offset, String value) { }

⚠️ 注意:当前实现仅返回单条记录,实际应用中可扩展为批量获取。未处理 offset 不可用的情况,生产环境需增加异常处理。

测试前准备数据(向分区 0 发送 5 条测试消息):

@BeforeAll
static void beforeAll() {
    testKafkaProducer = new KafkaProducer<>(props);
    int partition = 0;
    IntStream.range(0, 5)
      .forEach(m -> {
        String key = String.valueOf(new Random().nextInt());
        String value = "Message no : %s".formatted(m);
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME,
          partition,
          key,
          value
          );
        try {
          testKafkaProducer.send(record).get();
        } catch (InterruptedException | ExecutionException e) {
          throw new RuntimeException(e);
        }
        });
}

测试验证(获取 offset 2 的消息):

@Test
void givenKafkaBrokerExists_whenSeekByPartition_thenMessageShouldBeRetrieved() {
    this.webClient.get()
      .uri("/seek/api/v1/partition/0/offset/2")
      .exchange()
      .expectStatus()
      .isOk()
      .expectBody(String.class)
      .isEqualTo("{\"partition\":0,\"offset\":2,\"value\":\"Message no : 2\"}");
}

✅ 成功返回 offset 2 处的第三条消息。

2.2. 定位到分区开头

seekToBeginning() 方法将消费者定位到分区开头,从第一条消息开始读取。

@GetMapping("partition/{partition}/beginning")
public ResponseEntity<Response> getOneByPartitionToBeginningOffset(@PathVariable("partition") int partition) {
    try (KafkaConsumer<String, String> consumer = 
      (KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
        TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seekToBeginning(Collections.singleton(topicPartition));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
        if (recordIterator.hasNext()) {
            ConsumerRecord<String, String> consumerRecord = recordIterator.next();
            Response response = new Response(consumerRecord.partition(),
              consumerRecord.offset(), consumerRecord.value());
            return new ResponseEntity<>(response, HttpStatus.OK);
        }
    }
    return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}

通过接口 partition/{partition}/beginning 实现分区开头定位。测试验证:

@Test
void givenKafkaBrokerExists_whenSeekByBeginning_thenFirstMessageShouldBeRetrieved() {
    this.webClient.get()
      .uri("/seek/api/v1/partition/0/beginning")
      .exchange()
      .expectStatus()
      .isOk()
      .expectBody(String.class)
      .isEqualTo("{\"partition\":0,\"offset\":0,\"value\":\"Message no : 0\"}");
}

✅ 成功返回 offset 0 处的首条消息。

2.3. 定位到分区结尾

seekToEnd() 方法将消费者定位到分区结尾,准备消费未来追加的消息。

@GetMapping("partition/{partition}/end")
public ResponseEntity<Long> getOneByPartitionToEndOffset(@PathVariable("partition") int partition) {
    try (KafkaConsumer<String, String> consumer = 
      (KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
        TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seekToEnd(Collections.singleton(topicPartition));
        return new ResponseEntity<>(consumer.position(topicPartition), HttpStatus.OK);
    }
}

接口 partition/{partition}/end 返回分区结尾的下一个可用 offset(即最后一条消息 offset + 1)。测试验证:

@Test
void givenKafkaBrokerExists_whenSeekByEnd_thenLastMessageShouldBeRetrieved() {
    this.webClient.get()
      .uri("/seek/api/v1/partition/0/end")
      .exchange()
      .expectStatus()
      .isOk()
      .expectBody(Long.class)
      .isEqualTo(5L);
}

✅ 返回 5L(最后一条消息 offset 为 4,下一个位置是 5)。

2.4. 实现 ConsumerSeekAware

除了直接使用消费者 API,Spring Kafka 还提供了 AbstractConsumerSeekAware 抽象类。该类允许消费者动态控制分区定位,支持在分区分配时指定 offset 或时间戳。

核心方法 seekRelative() 实现相对位置定位:

void seekRelative​(java.lang.String topic, int partition, long offset, boolean toCurrent)

参数说明:

  • topic: 目标 topic 名称
  • partition: 分区号
  • offset: 相对偏移量(正负均可)
  • toCurrent:
    • true → 相对于当前 offset 定位
    • false → 相对于分区开头定位

示例:定位到分区最后一条消息(offset -1 相对于结尾):

@Component
class ConsumerListener extends AbstractConsumerSeekAware {

    public static final Map<String, String> MESSAGES = new HashMap<>();

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, 
      Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet()
          .forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, false));
    }

    @KafkaListener(id = "test-seek", topics = "test-seek-topic")
    public void listen(ConsumerRecord<String, String> in) {
        MESSAGES.put(in.key(), in.value());
    }
}

onPartitionsAssigned 回调中调用 seekRelative(),参数 -1 表示从结尾向前移动一位。测试验证:

@Test
void givenKafkaBrokerExists_whenMessagesAreSent_ThenLastMessageShouldBeRetrieved() {
    Map<String, String> messages = consumerListener.MESSAGES;
    Assertions.assertEquals(1, messages.size());
    Assertions.assertEquals("Message no : 4", messages.get("4"));
}

✅ 成功消费 offset 4 处的最后一条消息。

3. 总结

本教程探讨了 Kafka 消费者定位的两种实现方式:

1. 直接使用消费者 API
适用于需要精确控制读取位置的特定场景:

  • ✅ 重放历史事件
  • ✅ 跳过特定消息
  • ✅ 基于 offset 的自定义逻辑

2. 实现 ConsumerSeekAware
更适合持续消费场景:

  • ✅ 自动定期提交 offset(默认 enable.auto.commit=true
  • ✅ 支持时间戳定位和相对位置定位
  • ✅ 在分区分配时动态调整位置

选择建议:

  • 需要临时定位 → 使用消费者 API
  • 长期运行的消费者 → 实现 ConsumerSeekAware

原始标题:Consumer Seek in Kafka | Baeldung