1. 概述
本文将介绍如何处理使用Spring Kafka消费Avro消息时出现的"未知魔数"错误及其他反序列化问题。我们将探索ErrorHandlingDeserializer的用法,并了解它如何帮助管理"毒丸"消息。
最后,我们将配置DefaultErrorHandler和DeadLetterPublishingRecoverer,将问题记录路由到DLQ主题,确保消费者能持续处理而不会卡死。
2. 毒丸消息与魔数
有时我们会收到因格式问题或意外内容而无法处理的消息——这些被称为毒丸消息。与其无休止地尝试处理它们,不如优雅地处理这些消息。
在Kafka中,当消费者期望接收Avro编码数据却收到其他内容时,就会出现毒丸消息。 例如,使用StringSerializer的生产者向期望Avro编码数据的主题发送纯文本消息,会导致消费者端的AvroDeserializer失败:
因此我们会收到带有*"未知魔数"信息的反序列化错误。"魔数"是Avro编码消息开头的标记,帮助反序列化器正确识别和处理消息。* 如果消息不是用Avro序列化器序列化的,且不以该字节开头,反序列化器就会抛出格式不匹配的错误。
3. 复现问题
为复现问题,我们使用一个简单的Spring Boot应用,它从Kafka主题消费Avro格式的消息。应用依赖spring-kafka、avro和kafka-avro-deserialzier:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.9.1</version>
</dependency>
服务使用*@KafkaListener监听"baeldung.article.published"主题的所有消息。为演示方便,我们将所有传入消息的文章标题存储在内存的List*中:
@Component
class AvroMagicByteApp {
// logger
List<String> blog = new ArrayList<>();
@KafkaListener(topics = "baeldung.article.published")
public void listen(Article article) {
LOG.info("a new article was published: {}", article);
blog.add(article.getTitle());
}
}
接下来添加Kafka特定配置。由于使用Spring Boot内置的Testcontainers支持,可以省略bootstrap-servers属性(测试时会自动注入)。将schema.registry.url设为"mock://test",因为测试中不使用真实的schema registry:
spring:
kafka:
# bootstrap-servers <-- 测试时由Spring和Testcontainers自动注入
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: mock://test
specific.avro.reader: true
现在可以用Testcontainers启动Kafka容器测试应用的正常流程。
但如果我们向测试主题发送毒丸消息,就会遇到*"未知魔数!"*异常。 为生成违规消息,我们使用配置了StringSerializer的KafkaTemplate实例,向主题发送虚拟字符串:
@SpringBootTest
class AvroMagicByteLiveTest {
@Container
@ServiceConnection
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("apache/kafka:4.0.0"));
@Test
void whenSendingMalformedMessage_thenSendToDLQ() throws Exception {
stringKafkaTemplate()
.send("baeldung.article.published", "not a valid avro message!")
.get();
Thread.sleep(10_000L);
// 手动验证毒丸消息是否被正确处理
}
private static KafkaTemplate<Object, Object> stringKafkaTemplate() { /* ... */ }
}
我们还临时添加了*Thread.sleep()来观察应用日志。果然,服务无法反序列化消息,出现"未知魔数!"*错误:
ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; __please consider configuring an 'ErrorHandlingDeserializer'__ in the value and/or key deserializer
at org.springframework.kafka...DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:192)
[...]
Caused by: org.apache.kafka...RecordDeserializationException:
Error deserializing VALUE for partition baeldung.article.published-0 at offset 1.
__If needed, please seek past the record to continue consumption.__
at org.apache.kafka.clients...CompletedFetch.newRecordDeserializationException(CompletedFetch.java:346)
[...]
Caused by: org.apache.kafka...errors.SerializationException: __Unknown magic byte!__
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:649)
[...]
更糟的是,由于未正确处理且从未确认消息,这个错误会反复出现。简单说,消费者卡在该偏移量上,不断尝试处理畸形消息。
4. 错误处理反序列化器
幸运的是,错误日志很详细,甚至给出了解决方案:
This error handler cannot process 'SerializationException's directly;
please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer.
Spring Kafka中的ErrorHandlingDeserializer是个包装器,能捕获反序列化错误并让应用优雅处理,防止消费者崩溃。它将实际反序列化委托给其他反序列化器(如JsonDeserializer或KafkaAvroDeserializer),并捕获过程中抛出的任何异常。
配置时更新value-deserializer为ErrorHandlingDeserializer,并在spring.kafka.consumer.spring.deserializer.value.delegate.class中指定原始反序列化器:
spring.kafka:
consumer:
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
配置后,*"未知魔数!"异常只在日志中出现一次。*这次应用优雅地处理了毒丸消息,继续处理后续消息,不再尝试反序列化它。
5. 发布到DLQ
目前我们已为消息负载配置了ErrorHandlingDeserializer,正确处理了毒丸场景。但如果只是捕获异常后继续,就难以检查或恢复这些故障消息。为此应考虑将它们发送到DLQ主题。
死信队列(DLQ)是用于存储一次或多次尝试处理仍失败消息的特殊主题。 在应用中启用此行为:
@Configuration
class DlqConfig {
@Bean
DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer dlqPublishingRecoverer) {
return new DefaultErrorHandler(dlqPublishingRecoverer);
}
@Bean
DeadLetterPublishingRecoverer dlqPublishingRecoverer(KafkaTemplate<byte[], byte[]> bytesKafkaTemplate) {
return new DeadLetterPublishingRecoverer(bytesKafkaTemplate);
}
@Bean("bytesKafkaTemplate")
KafkaTemplate<?, ?> bytesTemplate(ProducerFactory<?, ?> kafkaProducerFactory) {
return new KafkaTemplate<>(kafkaProducerFactory,
Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()));
}
}
可以看到,我们定义了DefaultErrorHandler bean,用于确定哪些异常可重试。反序列化异常被视为不可重试,因此会直接发送到DLQ。 创建错误处理器时,通过构造函数注入DeadLetterPublishingRecoverer实例。
另一方面,dlqPublishingRecoverer 使用配置了ByteArraySerializer的KafkaTemplate将失败消息转发到DLQ主题,因为毒丸消息的确切格式未知。 它还负责解析DLQ主题名;默认会在原主题名后添加"-dlt":
@Test
void whenSendingMalformedMessage_thenSendToDLQ() throws Exception {
stringKafkaTemplate()
.send("baeldung.article.published", "not a valid avro message!")
.get();
var dlqRecord = listenForOneMessage("baeldung.article.published-dlt", ofSeconds(5L));
assertThat(dlqRecord.value())
.isEqualTo("not a valid avro message!");
}
private static ConsumerRecord<?, ?> listenForOneMessage(String topic, Duration timeout) {
return KafkaTestUtils.getOneRecord(
kafka.getBootstrapServers(), "test-group-id", topic, 0, false, true, timeout);
}
由此可见,配置ErrorHandlingDeserializer让我们能优雅处理畸形消息。自定义的DefaultErrorHandler和DeadLetterPublishingRecoverer bean则能将这些故障消息推送到DLQ主题。
6. 总结
本教程介绍了如何解决使用Spring Kafka处理Avro消息时出现的*"未知魔数"错误及其他反序列化问题。我们探讨了ErrorHandlingDeserializer*如何防止消费者被问题消息阻塞。
最后我们回顾了死信队列的概念,并配置Spring Kafka bean将毒丸消息路由到专用DLQ主题,确保处理流程顺畅不间断。
本文代码可在GitHub获取。