1. 简介
如今再强调 Kafka 的能力似乎已是老生常谈。只要你的系统需要消息队列,并且涉及多个服务协同工作,Kafka 已是行业标准首选。
本文将探讨如何在 Kotlin 项目中使用 Apache Kafka 进行消息的发送与接收。虽然通过 Spring 生态(如 spring-kafka
)集成的方式更为常见,相关教程也早已覆盖 Java 场景下的基础生产者/消费者和Kafka Streams,但本篇我们聚焦于 直接使用原生 Kafka 客户端库,不依赖 Spring 框架。
这种方式更轻量,适合对底层控制有要求的场景,也能帮助你避开一些 Spring 自动配置带来的“黑盒”踩坑。
2. Kafka 测试环境搭建
要动手实践,首先得有个可用的 Kafka 环境。本地部署和维护 Kafka 实例非常繁琐——Zookeeper、Broker 配置、网络调优……光是启动就可能卡住半天。
✅ 推荐方案:使用 Testcontainers
它能通过 Docker 快速拉起一个完整的 Kafka 服务,只需几行代码,开发测试效率大幅提升。
Maven 依赖配置
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
启动 Kafka 容器
@Testcontainers
class AppTest {
companion object {
@JvmStatic
@Container
val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
}
}
@Testcontainers
:启用 Testcontainers 扩展@Container
:标记该字段为容器实例@JvmStatic
:确保 JUnit 能正确访问静态字段,实现多测试共享实例
⚠️ 注意:Confluent 官方镜像启动较慢,有时会超时:
Container startup failed
org.testcontainers.containers.ContainerLaunchException: Container startup failed
at app//org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:349)
at app//org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322)
应对策略:
- 增加超时时间(Testcontainers 支持配置)
- 或改用
docker-compose.yml
启动独立环境,测试连接localhost:9092
替代方案示例(docker-compose.yml
):
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
3. Kafka 客户端使用
核心依赖仅需 kafka-clients
:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
3.1 Kafka 生产者(Producer)
配置属性
val producerProps = mapOf<String, String>(
"bootstrap.servers" to kafka.bootstrapServers,
"key.serializer" to "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer" to "org.apache.kafka.common.serialization.ByteArraySerializer",
"security.protocol" to "PLAINTEXT"
)
关键参数说明:
bootstrap.servers
:Kafka Broker 地址key.serializer
/value.serializer
:序列化器,Kafka 只传输字节流 ❗security.protocol
:无认证场景设为PLAINTEXT
📌 Kafka 协议无关,所有数据必须序列化为
byte[]
。JSON、Protobuf 等格式需自行处理。
创建并发送消息
KafkaProducer<String, ByteArray>(producerProps).use { producer ->
// 异步发送,立即返回 Future
producer.send(ProducerRecord("test", "1", "Hello, world!".encodeToByteArray()))
// 带回调的发送,可捕获异常
producer.send(
ProducerRecord("test", "2", "Async message".encodeToByteArray())
) { metadata, exception ->
if (exception != null) {
println("Send failed: $exception")
} else {
println("Sent to ${metadata.topic()} offset ${metadata.offset()}")
}
}
}
✅ 必须使用 .use {}
或手动调用 close()
,否则资源泄露。
挂起函数封装(Kotlin 协程友好)
suspend fun <K, V> Producer<K, V>.asyncSend(record: ProducerRecord<K, V>) =
suspendCoroutine<RecordMetadata> { continuation ->
send(record) { metadata, exception ->
exception?.let(continuation::resumeWithException)
?: continuation.resume(metadata)
}
}
这样可以在协程中 await
发送结果。
3.2 Kafka 消费者(Consumer)
配置属性
val consumerProps = mapOf(
"bootstrap.servers" to "localhost:9092",
"auto.offset.reset" to "earliest",
"key.deserializer" to "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" to "org.apache.kafka.common.serialization.ByteArrayDeserializer",
"group.id" to "test-group",
"security.protocol" to "PLAINTEXT"
)
关键参数:
group.id
:消费者组 ID,必须设置auto.offset.reset
:偏移量重置策略,测试用earliest
从头消费
消费消息
KafkaConsumer<String, ByteArray>(consumerProps).use { consumer ->
consumer.subscribe(listOf("test"))
while (true) {
val records = consumer.poll(400.milliseconds.toJavaDuration())
for (record in records) {
val key = record.key()
val value = String(record.value())
println("Received: key=$key, value=$value")
}
}
}
⚠️ 实际应用中通常在循环中持续 poll()
,直到程序退出。
测试中可简化等待逻辑:
tailrec fun <T> repeatUntilSome(block: () -> T?): T = block() ?: repeatUntilSome(block)
val message = repeatUntilSome {
consumer.poll(400.milliseconds.toJavaDuration())
.map { String(it.value()) }
.firstOrNull()
}
✅ 同样记得 use {}
自动关闭。
4. Kafka Streams 使用
当业务涉及多个 Topic 的复杂流处理(如 Join、聚合、窗口计算),直接使用 Consumer 写逻辑容易出错且难以管理 Offset 提交。
✅ Kafka Streams 提供了高阶 DSL,抽象了底层细节,让流处理变得声明式。
依赖引入
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>3.3.1</version>
<scope>test</scope>
</dependency>
流处理拓扑定义
假设需求:两个输入流(电影系列名 + 续集编号),按 Key Join 后输出完整名称。
fun getTopology(topicConfig: TopicConfig): Topology = StreamsBuilder().apply {
val (inStream1, inStream2, outStream) = topicConfig
stream(inStream1, Consumed.with(Serdes.String(), Serdes.ByteArray()))
.join(
stream(inStream2, Consumed.with(Serdes.String(), Serdes.ByteArray())),
{ name, num -> "${String(name)} ${String(num)}".encodeToByteArray() },
JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(3), Duration.ofSeconds(1))
)
.to(outStream)
}.build()
JoinWindows
:设定 Join 的时间窗口(3秒内匹配)Serdes
:Serializer/Deserializer 封装
启动流处理
KafkaStreams(getTopology(topicConfig), StreamsConfig(streamConfig)).use { streams ->
streams.cleanUp() // 清理旧状态(测试专用)
streams.start()
// 模拟写入数据
populateData(topicConfig)
// 验证输出
val results = retrieveResultsFromOutputStream(topicConfig)
assertEquals(EXPECTED_MOVIES, results)
}
✅ KafkaStreams
也是 Closeable
,务必用 use {}
包裹。
4.1 Kafka Streams 测试优化
上述方式依赖真实 Kafka 实例,测试慢且不稳定。
✅ 推荐使用 TopologyTestDriver
—— 无需网络,纯内存运行,速度极快。
使用 TopologyTestDriver
val testDriverConfig = mapOf(
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass.name,
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to Serdes.ByteArray().javaClass.name
).toProperties()
val testDriver = TopologyTestDriver(getTopology(topicConfig), testDriverConfig)
模拟输入 & 验证输出
// 创建输入 Topic 模拟器
private fun TopologyTestDriver.createStandardTestTopic(topicName: String): TestInputTopic<String, ByteArray> =
createInputTopic(topicName, Serdes.String().serializer(), Serdes.ByteArray().serializer())
val inStream1 = testDriver.createStandardTestTopic(topicConfig.inStream1)
val inStream2 = testDriver.createStandardTestTopic(topicConfig.inStream2)
// 写入测试数据
testData().forEachIndexed { i, (sequelNumber, franchiseName) ->
inStream1.pipeInput(TestRecord((i + 1).toString(), franchiseName))
inStream2.pipeInput(TestRecord((i + 1).toString(), sequelNumber))
}
// 读取输出验证
val output = testDriver.createOutputTopic(
topicConfig.outStream,
Serdes.String().deserializer(),
Serdes.ByteArray().deserializer()
).readValuesToList().map(::String)
assertEquals(EXPECTED_MOVIES, output)
✅ 优势:
- 无外部依赖
- 执行速度快
- 可精准控制事件时间(支持模拟时间推进)
5. 总结
本文展示了如何在 Kotlin 项目中脱离 Spring 框架,直接使用原生 Kafka 客户端和 Streams API。
核心要点:
- ✅ 使用 Testcontainers 快速搭建测试环境,提升开发效率
- ✅ Kafka 原生客户端在 Kotlin 中语法更简洁,但核心逻辑与 Java 一致
- ✅ 消息必须序列化为
byte[]
,注意Serializer/Deserializer
配置 - ✅
KafkaProducer
、KafkaConsumer
、KafkaStreams
均为资源,必须显式关闭 - ✅ 复杂流处理优先考虑 Kafka Streams,配合
TopologyTestDriver
实现高效单元测试
所有示例代码已上传至 GitHub:https://github.com/Baeldung/kotlin-tutorials/tree/master/kotlin-apache-kafka