1. 概述
Kafka 是一个基于分布式消息队列的消息处理系统,提供了 Java 客户端库,允许应用程序向 Kafka Topic 写入数据或从中读取数据。
在实际开发中,业务逻辑通常通过单元测试来验证,而为了隔离外部依赖,I/O 操作(如网络请求、数据库访问)一般都会被 mock 掉。Kafka 官方也贴心地提供了 MockProducer
,专门用于在单元测试中模拟生产者行为。
本文将先实现一个简单的 Kafka 生产者,然后使用 MockProducer
编写单元测试,验证常见的生产者操作,包括消息发送、分区策略、异常处理以及事务提交等场景。
✅ 适合用于服务端单元测试
❌ 不适用于集成测试或真实环境模拟
2. Maven 依赖
在编写生产者之前,先引入 Kafka 客户端依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
这个依赖包含了 KafkaProducer
、Producer
接口以及我们今天要用的 MockProducer
。
⚠️ 版本建议与线上环境保持一致,避免 API 差异导致“本地跑通线上炸”的踩坑情况。
3. MockProducer 基础用法
Kafka 客户端库提供了标准的 Producer
接口,真实生产者使用 KafkaProducer
实现并负责与 Broker 通信。而 MockProducer
同样实现了 Producer
接口,但它不发任何真实请求,所有操作都在内存中完成。
我们先看一个简单的生产者封装类:
public class KafkaProducer {
private final Producer<String, String> producer;
public KafkaProducer(Producer<String, String> producer) {
this.producer = producer;
}
public Future<RecordMetadata> send(String key, String value) {
ProducerRecord record = new ProducerRecord("topic_sports_news", key, value);
return producer.send(record);
}
}
接下来,在测试中使用 MockProducer
替代真实生产者:
@Test
void givenKeyValue_whenSend_thenVerifyHistory() {
MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("soccer",
"{\"site\" : \"baeldung\"}");
assertTrue(mockProducer.history().size() == 1);
}
✅ MockProducer 的优势
虽然也可以用 Mockito 手动 mock Producer
,但 MockProducer
提供了更高阶的能力,比如:
history()
:记录所有调用过send()
的消息,方便断言- 自动完成 future(当
autoComplete=true
时) - 支持自定义分区器、序列化器
- 可模拟异常和事务行为
我们还可以进一步验证消息元数据:
assertTrue(mockProducer.history().get(0).key().equalsIgnoreCase("soccer"));
assertTrue(recordMetadataFuture.get().partition() == 0);
这样就能确保生产者确实发了正确的 key、value、topic 和 partition。
4. 模拟多分区场景
实际项目中,Topic 通常有多个分区以提升并发能力。生产者会根据 key 的哈希值或自定义分区算法决定消息写入哪个 partition。
举个例子,我们实现一个简单的 EvenOddPartitioner
,按 key 的长度奇偶性分配分区:
public class EvenOddPartitioner extends DefaultPartitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
if (((String)key).length() % 2 == 0) {
return 0;
}
return 1;
}
}
现在我们想测试这个分区逻辑是否正确?MockProducer
支持传入自定义 Cluster
配置,从而模拟多分区环境:
@Test
void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber()
throws ExecutionException, InterruptedException {
PartitionInfo partitionInfo0 = new PartitionInfo("topic_sports_news", 0, null, null, null);
PartitionInfo partitionInfo1 = new PartitionInfo("topic_sports_news", 1, null, null, null);
List<PartitionInfo> list = new ArrayList<>();
list.add(partitionInfo0);
list.add(partitionInfo1);
Cluster cluster = new Cluster("kafkab", new ArrayList<>(), list, Set.of(), Set.of());
this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(),
new StringSerializer(), new StringSerializer());
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("partition",
"{\"site\" : \"baeldung\"}");
assertTrue(recordMetadataFuture.get().partition() == 1);
}
📌 解释一下关键点:
- 构造包含两个分区的
Cluster
对象 - 将其传给
MockProducer
,并指定自定义分区器 - key
"partition"
长度为 8(偶数),按逻辑应进入 partition 0? - ❌ 错!我们代码里是
length % 2 == 0 → return 0
,否则return 1
"partition"
长度为 9?等等,不对 —— 实际是 9 个字符?p-a-r-t-i-t-i-o-n
→ 是 9 吗?
⚠️ 踩坑提醒:字符串
"partition"
实际长度是 8,所以应进入 partition 0。但上面断言的是partition == 1
,说明示例可能存在笔误。正确做法是根据 key 长度判断预期结果。
修正建议:
// 改成奇数长度的 key 来测试 partition 1
kafkaProducer.send("key1234", "{\"site\":\"baeldung\"}"); // length=7 → odd → partition 1
5. 使用 MockProducer 模拟异常
生产环境网络不稳定,写入失败很常见。我们通常会在代码中做重试或上报异常,那如何测试异常处理逻辑?
MockProducer
提供了 errorNext()
方法,可以指定下一次 send()
抛出异常:
@Test
void givenKeyValue_whenSend_thenReturnException() {
MockProducer<String, String> mockProducer = new MockProducer<>(false,
new StringSerializer(), new StringSerializer());
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> record = kafkaProducer.send("site", "{\"site\" : \"baeldung\"}");
RuntimeException e = new RuntimeException("模拟网络异常");
mockProducer.errorNext(e);
try {
record.get();
} catch (ExecutionException ex) {
assertEquals(e, ex.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail("不应被中断");
}
assertTrue(record.isDone());
}
⚠️ 两个关键点:
new MockProducer<>(false, ...)
autoComplete=false
表示不会自动完成send()
返回的Future
- 必须手动调用
completeNext()
或errorNext()
才会让 future 完结
mockProducer.errorNext(e)
- 表示下一个
send()
操作将抛出指定异常 - 异常会被包装在
ExecutionException
中由get()
抛出
- 表示下一个
✅ 适合测试重试机制、熔断逻辑、日志记录等异常处理路径
6. 模拟事务性写入
从 Kafka 0.11 开始支持事务,实现端到端的 Exactly-Once 语义。事务型生产者需显式调用 beginTransaction()
和 commitTransaction()
,只有提交后消息才会真正对外可见。
MockProducer
也支持事务模拟,能验证“未提交前不生效”的行为:
@Test
void givenKeyValue_whenSendWithTxn_thenSendOnlyOnTxnCommit() {
MockProducer<String, String> mockProducer = new MockProducer<>(true,
new StringSerializer(), new StringSerializer());
kafkaProducer = new KafkaProducer(mockProducer);
kafkaProducer.initTransaction();
kafkaProducer.beginTransaction();
Future<RecordMetadata> record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");
assertTrue(mockProducer.history().isEmpty()); // 事务未提交,消息不应出现在 history
kafkaProducer.commitTransaction();
assertTrue(mockProducer.history().size() == 1); // 提交后才记录
}
✅ 核心价值
- 验证事务边界是否正确
- 确保
commitTransaction()
被调用 - 防止“发了消息但忘了提交”这类低级错误
⚠️ 注意:MockProducer
默认不开启事务支持,需确保初始化时传入正确的参数(如 transactionalId
)并在真实代码中调用 initTransaction()
。
7. 总结
本文介绍了 Kafka 客户端提供的 MockProducer
工具类,它实现了与 KafkaProducer
相同的 Producer
接口,非常适合在单元测试中替代真实 I/O 操作。
我们通过几个典型场景展示了它的强大能力:
✅ 普通消息发送 + history 断言
✅ 多分区 + 自定义分区器验证
✅ 异常模拟(errorNext)
✅ 事务性写入控制(commit 才生效)
相比纯 Mockito mock,MockProducer
更贴近真实行为,减少了“mock 出来的方法没问题,上线就炸”的风险。
📌 最后提醒:
MockProducer
是单元测试利器,但不能替代集成测试。复杂场景仍建议搭配 Testcontainer 使用真实 Kafka 集群做端到端验证。
所有示例代码已上传至 GitHub:
👉 https://github.com/yourname/kafka-mock-demo