1. 概述

本文将深入探讨KafkaProducer的重试机制,以及如何根据具体场景调整其配置参数。我们将分析关键配置项的默认值,并通过实际示例展示自定义配置的效果。

2. 默认配置机制

当消息未被Broker确认时,KafkaProducer默认会自动重试发送。我们可以通过故意配置错误的Topic设置来模拟生产者失败场景。

首先在pom.xml中添加kafka-clients依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.8.0</version>
</dependency>

接下来模拟Broker拒绝消息的场景。使用"min.insync.replicas"配置项,该配置要求写入操作前必须有足够数量的副本同步。我们创建一个Topic并设置该值为2,尽管测试环境只有一个Broker:

@Test
void givenDefaultConfig_whenMessageCannotBeSent_thenKafkaProducerRetries() throws Exception {
    NewTopic newTopic = new NewTopic("test-topic-1", 1, (short) 1)
      .configs(Map.of("min.insync.replicas", "2"));
    adminClient.createTopics(singleton(newTopic)).all().get();

  // 发布消息并验证异常
}

创建KafkaProducer发送消息,观察其重试行为:

@Test
void givenDefaultConfig_whenMessageCannotBeSent_thenKafkaProducerRetries() throws Exception {
    // 设置Topic配置

    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
    ProducerRecord<String, String> record = new ProducerRecord<>("test-topic-1", "test-value");
    assertThatThrownBy(() -> producer.send(record).get())
      .isInstanceOf(ExecutionException.class)
      .hasCauseInstanceOf(org.apache.kafka.common.errors.TimeoutException.class)     
      .hasMessageContaining("Expiring 1 record(s) for test-topic-1-0");
}

从异常和日志可见,生产者多次尝试发送消息,最终在2分钟后超时。这符合KafkaProducer默认配置

  • retries (默认Integer.MAX_VALUE):最大重试次数
  • delivery.timeout.ms (默认120,000):消息确认超时时间
  • retry.backoff.ms (默认100):重试间隔时间
  • retry.backoff.max.ms (默认1,000):最大重试间隔

⚠️ 注意:默认重试次数接近无限,可能导致长时间阻塞!

3. 自定义重试配置

我们可以根据实际需求调整KafkaProducer的重试参数。例如设置:

  • 最大投递时间5秒
  • 重试间隔500毫秒
  • 最大重试次数20次
@Test
void givenCustomConfig_whenMessageCannotBeSent_thenKafkaProducerRetries() throws Exception {
    // 设置Topic配置

    Properties props = new Properties();
    // 其他配置项
    props.put(RETRIES_CONFIG, 20);
    props.put(RETRY_BACKOFF_MS_CONFIG, "500");
    props.put(DELIVERY_TIMEOUT_MS_CONFIG, "5000");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

    ProducerRecord<String, String> record = new ProducerRecord<>("test-topic-2", "test-value");
    assertThatThrownBy(() -> producer.send(record).get())
      .isInstanceOf(ExecutionException.class)
      .hasCauseInstanceOf(org.apache.kafka.common.errors.TimeoutException.class)
      .hasMessageContaining("Expiring 1 record(s) for test-topic-2-0");
}

生产者按预期在5秒后停止重试。日志显示:

  • 重试间隔为500毫秒
  • 重试次数从20开始递减
12:57:19.599 [kafka-producer-network-thread | producer-1] WARN  o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 5 on topic-partition test-topic-2-0, retrying (19 attempts left). Error: NOT_ENOUGH_REPLICAS

12:57:20.107 [kafka-producer-network-thread | producer-1] WARN  o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 6 on topic-partition test-topic-2-0, retrying (18 attempts left). Error: NOT_ENOUGH_REPLICAS

12:57:20.612 [kafka-producer-network-thread | producer-1] WARN  o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 7 on topic-partition test-topic-2-0, retrying (17 attempts left). Error: NOT_ENOUGH_REPLICAS

[...]

关键配置项总结: | 参数 | 默认值 | 建议场景 | |------|--------|----------| | retries | Integer.MAX_VALUE | 生产环境建议设置合理上限 | | delivery.timeout.ms | 120,000 | 根据业务容忍度调整 | | retry.backoff.ms | 100 | 网络不稳定时适当增大 | | retry.backoff.max.ms | 1,000 | 避免雪崩效应 |

4. 总结

本文详细分析了KafkaProducer的重试机制,重点包括:

  • 默认重试行为的特点
  • 关键配置项的作用原理
  • 自定义重试策略的实践方法

合理配置重试参数能显著提升系统健壮性,但需避免过度重试导致的资源浪费。完整代码示例可在GitHub仓库获取。


原始标题:Retries With Kafka Producer | Baeldung