1. 概述

Kafka 是一个基于分布式消息队列的消息处理系统,提供了 Java 客户端库,允许应用程序向 Kafka Topic 写入数据或从中读取数据。

在实际开发中,业务逻辑通常通过单元测试来验证,而为了隔离外部依赖,I/O 操作(如网络请求、数据库访问)一般都会被 mock 掉。Kafka 官方也贴心地提供了 MockProducer,专门用于在单元测试中模拟生产者行为。

本文将先实现一个简单的 Kafka 生产者,然后使用 MockProducer 编写单元测试,验证常见的生产者操作,包括消息发送、分区策略、异常处理以及事务提交等场景。

✅ 适合用于服务端单元测试
❌ 不适用于集成测试或真实环境模拟


2. Maven 依赖

在编写生产者之前,先引入 Kafka 客户端依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

这个依赖包含了 KafkaProducerProducer 接口以及我们今天要用的 MockProducer

⚠️ 版本建议与线上环境保持一致,避免 API 差异导致“本地跑通线上炸”的踩坑情况。


3. MockProducer 基础用法

Kafka 客户端库提供了标准的 Producer 接口,真实生产者使用 KafkaProducer 实现并负责与 Broker 通信。而 MockProducer 同样实现了 Producer 接口,但它不发任何真实请求,所有操作都在内存中完成。

我们先看一个简单的生产者封装类:

public class KafkaProducer {

    private final Producer<String, String> producer;

    public KafkaProducer(Producer<String, String> producer) {
        this.producer = producer;
    }

    public Future<RecordMetadata> send(String key, String value) {
        ProducerRecord record = new ProducerRecord("topic_sports_news", key, value);
        return producer.send(record);
    }
}

接下来,在测试中使用 MockProducer 替代真实生产者:

@Test
void givenKeyValue_whenSend_thenVerifyHistory() {

    MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());

    kafkaProducer = new KafkaProducer(mockProducer);
    Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("soccer", 
      "{\"site\" : \"baeldung\"}");

    assertTrue(mockProducer.history().size() == 1);
}

✅ MockProducer 的优势

虽然也可以用 Mockito 手动 mock Producer,但 MockProducer 提供了更高阶的能力,比如:

  • history():记录所有调用过 send() 的消息,方便断言
  • 自动完成 future(当 autoComplete=true 时)
  • 支持自定义分区器、序列化器
  • 可模拟异常和事务行为

我们还可以进一步验证消息元数据:

assertTrue(mockProducer.history().get(0).key().equalsIgnoreCase("soccer"));
assertTrue(recordMetadataFuture.get().partition() == 0);

这样就能确保生产者确实发了正确的 key、value、topic 和 partition。


4. 模拟多分区场景

实际项目中,Topic 通常有多个分区以提升并发能力。生产者会根据 key 的哈希值或自定义分区算法决定消息写入哪个 partition。

举个例子,我们实现一个简单的 EvenOddPartitioner,按 key 的长度奇偶性分配分区:

public class EvenOddPartitioner extends DefaultPartitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, 
      byte[] valueBytes, Cluster cluster) {
        if (((String)key).length() % 2 == 0) {
            return 0;
        }
        return 1;
    }
}

现在我们想测试这个分区逻辑是否正确?MockProducer 支持传入自定义 Cluster 配置,从而模拟多分区环境:

@Test
void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber() 
  throws ExecutionException, InterruptedException {
    PartitionInfo partitionInfo0 = new PartitionInfo("topic_sports_news", 0, null, null, null);
    PartitionInfo partitionInfo1 = new PartitionInfo("topic_sports_news", 1, null, null, null);
    List<PartitionInfo> list = new ArrayList<>();
    list.add(partitionInfo0);
    list.add(partitionInfo1);

    Cluster cluster = new Cluster("kafkab", new ArrayList<>(), list, Set.of(), Set.of());
    this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(), 
      new StringSerializer(), new StringSerializer());

    kafkaProducer = new KafkaProducer(mockProducer);
    Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("partition", 
      "{\"site\" : \"baeldung\"}");

    assertTrue(recordMetadataFuture.get().partition() == 1);
}

📌 解释一下关键点:

  • 构造包含两个分区的 Cluster 对象
  • 将其传给 MockProducer,并指定自定义分区器
  • key "partition" 长度为 8(偶数),按逻辑应进入 partition 0?
  • ❌ 错!我们代码里是 length % 2 == 0 → return 0,否则 return 1
  • "partition" 长度为 9?等等,不对 —— 实际是 9 个字符?p-a-r-t-i-t-i-o-n → 是 9 吗?

⚠️ 踩坑提醒:字符串 "partition" 实际长度是 8,所以应进入 partition 0。但上面断言的是 partition == 1,说明示例可能存在笔误。正确做法是根据 key 长度判断预期结果。

修正建议:

// 改成奇数长度的 key 来测试 partition 1
kafkaProducer.send("key1234", "{\"site\":\"baeldung\"}"); // length=7 → odd → partition 1

5. 使用 MockProducer 模拟异常

生产环境网络不稳定,写入失败很常见。我们通常会在代码中做重试或上报异常,那如何测试异常处理逻辑?

MockProducer 提供了 errorNext() 方法,可以指定下一次 send() 抛出异常:

@Test
void givenKeyValue_whenSend_thenReturnException() {
    MockProducer<String, String> mockProducer = new MockProducer<>(false, 
      new StringSerializer(), new StringSerializer());

    kafkaProducer = new KafkaProducer(mockProducer);
    Future<RecordMetadata> record = kafkaProducer.send("site", "{\"site\" : \"baeldung\"}");
    RuntimeException e = new RuntimeException("模拟网络异常");
    mockProducer.errorNext(e);

    try {
        record.get();
    } catch (ExecutionException ex) {
        assertEquals(e, ex.getCause());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        fail("不应被中断");
    }
    assertTrue(record.isDone());
}

⚠️ 两个关键点:

  1. new MockProducer<>(false, ...)

    • autoComplete=false 表示不会自动完成 send() 返回的 Future
    • 必须手动调用 completeNext()errorNext() 才会让 future 完结
  2. mockProducer.errorNext(e)

    • 表示下一个 send() 操作将抛出指定异常
    • 异常会被包装在 ExecutionException 中由 get() 抛出

✅ 适合测试重试机制、熔断逻辑、日志记录等异常处理路径


6. 模拟事务性写入

从 Kafka 0.11 开始支持事务,实现端到端的 Exactly-Once 语义。事务型生产者需显式调用 beginTransaction()commitTransaction(),只有提交后消息才会真正对外可见。

MockProducer 也支持事务模拟,能验证“未提交前不生效”的行为:

@Test
void givenKeyValue_whenSendWithTxn_thenSendOnlyOnTxnCommit() {
    MockProducer<String, String> mockProducer = new MockProducer<>(true, 
      new StringSerializer(), new StringSerializer());

    kafkaProducer = new KafkaProducer(mockProducer);
    kafkaProducer.initTransaction();
    kafkaProducer.beginTransaction();
    Future<RecordMetadata> record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");

    assertTrue(mockProducer.history().isEmpty()); // 事务未提交,消息不应出现在 history
    kafkaProducer.commitTransaction();
    assertTrue(mockProducer.history().size() == 1); // 提交后才记录
}

✅ 核心价值

  • 验证事务边界是否正确
  • 确保 commitTransaction() 被调用
  • 防止“发了消息但忘了提交”这类低级错误

⚠️ 注意:MockProducer 默认不开启事务支持,需确保初始化时传入正确的参数(如 transactionalId)并在真实代码中调用 initTransaction()


7. 总结

本文介绍了 Kafka 客户端提供的 MockProducer 工具类,它实现了与 KafkaProducer 相同的 Producer 接口,非常适合在单元测试中替代真实 I/O 操作。

我们通过几个典型场景展示了它的强大能力:

✅ 普通消息发送 + history 断言
✅ 多分区 + 自定义分区器验证
✅ 异常模拟(errorNext)
✅ 事务性写入控制(commit 才生效)

相比纯 Mockito mock,MockProducer 更贴近真实行为,减少了“mock 出来的方法没问题,上线就炸”的风险。

📌 最后提醒:

MockProducer 是单元测试利器,但不能替代集成测试。复杂场景仍建议搭配 Testcontainer 使用真实 Kafka 集群做端到端验证。

所有示例代码已上传至 GitHub:
👉 https://github.com/yourname/kafka-mock-demo


原始标题:Using Kafka MockProducer