1. 简介

如今再强调 Kafka 的能力似乎已是老生常谈。只要你的系统需要消息队列,并且涉及多个服务协同工作,Kafka 已是行业标准首选

本文将探讨如何在 Kotlin 项目中使用 Apache Kafka 进行消息的发送与接收。虽然通过 Spring 生态(如 spring-kafka)集成的方式更为常见,相关教程也早已覆盖 Java 场景下的基础生产者/消费者Kafka Streams,但本篇我们聚焦于 直接使用原生 Kafka 客户端库,不依赖 Spring 框架。

这种方式更轻量,适合对底层控制有要求的场景,也能帮助你避开一些 Spring 自动配置带来的“黑盒”踩坑。

2. Kafka 测试环境搭建

要动手实践,首先得有个可用的 Kafka 环境。本地部署和维护 Kafka 实例非常繁琐——Zookeeper、Broker 配置、网络调优……光是启动就可能卡住半天。

✅ 推荐方案:使用 Testcontainers
它能通过 Docker 快速拉起一个完整的 Kafka 服务,只需几行代码,开发测试效率大幅提升。

Maven 依赖配置

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>

启动 Kafka 容器

@Testcontainers
class AppTest {
    companion object {
        @JvmStatic
        @Container
        val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
    }
}
  • @Testcontainers:启用 Testcontainers 扩展
  • @Container:标记该字段为容器实例
  • @JvmStatic:确保 JUnit 能正确访问静态字段,实现多测试共享实例

⚠️ 注意:Confluent 官方镜像启动较慢,有时会超时:

Container startup failed
org.testcontainers.containers.ContainerLaunchException: Container startup failed
    at app//org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:349)
    at app//org.testcontainers.containers.GenericContainer.start(GenericContainer.java:322)

应对策略

  • 增加超时时间(Testcontainers 支持配置)
  • 或改用 docker-compose.yml 启动独立环境,测试连接 localhost:9092

替代方案示例(docker-compose.yml):

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

3. Kafka 客户端使用

核心依赖仅需 kafka-clients

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
</dependency>

3.1 Kafka 生产者(Producer)

配置属性

val producerProps = mapOf<String, String>(
    "bootstrap.servers" to kafka.bootstrapServers,
    "key.serializer" to "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer" to "org.apache.kafka.common.serialization.ByteArraySerializer",
    "security.protocol" to "PLAINTEXT"
)

关键参数说明:

  • bootstrap.servers:Kafka Broker 地址
  • key.serializer / value.serializer:序列化器,Kafka 只传输字节流 ❗
  • security.protocol:无认证场景设为 PLAINTEXT

📌 Kafka 协议无关,所有数据必须序列化为 byte[]。JSON、Protobuf 等格式需自行处理。

创建并发送消息

KafkaProducer<String, ByteArray>(producerProps).use { producer ->
    // 异步发送,立即返回 Future
    producer.send(ProducerRecord("test", "1", "Hello, world!".encodeToByteArray()))
    
    // 带回调的发送,可捕获异常
    producer.send(
        ProducerRecord("test", "2", "Async message".encodeToByteArray())
    ) { metadata, exception ->
        if (exception != null) {
            println("Send failed: $exception")
        } else {
            println("Sent to ${metadata.topic()} offset ${metadata.offset()}")
        }
    }
}

✅ 必须使用 .use {} 或手动调用 close(),否则资源泄露。

挂起函数封装(Kotlin 协程友好)

suspend fun <K, V> Producer<K, V>.asyncSend(record: ProducerRecord<K, V>) =
    suspendCoroutine<RecordMetadata> { continuation ->
        send(record) { metadata, exception ->
            exception?.let(continuation::resumeWithException)
                ?: continuation.resume(metadata)
        }
    }

这样可以在协程中 await 发送结果。

3.2 Kafka 消费者(Consumer)

配置属性

val consumerProps = mapOf(
    "bootstrap.servers" to "localhost:9092",
    "auto.offset.reset" to "earliest",
    "key.deserializer" to "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" to "org.apache.kafka.common.serialization.ByteArrayDeserializer",
    "group.id" to "test-group",
    "security.protocol" to "PLAINTEXT"
)

关键参数:

  • group.id:消费者组 ID,必须设置
  • auto.offset.reset:偏移量重置策略,测试用 earliest 从头消费

消费消息

KafkaConsumer<String, ByteArray>(consumerProps).use { consumer ->
    consumer.subscribe(listOf("test"))
    
    while (true) {
        val records = consumer.poll(400.milliseconds.toJavaDuration())
        for (record in records) {
            val key = record.key()
            val value = String(record.value())
            println("Received: key=$key, value=$value")
        }
    }
}

⚠️ 实际应用中通常在循环中持续 poll(),直到程序退出。

测试中可简化等待逻辑:

tailrec fun <T> repeatUntilSome(block: () -> T?): T = block() ?: repeatUntilSome(block)

val message = repeatUntilSome {
    consumer.poll(400.milliseconds.toJavaDuration())
        .map { String(it.value()) }
        .firstOrNull()
}

✅ 同样记得 use {} 自动关闭。

4. Kafka Streams 使用

当业务涉及多个 Topic 的复杂流处理(如 Join、聚合、窗口计算),直接使用 Consumer 写逻辑容易出错且难以管理 Offset 提交。

✅ Kafka Streams 提供了高阶 DSL,抽象了底层细节,让流处理变得声明式。

依赖引入

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>3.3.1</version>
    <scope>test</scope>
</dependency>

流处理拓扑定义

假设需求:两个输入流(电影系列名 + 续集编号),按 Key Join 后输出完整名称。

fun getTopology(topicConfig: TopicConfig): Topology = StreamsBuilder().apply {
    val (inStream1, inStream2, outStream) = topicConfig
    
    stream(inStream1, Consumed.with(Serdes.String(), Serdes.ByteArray()))
        .join(
            stream(inStream2, Consumed.with(Serdes.String(), Serdes.ByteArray())),
            { name, num -> "${String(name)} ${String(num)}".encodeToByteArray() },
            JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(3), Duration.ofSeconds(1))
        )
        .to(outStream)
}.build()
  • JoinWindows:设定 Join 的时间窗口(3秒内匹配)
  • Serdes:Serializer/Deserializer 封装

启动流处理

KafkaStreams(getTopology(topicConfig), StreamsConfig(streamConfig)).use { streams ->
    streams.cleanUp() // 清理旧状态(测试专用)
    streams.start()

    // 模拟写入数据
    populateData(topicConfig)

    // 验证输出
    val results = retrieveResultsFromOutputStream(topicConfig)
    assertEquals(EXPECTED_MOVIES, results)
}

KafkaStreams 也是 Closeable,务必用 use {} 包裹。

4.1 Kafka Streams 测试优化

上述方式依赖真实 Kafka 实例,测试慢且不稳定。

✅ 推荐使用 TopologyTestDriver —— 无需网络,纯内存运行,速度极快。

使用 TopologyTestDriver

val testDriverConfig = mapOf(
    StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass.name,
    StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to Serdes.ByteArray().javaClass.name
).toProperties()

val testDriver = TopologyTestDriver(getTopology(topicConfig), testDriverConfig)

模拟输入 & 验证输出

// 创建输入 Topic 模拟器
private fun TopologyTestDriver.createStandardTestTopic(topicName: String): TestInputTopic<String, ByteArray> =
    createInputTopic(topicName, Serdes.String().serializer(), Serdes.ByteArray().serializer())

val inStream1 = testDriver.createStandardTestTopic(topicConfig.inStream1)
val inStream2 = testDriver.createStandardTestTopic(topicConfig.inStream2)

// 写入测试数据
testData().forEachIndexed { i, (sequelNumber, franchiseName) ->
    inStream1.pipeInput(TestRecord((i + 1).toString(), franchiseName))
    inStream2.pipeInput(TestRecord((i + 1).toString(), sequelNumber))
}

// 读取输出验证
val output = testDriver.createOutputTopic(
    topicConfig.outStream,
    Serdes.String().deserializer(),
    Serdes.ByteArray().deserializer()
).readValuesToList().map(::String)

assertEquals(EXPECTED_MOVIES, output)

✅ 优势:

  • 无外部依赖
  • 执行速度快
  • 可精准控制事件时间(支持模拟时间推进)

5. 总结

本文展示了如何在 Kotlin 项目中脱离 Spring 框架,直接使用原生 Kafka 客户端和 Streams API。

核心要点:

  • ✅ 使用 Testcontainers 快速搭建测试环境,提升开发效率
  • ✅ Kafka 原生客户端在 Kotlin 中语法更简洁,但核心逻辑与 Java 一致
  • ✅ 消息必须序列化为 byte[],注意 Serializer/Deserializer 配置
  • KafkaProducerKafkaConsumerKafkaStreams 均为资源,必须显式关闭
  • ✅ 复杂流处理优先考虑 Kafka Streams,配合 TopologyTestDriver 实现高效单元测试

所有示例代码已上传至 GitHub:https://github.com/Baeldung/kotlin-tutorials/tree/master/kotlin-apache-kafka


原始标题:Apache Kafka with Kotlin