1. 概述

Apache Kafka 作为分布式消息流系统,提供了灵活的消息确认机制来保障数据可靠性。本文将深入探讨 Kafka 生产者和消费者的确认选项配置,帮助开发者根据业务需求在性能与可靠性之间找到平衡点。

2. 生产者确认机制

即使 Kafka Broker 配置可靠,生产者配置同样关键。生产者支持三种确认模式,下面逐一解析。

2.1. 无确认模式 (acks=0)

acks 属性设为 0

static KafkaProducer<String, String> producerack0;

static Properties getProducerProperties() {
    Properties producerProperties = new Properties();
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return producerProperties;
}

static void setUp() throws IOException, InterruptedException {
    Properties producerProperties = getProducerProperties();
    producerProperties.put(ProducerConfig.ACKS_CONFIG,
      "0");
    producerack0 = new KafkaProducer<>(producerProperties);
}

生产者发送消息后不等待 Broker 响应,直接认为发送成功。这种模式下:

  • ✅ 吞吐量最高(网络速度决定发送速率)
  • ❌ 消息可能丢失(Broker 未收到时无感知)
  • ❌ 即使分区离线、Leader 选举中或集群宕机,生产者也收不到错误

⚠️ 虽然生产者延迟低,但端到端延迟未降低(消费者需等待消息复制完成)

2.2. Leader 确认模式 (acks=1)

acks 属性设为 1

static KafkaProducer<String, String> producerack1;

static Properties getProducerProperties() {
    Properties producerProperties = new Properties();
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return producerProperties;
}

static void setUp() throws IOException, InterruptedException {
    Properties producerack1Prop = getProducerProperties();
    producerack1Prop.put(ProducerConfig.ACKS_CONFIG,
      "1");
    producerack1 = new KafkaProducer<>(producerack1Prop);
}

Leader 副本收到消息后立即返回成功响应

  • ✅ 比 acks=0 更可靠(失败时生产者可重试)
  • ❌ Leader 崩溃时未复制的消息仍可能丢失
  • ❌ 可能出现副本不足(Leader 确认早于复制完成)

⚠️ 延迟高于 acks=0(需等待一个副本确认)

2.3. 全副本确认模式 (acks=all)

acks 属性设为 all(或 -1):

static KafkaProducer<String, String> producerackAll;

static Properties getProducerProperties() {
    Properties producerProperties = new Properties();
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return producerProperties;
}

static void setUp() throws IOException, InterruptedException {
    Properties producerackAllProp = getProducerProperties();
    producerackAllProp.put(ProducerConfig.ACKS_CONFIG,
      "all");
    producerackAll = new KafkaProducer<>(producerackAllProp);
}

所有同步副本 (ISR) 收到消息后才返回成功

  • ✅ 最高可靠性(多副本保障数据安全)
  • ✅ 生产者持续重试直到完全提交
  • ❌ 延迟最高(需等待所有副本同步)

⚠️ 关键配置:

  • Broker 端 min.insync.replicas 指定最小同步副本数
  • acks=-1 等同于 acks=all

踩坑警告acks 只能是 0/1/all 三者之一,非法值会抛出 ConfigException。对于 acks=1acks=all,可通过 retries/retry.backoff.ms/delivery.timeout.ms 控制重试行为。

3. 消费者确认机制

数据只有在 Kafka 标记为已提交(写入所有同步副本)后才对消费者可见。消费者核心职责是维护消费偏移量,避免消息丢失。以下是关键配置项:

3.1. 消费者组 ID (group.id)

每个消费者属于特定组,由 group.id 标识:

  • 同组消费者自动分配 Topic 分区(并行消费)
  • 独立 group.id 可使消费者接收所有消息(广播模式)

3.2. 偏移量重置策略 (auto.offset.reset)

控制消费者无有效偏移量时的行为:

static Properties getConsumerProperties() {
    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return consumerProperties;
}

// 从最新消息开始
Properties consumerProperties = getConsumerProperties(); 
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
    // ...
}

或:

static Properties getConsumerProperties() {
    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return consumerProperties;
}

// 从最早消息开始
Properties consumerProperties = getConsumerProperties(); 
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
    // ...
}

三种策略:

  • latest(默认):从最新记录开始(忽略历史消息)
  • earliest:从分区起始位置开始(消费全量数据)
  • none:无有效偏移量时抛出异常

3.3. 自动提交开关 (enable.auto.commit)

控制偏移量是否自动提交,默认 true

static Properties getConsumerProperties() {
    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return consumerProperties;
}

// 启用自动提交
Properties consumerProperties = getConsumerProperties(); 
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
    // ...
}
  • true:简化代码(通过 auto.commit.interval.ms 控制频率)
  • false:手动控制提交时机(减少重复/丢失)

3.4. 自动提交间隔 (auto.commit.interval.ms)

配置自动提交频率(毫秒):

static Properties getConsumerProperties() {
    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return consumerProperties;
}

// 每7秒提交一次
Properties consumerProperties = getConsumerProperties(); 
consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 7000); 
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
    // ...
}
  • 默认值:5000 毫秒
  • 频繁提交 → 增加开销但减少重复消息
  • 低频提交 → 降低开销但消费者故障时可能重复消费

4. 总结

Kafka 的确认机制提供了精细化的可靠性控制:

  • 生产者:通过 acks 在吞吐量与数据安全间权衡
    • acks=0:极限性能(牺牲可靠性)
    • acks=1:平衡选择(常见生产配置)
    • acks=all:金融级可靠(关键业务首选)
  • 消费者:通过偏移量管理保障消费可靠性
    • 组 ID 控制消费模式
    • 偏移量策略决定起始位置
    • 手动提交实现精确控制

开发者应根据业务 SLA 要求选择合适配置,避免过度设计或可靠性不足。记住:没有最佳配置,只有适合业务的配置。


原始标题:Kafka Producer and Consumer Message Acknowledgement Options | Baeldung