1. Overview

In this article, we'll take a close look at Spring-Kafka's RecordDeserializationException. We'll create a custom error handler that catches this exception and skips the invalid message, allowing the consumer to continue processing subsequent events.

This article relies on Spring Boot's Kafka modules, which provide convenient tools for interacting with a Kafka broker. For a deeper understanding of Kafka internals, we recommend reviewing the platform's core concepts first.

2. Creating a Kafka Listener

For our example, we'll build a small application that listens to the baeldung.articles.published topic and processes incoming messages. To demonstrate custom error handling, the application should keep consuming messages after encountering a deserialization exception.

The Spring-Kafka version is resolved automatically by the parent Spring Boot POM, so we only need to add the module dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

This module provides support for the @KafkaListener annotation, an abstraction over the Kafka Consumer API. Let's use it to create the ArticlesPublishedListener component, and introduce an EmailService component to process each message:

@Component
class ArticlesPublishedListener {
    private final EmailService emailService;

    // constructor

    @KafkaListener(topics = "baeldung.articles.published")
    public void onArticlePublished(ArticlePublishedEvent event) {
        emailService.sendNewsletter(event.article());
    }
}

record ArticlePublishedEvent(String article) {
}

For the consumer configuration, we'll define only the properties essential to this example. In a production environment, we can tune them as needed or externalize the configuration:

@Bean
ConsumerFactory<String, ArticlePublishedEvent> consumerFactory(
  @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers
) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung-app-1");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(),
      new JsonDeserializer<>(ArticlePublishedEvent.class)
    );
}

@Bean
ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent> kafkaListenerContainerFactory(
  ConsumerFactory<String, ArticlePublishedEvent> consumerFactory
) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
}

3. Setting up the Test Environment

We'll use a Kafka Testcontainer to spin up a Kafka Docker container for testing:

@Testcontainers
@SpringBootTest(classes = Application.class)
class DeserializationExceptionLiveTest {

    @Container
    private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @DynamicPropertySource
    static void setProps(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
    }

    // ...
}
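
This setup assumes a few test-scoped dependencies on the classpath. A sketch using the standard coordinates (versions may need to be pinned if your parent POM doesn't manage them):

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>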

Additionally, we'll need a KafkaProducer and the EmailService to verify the listener's functionality. These components will send messages and check the processing results. To simplify testing, we'll store all the articles in an in-memory list:

@Service
class EmailService { 
    private final List<String> articles = new ArrayList<>();
   
    // logger, getter

    public void sendNewsletter(String article) {
        log.info("Sending newsletter for article: " + article);
        articles.add(article);
    }
}

All that's left is to inject the EmailService into the test class and create the testKafkaProducer:

@Autowired
EmailService emailService;

static KafkaProducer<String, String> testKafkaProducer;

@BeforeAll
static void beforeAll() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    testKafkaProducer = new KafkaProducer<>(props);
}
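
The tests that follow rely on a publishArticle() helper that isn't shown above. A minimal sketch, assuming the raw JSON strings are sent as-is to the topic our listener subscribes to:

private void publishArticle(String payload) {
    // send the raw string payload; the listener's JsonDeserializer parses it on the consumer side
    testKafkaProducer.send(new ProducerRecord<>("baeldung.articles.published", payload));
}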

Now let's test the happy path: we publish two valid JSON messages and verify that emailService was called for each of them:

@Test
void whenPublishingValidArticleEvent_thenProcessWithoutErrors() {
    publishArticle("{ \"article\": \"Kotlin for Java Developers\" }");
    publishArticle("{ \"article\": \"The S.O.L.I.D. Principles\" }");

    await().untilAsserted(() -> 
      assertThat(emailService.getArticles())
        .containsExactlyInAnyOrder(
          "Kotlin for Java Developers", 
          "The S.O.L.I.D. Principles"
        ));
}

4. Causing a RecordDeserializationException

Kafka throws a RecordDeserializationException when the configured deserializer cannot parse a message's key or value. To reproduce it, we simply publish a message containing invalid JSON:

@Test
void whenPublishingInvalidArticleEvent_thenCatchExceptionAndContinueProcessing() {
    publishArticle("{ \"article\": \"Introduction to Kafka\" }");
    publishArticle(" !! Invalid JSON !! ");
    publishArticle("{ \"article\": \"Kafka Streams Tutorial\" }");

    await().untilAsserted(() -> 
      assertThat(emailService.getArticles())
        .containsExactlyInAnyOrder(
          "Kotlin for Java Developers",
          "The S.O.L.I.D. Principles"
        ));
}

If we run this test, the console shows a recurring error log:

ERROR 7716 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

**java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer**
   at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:151) ~[spring-kafka-2.8.11.jar:2.8.11]
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1815) ~[spring-kafka-2.8.11.jar:2.8.11]
   ...
**Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition baeldung.articles.published-0 at offset 1. If needed, please seek past the record to continue consumption.**
   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1448) ~[kafka-clients-3.1.2.jar:na]
   ...
**Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data** [[32, 33, 33, 32, 73, 110, 118, 97, 108, 105, 100, 32, 74, 83, 79, 78, 32, 33, 33, 32]] from topic [baeldung.articles.published]
   at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:588) ~[spring-kafka-2.8.11.jar:2.8.11]
   ...
**Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('!' (code 33))**: expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
   **at [Source: (byte[])" !! Invalid JSON !! "; line: 1, column: 3]**
   at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391) ~[jackson-core-2.13.5.jar:2.13.5]
   ...

The test eventually times out and fails, with an assertion error showing that only the first message was processed:

org.awaitility.core.ConditionTimeoutException: Assertion condition 
Expecting actual:
  ["Introduction to Kafka"]
to contain exactly in any order:
  ["Introduction to Kafka", "Kafka Streams Tutorial"]
but could not find the following elements:
  ["Kafka Streams Tutorial"]
 within 5 seconds.

⚠️ Pitfall: after the second message fails to deserialize, the listener keeps trying to consume the same record over and over, which is why the error appears in a loop.

5. Creating an Error Handler

If we analyze the error log carefully, we notice two key suggestions:

  • configure an ErrorHandlingDeserializer
  • seek past the failing record to continue consumption

We'll take the simple, direct route: create a custom error handler that handles the deserialization exception and increments the consumer offset, skipping the invalid message.
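
For completeness, the first suggestion would mean wrapping the JSON parsing in Spring Kafka's ErrorHandlingDeserializer via the consumer properties. A minimal sketch of that alternative, which we won't pursue here:

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
// delegate the actual parsing to JsonDeserializer; a failure then surfaces as a
// DeserializationException the container can handle, instead of an endless retry loop
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, ArticlePublishedEvent.class);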

5.1. Implementing a CommonErrorHandler

To implement the CommonErrorHandler interface, we need to override two methods:

  • handleOne() - handles a single failed record
  • handleOtherException() - handles exceptions that aren't tied to a specific record

class KafkaErrorHandler implements CommonErrorHandler {

    // logger

    @Override
    public void handleOne(Exception exception, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
        handle(exception, consumer);
    }

    @Override
    public void handleOtherException(Exception exception, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
        handle(exception, consumer);
    }

    private void handle(Exception exception, Consumer<?, ?> consumer) {
        log.error("Exception thrown", exception);
        // ...
    }
}

5.2. The Kafka Consumer's seek() and commitSync()

The Consumer interface's seek() method lets us manually change the current offset position for a topic partition, which can be used to replay or skip messages as needed.

When the exception is a RecordDeserializationException, we call seek() with the offset of the next record:

void handle(Exception exception, Consumer<?, ?> consumer) {
    log.error("Exception thrown", exception);
    if (exception instanceof RecordDeserializationException ex) {
        consumer.seek(ex.topicPartition(), ex.offset() + 1L);
        consumer.commitSync();
    } else {
        log.error("Exception not handled", exception);
    }
}

The key step: we must call commitSync() to commit the offset, ensuring the new position is persisted by the Kafka broker. This updates the consumer group's committed offset, marking the messages before the adjusted position as processed.
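
If we'd rather be explicit about exactly which offset gets committed, the Consumer API also offers commitSync(Map). A hedged variant of the same step, using the standard OffsetAndMetadata type:

// commit precisely the position just past the poison record,
// rather than whatever the last poll() returned
consumer.commitSync(Map.of(
  ex.topicPartition(), new OffsetAndMetadata(ex.offset() + 1L)
));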

5.3. Updating the Configuration

Finally, we need to add the custom error handler to the consumer configuration. Let's first declare it as a @Bean:

@Bean
CommonErrorHandler commonErrorHandler() {
    return new KafkaErrorHandler();
}

Then we inject the new bean into the ConcurrentKafkaListenerContainerFactory:

@Bean
ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent> kafkaListenerContainerFactory(
  ConsumerFactory<String, ArticlePublishedEvent> consumerFactory,
  CommonErrorHandler commonErrorHandler
) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent>();
    factory.setConsumerFactory(consumerFactory);
    factory.setCommonErrorHandler(commonErrorHandler);
    return factory;
}

That's it! If we re-run the tests now, the listener skips the invalid message and continues consuming.

6. Conclusion

In this article, we explored Spring Kafka's RecordDeserializationException and discovered that, if not handled correctly, it can block the consumer group's consumption of the affected partition.

By implementing Kafka's CommonErrorHandler interface, we enabled the listener to handle deserialization failures and continue processing messages. Using the Consumer API's seek() and commitSync() methods, we adjusted the consumer offset to move past the invalid message.

The complete source code for this article is available on GitHub.


Original title: How to Catch Deserialization Errors in Spring-Kafka?