1. 概述
在 Kafka 中,定位(Seek)操作类似于在磁盘读取数据前找到数据存储的位置。消费者必须先定位到分区的正确位置,才能读取数据。
Kafka 中的 offset 是一个唯一且持续递增的数字,用于标记事件记录在分区中的位置。消费组中的每个消费者都会为每个分区维护自己的 offset,以跟踪消费进度。
消费者可能需要在分区的不同位置处理消息,典型场景包括:
- 重放历史事件
- 跳过特定消息
- 直接消费最新消息
本教程将探讨如何使用 Spring Kafka API 实现分区中不同位置的定位操作。
2. 使用 Java API 定位
默认情况下,消费者从分区开头读取消息并持续监听新消息。但实际开发中,我们常需要从特定位置、时间点或相对位置开始读取。下面介绍一个提供多种定位方式的 API 实现。
2.1. 按 Offset 定位
Spring Kafka 提供了 seek()
方法,用于将消费者直接定位到分区的指定 offset。
@GetMapping("partition/{partition}/offset/{offset}")
public ResponseEntity<Response> getOneByPartitionAndOffset(@PathVariable("partition") int partition,
@PathVariable("offset") int offset) {
try (KafkaConsumer<String, String> consumer =
(KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, offset);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
if (recordIterator.hasNext()) {
ConsumerRecord<String, String> consumerRecord = recordIterator.next();
Response response = new Response(consumerRecord.partition(),
consumerRecord.offset(), consumerRecord.value());
return new ResponseEntity<>(response, HttpStatus.OK);
}
}
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
这个 API 通过接口 partition/{partition}/offset/{offset}
接收分区号和 offset,调用 seek()
定位后返回指定位置的消息。响应模型包含分区、offset 和消息内容:
public record Response(int partition, long offset, String value) { }
⚠️ 注意:当前实现仅返回单条记录,实际应用中可扩展为批量获取。未处理 offset 不可用的情况,生产环境需增加异常处理。
测试前准备数据(向分区 0 发送 5 条测试消息):
@BeforeAll
static void beforeAll() {
testKafkaProducer = new KafkaProducer<>(props);
int partition = 0;
IntStream.range(0, 5)
.forEach(m -> {
String key = String.valueOf(new Random().nextInt());
String value = "Message no : %s".formatted(m);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME,
partition,
key,
value
);
try {
testKafkaProducer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
测试验证(获取 offset 2 的消息):
@Test
void givenKafkaBrokerExists_whenSeekByPartition_thenMessageShouldBeRetrieved() {
this.webClient.get()
.uri("/seek/api/v1/partition/0/offset/2")
.exchange()
.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("{\"partition\":0,\"offset\":2,\"value\":\"Message no : 2\"}");
}
✅ 成功返回 offset 2 处的第三条消息。
2.2. 定位到分区开头
seekToBeginning()
方法将消费者定位到分区开头,从第一条消息开始读取。
@GetMapping("partition/{partition}/beginning")
public ResponseEntity<Response> getOneByPartitionToBeginningOffset(@PathVariable("partition") int partition) {
try (KafkaConsumer<String, String> consumer =
(KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seekToBeginning(Collections.singleton(topicPartition));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
if (recordIterator.hasNext()) {
ConsumerRecord<String, String> consumerRecord = recordIterator.next();
Response response = new Response(consumerRecord.partition(),
consumerRecord.offset(), consumerRecord.value());
return new ResponseEntity<>(response, HttpStatus.OK);
}
}
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
通过接口 partition/{partition}/beginning
实现分区开头定位。测试验证:
@Test
void givenKafkaBrokerExists_whenSeekByBeginning_thenFirstMessageShouldBeRetrieved() {
this.webClient.get()
.uri("/seek/api/v1/partition/0/beginning")
.exchange()
.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("{\"partition\":0,\"offset\":0,\"value\":\"Message no : 0\"}");
}
✅ 成功返回 offset 0 处的首条消息。
2.3. 定位到分区结尾
seekToEnd()
方法将消费者定位到分区结尾,准备消费未来追加的消息。
@GetMapping("partition/{partition}/end")
public ResponseEntity<Long> getOneByPartitionToEndOffset(@PathVariable("partition") int partition) {
try (KafkaConsumer<String, String> consumer =
(KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seekToEnd(Collections.singleton(topicPartition));
return new ResponseEntity<>(consumer.position(topicPartition), HttpStatus.OK);
}
}
接口 partition/{partition}/end
返回分区结尾的下一个可用 offset(即最后一条消息 offset + 1)。测试验证:
@Test
void givenKafkaBrokerExists_whenSeekByEnd_thenLastMessageShouldBeRetrieved() {
this.webClient.get()
.uri("/seek/api/v1/partition/0/end")
.exchange()
.expectStatus()
.isOk()
.expectBody(Long.class)
.isEqualTo(5L);
}
✅ 返回 5L(最后一条消息 offset 为 4,下一个位置是 5)。
2.4. 实现 ConsumerSeekAware 类
除了直接使用消费者 API,Spring Kafka 还提供了 AbstractConsumerSeekAware
抽象类。该类允许消费者动态控制分区定位,支持在分区分配时指定 offset 或时间戳。
核心方法 seekRelative()
实现相对位置定位:
void seekRelative(java.lang.String topic, int partition, long offset, boolean toCurrent)
参数说明:
- topic: 目标 topic 名称
- partition: 分区号
- offset: 相对偏移量(正负均可)
- toCurrent:
true
→ 相对于当前 offset 定位false
→ 相对于分区开头定位
示例:定位到分区最后一条消息(offset -1 相对于结尾):
@Component
class ConsumerListener extends AbstractConsumerSeekAware {
public static final Map<String, String> MESSAGES = new HashMap<>();
@Override
public void onPartitionsAssigned(Map<TopicPartition,
Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet()
.forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, false));
}
@KafkaListener(id = "test-seek", topics = "test-seek-topic")
public void listen(ConsumerRecord<String, String> in) {
MESSAGES.put(in.key(), in.value());
}
}
在 onPartitionsAssigned
回调中调用 seekRelative()
,参数 -1
表示从结尾向前移动一位。测试验证:
@Test
void givenKafkaBrokerExists_whenMessagesAreSent_ThenLastMessageShouldBeRetrieved() {
Map<String, String> messages = consumerListener.MESSAGES;
Assertions.assertEquals(1, messages.size());
Assertions.assertEquals("Message no : 4", messages.get("4"));
}
✅ 成功消费 offset 4 处的最后一条消息。
3. 总结
本教程探讨了 Kafka 消费者定位的两种实现方式:
1. 直接使用消费者 API
适用于需要精确控制读取位置的特定场景:
- ✅ 重放历史事件
- ✅ 跳过特定消息
- ✅ 基于 offset 的自定义逻辑
2. 实现 ConsumerSeekAware 类
更适合持续消费场景:
- ✅ 自动定期提交 offset(默认
enable.auto.commit=true
) - ✅ 支持时间戳定位和相对位置定位
- ✅ 在分区分配时动态调整位置
选择建议:
- 需要临时定位 → 使用消费者 API
- 长期运行的消费者 → 实现
ConsumerSeekAware