1. 概述

本文将介绍如何处理使用Spring Kafka消费Avro消息时出现的"未知魔数"错误及其他反序列化问题。我们将探索ErrorHandlingDeserializer的用法,并了解它如何帮助管理"毒丸"消息。

最后,我们将配置DefaultErrorHandlerDeadLetterPublishingRecoverer,将问题记录路由到DLQ主题,确保消费者能持续处理而不会卡死。

2. 毒丸消息与魔数

有时我们会收到因格式问题或意外内容而无法处理的消息——这些被称为毒丸消息。与其无休止地尝试处理它们,不如优雅地处理这些消息。

在Kafka中,当消费者期望接收Avro编码数据却收到其他内容时,就会出现毒丸消息。 例如,使用StringSerializer的生产者向期望Avro编码数据的主题发送纯文本消息,会导致消费者端的AvroDeserializer失败:

毒丸消息示例

因此我们会收到带有*"未知魔数"信息的反序列化错误。"魔数"是Avro编码消息开头的标记,帮助反序列化器正确识别和处理消息。* 如果消息不是用Avro序列化器序列化的,且不以该字节开头,反序列化器就会抛出格式不匹配的错误。

3. 复现问题

为复现问题,我们使用一个简单的Spring Boot应用,它从Kafka主题消费Avro格式的消息。应用依赖spring-kafkaavrokafka-avro-deserialzier

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.9.1</version>
</dependency>

服务使用*@KafkaListener监听"baeldung.article.published"主题的所有消息。为演示方便,我们将所有传入消息的文章标题存储在内存的List*中:

@Component
class AvroMagicByteApp {

    // logger
    List<String> blog = new ArrayList<>();

    @KafkaListener(topics = "baeldung.article.published")
    public void listen(Article article) {
        LOG.info("a new article was published: {}", article);
        blog.add(article.getTitle());
    }
}

接下来添加Kafka特定配置。由于使用Spring Boot内置的Testcontainers支持,可以省略bootstrap-servers属性(测试时会自动注入)。将schema.registry.url设为"mock://test",因为测试中不使用真实的schema registry:

spring:
  kafka:
#    bootstrap-servers <-- 测试时由Spring和Testcontainers自动注入
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema.registry.url: mock://test
        specific.avro.reader: true

现在可以用Testcontainers启动Kafka容器测试应用的正常流程。

但如果我们向测试主题发送毒丸消息,就会遇到*"未知魔数!"*异常。 为生成违规消息,我们使用配置了StringSerializerKafkaTemplate实例,向主题发送虚拟字符串:

@SpringBootTest
class AvroMagicByteLiveTest {

    @Container
    @ServiceConnection
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("apache/kafka:4.0.0"));

    @Test
    void whenSendingMalformedMessage_thenSendToDLQ() throws Exception {
        stringKafkaTemplate()
          .send("baeldung.article.published", "not a valid avro message!")
          .get();

        Thread.sleep(10_000L);
        // 手动验证毒丸消息是否被正确处理
    }

    private static KafkaTemplate<Object, Object> stringKafkaTemplate() { /* ... */ }
}

我们还临时添加了*Thread.sleep()来观察应用日志。果然,服务无法反序列化消息,出现"未知魔数!"*错误:

ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception
  java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; __please consider configuring an 'ErrorHandlingDeserializer'__ in the value and/or key deserializer
  at org.springframework.kafka...DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:192)
   [...]
Caused by: org.apache.kafka...RecordDeserializationException: 
Error deserializing VALUE for partition baeldung.article.published-0 at offset 1. 
  __If needed, please seek past the record to continue consumption.__
  at org.apache.kafka.clients...CompletedFetch.newRecordDeserializationException(CompletedFetch.java:346)
   [...]
Caused by: org.apache.kafka...errors.SerializationException: __Unknown magic byte!__
  at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:649)
   [...]

更糟的是,由于未正确处理且从未确认消息,这个错误会反复出现。简单说,消费者卡在该偏移量上,不断尝试处理畸形消息。

4. 错误处理反序列化器

幸运的是,错误日志很详细,甚至给出了解决方案:

This error handler cannot process 'SerializationException's directly; 
please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer.

Spring Kafka中的ErrorHandlingDeserializer是个包装器,能捕获反序列化错误并让应用优雅处理,防止消费者崩溃。它将实际反序列化委托给其他反序列化器(如JsonDeserializerKafkaAvroDeserializer),并捕获过程中抛出的任何异常。

配置时更新value-deserializerErrorHandlingDeserializer,并在spring.kafka.consumer.spring.deserializer.value.delegate.class中指定原始反序列化器:

spring.kafka:
  consumer:
    value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    properties:
      spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer

配置后,*"未知魔数!"异常只在日志中出现一次。*这次应用优雅地处理了毒丸消息,继续处理后续消息,不再尝试反序列化它。

5. 发布到DLQ

目前我们已为消息负载配置了ErrorHandlingDeserializer,正确处理了毒丸场景。但如果只是捕获异常后继续,就难以检查或恢复这些故障消息。为此应考虑将它们发送到DLQ主题。

死信队列(DLQ)是用于存储一次或多次尝试处理仍失败消息的特殊主题。 在应用中启用此行为:

@Configuration
class DlqConfig {

    @Bean
    DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer dlqPublishingRecoverer) {
        return new DefaultErrorHandler(dlqPublishingRecoverer);
    }

    @Bean
    DeadLetterPublishingRecoverer dlqPublishingRecoverer(KafkaTemplate<byte[], byte[]> bytesKafkaTemplate) {
        return new DeadLetterPublishingRecoverer(bytesKafkaTemplate);
    }

    @Bean("bytesKafkaTemplate")
    KafkaTemplate<?, ?> bytesTemplate(ProducerFactory<?, ?> kafkaProducerFactory) {
        return new KafkaTemplate<>(kafkaProducerFactory, 
          Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()));
    }
}

可以看到,我们定义了DefaultErrorHandler bean,用于确定哪些异常可重试。反序列化异常被视为不可重试,因此会直接发送到DLQ。 创建错误处理器时,通过构造函数注入DeadLetterPublishingRecoverer实例。

另一方面,dlqPublishingRecoverer 使用配置了ByteArraySerializerKafkaTemplate将失败消息转发到DLQ主题,因为毒丸消息的确切格式未知。 它还负责解析DLQ主题名;默认会在原主题名后添加"-dlt":

@Test
void whenSendingMalformedMessage_thenSendToDLQ() throws Exception {
    stringKafkaTemplate()
      .send("baeldung.article.published", "not a valid avro message!")
      .get();

    var dlqRecord = listenForOneMessage("baeldung.article.published-dlt", ofSeconds(5L));

    assertThat(dlqRecord.value())
      .isEqualTo("not a valid avro message!");
}

private static ConsumerRecord<?, ?> listenForOneMessage(String topic, Duration timeout) {
    return KafkaTestUtils.getOneRecord(
      kafka.getBootstrapServers(), "test-group-id", topic, 0, false, true, timeout);
}

由此可见,配置ErrorHandlingDeserializer让我们能优雅处理畸形消息。自定义的DefaultErrorHandlerDeadLetterPublishingRecoverer bean则能将这些故障消息推送到DLQ主题。

6. 总结

本教程介绍了如何解决使用Spring Kafka处理Avro消息时出现的*"未知魔数"错误及其他反序列化问题。我们探讨了ErrorHandlingDeserializer*如何防止消费者被问题消息阻塞。

最后我们回顾了死信队列的概念,并配置Spring Kafka bean将毒丸消息路由到专用DLQ主题,确保处理流程顺畅不间断。

本文代码可在GitHub获取。


« 上一篇: Java 25 新特性详解
» 下一篇: Spring gRPC 项目指南