1. 概述
本文将介绍两种在Kafka中通过命令行发送键值消息的方法。在金融交易、预订系统、电商等事件驱动的实际场景中,保证特定主题的消息顺序是常见需求。此时就需要用到Kafka消息键机制。
2. 前置条件
在开始操作前,需要确认以下事项:
✅ 运行中的Kafka实例
如果没有可用环境,可通过Kafka Docker或官方快速入门指南搭建。本文假设Kafka服务已运行在kafka-server:9092
。
✅ 业务模型准备
假设我们正在处理支付系统消息,核心模型类如下:
public class PaymentEvent {
private String reference;
private BigDecimal amount;
private Currency currency;
// 标准getter/setter方法
}
✅ Kafka CLI工具
从官网下载压缩包后解压,所有命令将在解压目录的bin
文件夹下执行。
✅ 创建主题
执行以下命令创建payments
主题:
bin/kafka-topics.sh --create --topic payments --bootstrap-server kafka-server:9092
成功创建会看到:
Created topic payments.
✅ 启动测试消费者
运行以下命令监听消息(注意启用键打印):
bin/kafka-console-consumer.sh --topic payments --bootstrap-server kafka-server:9092 --property "print.key=true" --property "key.separator=="
⚠️ 关键参数说明:
print.key=true
:必须显式启用才能显示消息键key.separator==
:将默认分隔符从制表符改为等号(与后续生产者保持一致)
3. 命令行发送键值消息
使用Kafka控制台生产者发送消息:
bin/kafka-console-producer.sh --topic payments --bootstrap-server kafka-server:9092 --property "parse.key=true" --property "key.separator=="
核心参数说明:
parse.key=true
:启用键解析key.separator==
:指定键值分隔符
启动后进入交互模式,输入消息:
>KEY1={"reference":"P000000001", "amount": "37.75", "currency":"EUR"}
>KEY2={"reference":"P000000002", "amount": "2", "currency":"EUR"}
消费者端会收到完整消息:
KEY1={"reference":"P000000001", "amount": "37.75", "currency":"EUR"}
KEY2={"reference":"P000000002", "amount": "2", "currency":"EUR"}
4. 从文件发送键值消息
批量发送时,文件输入是更高效的方式。操作步骤:
创建数据文件
payment-events.txt
:KEY3={"reference":"P000000003", "amount": "80", "currency":"SEK"} KEY4={"reference":"P000000004", "amount": "77.8", "currency":"GBP"}
通过文件重定向发送:
bin/kafka-console-producer.sh --topic payments --bootstrap-server kafka-server:9092 --property "parse.key=true" --property "key.separator==" < payment-events.txt
验证消费者输出:
KEY3={"reference":"P000000003", "amount": "80", "currency":"SEK"} KEY4={"reference":"P000000004", "amount": "77.8", "currency":"GBP"}
5. 总结
本文演示了Kafka中两种命令行发送键值消息的方法:
- 交互式输入:适合测试少量消息
- 文件批量导入:适合批量数据导入
这些方法在需要保证消息顺序的场景(如支付事件处理)中特别实用。踩坑提醒:务必检查parse.key
和key.separator
参数配置,否则可能导致消息键解析失败。