1. 概述
Apache Kafka® 是一个分布式流处理平台。在之前的教程中,我们探讨了如何使用 Spring 实现 Kafka 的生产者和消费者。
本教程将聚焦于 Kafka Connect 的使用。
我们将了解:
- Kafka Connect 的不同类型
- Kafka Connect 的功能与运行模式
- 使用配置文件和 REST API 来管理 Connectors
2. Kafka Connect 与 Kafka Connectors 基础
Kafka Connect 是一个用于连接 Kafka 与外部系统的框架,这些外部系统包括数据库、键值存储、搜索引擎和文件系统等,它通过所谓的 Connectors 来实现连接。
Kafka Connectors 是即用型组件,它们可以帮我们将数据从外部系统导入到 Kafka 的 topic 中,也可以将 Kafka topic 中的数据导出到外部系统。我们可以使用现成的 connector 实现常见的数据源和目标系统,也可以自己开发 connector。
- Source Connector 用于从外部系统收集数据。这些系统可以是数据库、流表、消息代理等。比如,它也可以从应用服务器中采集指标数据并发送到 Kafka topic,以便进行低延迟的流处理。
- Sink Connector 用于将 Kafka topic 中的数据传输到其他系统,比如 Elasticsearch、Hadoop 或任意数据库。
一些 connector 由社区维护,另一些则由 Confluent 或其合作伙伴支持。实际上,大多数主流系统如 S3、JDBC、Cassandra 等都有现成的 connector 可用。
3. 功能特性
Kafka Connect 的主要功能包括:
✅ 连接外部系统与 Kafka 的框架 —— 简化了 connector 的开发、部署和管理
✅ 支持分布式和单机模式 —— 既能用于大规模生产集群,也适合开发、测试和小规模部署
✅ REST 接口 —— 可通过 REST API 管理 connector
✅ 自动 offset 管理 —— 自动处理 offset 提交,避免手动实现这一易错环节
✅ 默认支持分布式与可扩展性 —— 利用 Kafka 的 group 管理协议,可动态增减 worker 节点以扩展集群
✅ 流批一体集成 —— 结合 Kafka 的能力,成为连接流式与批处理系统的桥梁
✅ 支持消息转换(Transformations) —— 允许对单条消息进行轻量级修改
4. 安装准备
我们推荐使用 Confluent Platform 而不是原生 Kafka 发行版。Confluent Platform 是 Kafka 背后公司 Confluent 提供的 Kafka 发行版,它包含了额外的工具、客户端和一些预构建的 Connectors。
对于我们的使用场景,开源版本就足够了,可以从 Confluent 官网 下载。
5. Kafka Connect 快速入门
我们将使用 Kafka Connect 最基础的两个 connector:文件 source connector 和文件 sink connector。
Confluent Platform 默认包含了这两个 connector 以及参考配置文件。
5.1. Source Connector 配置
source connector 的参考配置文件路径为:$CONFLUENT_HOME/etc/kafka/connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt
这个配置中包含所有 source connector 的通用属性:
name
:用户自定义的 connector 实例名称connector.class
:指定 connector 的实现类tasks.max
:指定并行运行的 connector 实例数topic
:指定 connector 输出的目标 topic
此外,该配置还包含一个 connector 特有的属性:
file
:指定 connector 读取的源文件路径
为了运行这个配置,我们先创建一个测试文件:
echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt
⚠️ 注意:工作目录应为 $CONFLUENT_HOME
。
5.2. Sink Connector 配置
sink connector 的参考配置文件路径为:$CONFLUENT_HOME/etc/kafka/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
配置内容与 source connector 类似,但:
connector.class
指定的是 sink connector 实现file
指定了数据写入的目标文件路径
5.3. Worker 配置
接下来,我们需要配置 Connect worker,它将协调 source 和 sink connector 的工作。
使用配置文件:$CONFLUENT_HOME/etc/kafka/connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/share/java
关键参数说明:
plugin.path
:指定 connector 实现所在的路径(可为多个)bootstrap.servers
:Kafka broker 地址key.converter
/value.converter
:数据序列化/反序列化方式offset.storage.file.filename
:单机模式下存储 offset 的文件路径offset.flush.interval.ms
:worker 提交 offset 的间隔时间
完整参数列表可参考 官方文档
5.4. 单机模式运行 Kafka Connect
启动命令如下:
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
$CONFLUENT_HOME/etc/kafka/connect-file-sink.properties
查看 topic 中的内容:
$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
输出内容为:
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
检查 $CONFLUENT_HOME
目录下的 test.sink.txt
文件:
cat $CONFLUENT_HOME/test.sink.txt
foo
bar
✅ sink connector 提取了 payload 内容并写入文件。
如果我们在 test.txt
中添加更多内容:
echo "baz" >> $CONFLUENT_HOME/test.txt
⚠️ 注意:必须以换行符结尾,否则最后一行不会被读取。
完成后,停止进程,准备进入分布式模式。
6. Connect 的 REST API
之前我们通过命令行传入配置文件来启动 Connect。但 Connect 本质上是作为服务运行的,因此它也提供了 REST API。
默认接口地址为:http://localhost:8083
,常用接口如下:
GET /connectors
– 获取所有 connector 列表GET /connectors/{name}
– 获取指定 connector 详情POST /connectors
– 创建新 connector,请求体为 JSON 格式(包含 name 和 config 字段)GET /connectors/{name}/status
– 获取 connector 状态(运行中、失败、暂停等)DELETE /connectors/{name}
– 删除 connectorGET /connector-plugins
– 获取集群中安装的 connector 插件列表
完整接口列表参考 官方文档
✅ 下面我们将通过 REST API 来创建 connector。
7. Kafka Connect 分布式模式
单机模式适用于开发和测试,但若要充分利用 Kafka 的分布式特性,必须使用分布式模式。
在分布式模式下,connector 的配置和元数据存储在 Kafka topic 中,worker 节点是无状态的。
7.1. 启动 Connect
参考配置文件路径:$CONFLUENT_HOME/etc/kafka/connect-distributed.properties
主要参数与单机模式类似,区别如下:
group.id
:Connect 集群的组名(不能与消费者组名重复)offset.storage.topic
、config.storage.topic
、status.storage.topic
:分别用于存储 offset、配置和状态的 topic
完整参数参考 官方文档
启动命令:
$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
7.2. 使用 REST API 添加 Connector
在分布式模式下,我们不再通过命令行传入配置文件,而是使用 REST API。
创建 source connector:
JSON 文件 connect-file-source.json
内容如下:
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-distributed.txt",
"topic": "connect-distributed"
}
}
发送 POST 请求:
curl -d @"$CONFLUENT_HOME/connect-file-source.json" \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
创建 sink connector:
JSON 文件 connect-file-sink.json
内容如下:
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"file": "test-distributed.sink.txt",
"topics": "connect-distributed"
}
}
发送 POST 请求:
curl -d @$CONFLUENT_HOME/connect-file-sink.json \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
验证是否生效:
$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-distributed --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
检查 sink 文件:
cat $CONFLUENT_HOME/test-distributed.sink.txt
foo
bar
✅ 清理资源:
curl -X DELETE http://localhost:8083/connectors/local-file-source
curl -X DELETE http://localhost:8083/connectors/local-file-sink
8. 数据转换(Transformations)
8.1. 支持的转换器
Transformations 可以对单条消息进行轻量级修改,支持的内置转换包括:
✅ InsertField
– 添加字段(静态数据或元数据)
✅ ReplaceField
– 过滤或重命名字段
✅ MaskField
– 用空值替换字段(如 0 或空字符串)
✅ HoistField
– 将整个事件包装为 struct 或 map 中的一个字段
✅ ExtractField
– 从 struct 或 map 中提取特定字段
✅ SetSchemaMetadata
– 修改 schema 名称或版本
✅ TimestampRouter
– 基于时间戳修改 topic 名称
✅ RegexRouter
– 使用正则表达式修改 topic 名称
配置格式:
transforms
– 转换器别名列表(逗号分隔)transforms.$alias.type
– 转换器类名transforms.$alias.$transformationSpecificConfig
– 转换器特定配置
8.2. 应用转换器
我们来配置两个转换:
- 将整条消息包装为 JSON struct
- 添加一个字段
首先修改 connect-distributed.properties
:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
重启 Connect:
$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
JSON 配置文件 connect-file-source-transform.json
内容如下:
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-transformation.txt",
"topic": "connect-transformation",
"transforms": "MakeMap,InsertSource",
"transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field": "line",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "data_source",
"transforms.InsertSource.static.value": "test-file-source"
}
}
发送 POST 请求:
curl -d @$CONFLUENT_HOME/connect-file-source-transform.json \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
写入测试数据:
echo -e "Foo\nBar" > $CONFLUENT_HOME/test-transformation.txt
查看 topic 输出:
{"line":"Foo","data_source":"test-file-source"}
{"line":"Bar","data_source":"test-file-source"}
✅ 转换成功。
9. 使用现成的 Connectors
9.1. 获取 Connectors 的途径
✅ Apache Kafka 自带基础 connector(文件和控制台)
✅ Confluent Platform 包含更多 connector(ElasticSearch、HDFS、JDBC、S3)
✅ Confluent Hub – Kafka connector 的“应用商店”
✅ Confluent Connectors 页面 – 包含更多社区 connector
✅ 第三方厂商(如 Landoop)提供开源 connector 集合
9.2. 从 Confluent Hub 安装
企业版 Confluent 提供安装脚本(开源版无):
$CONFLUENT_HOME/bin/confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview
9.3. 手动安装 Connector
若使用开源版或找不到所需 connector,可手动安装:
- 下载并解压 connector 包
- 将
lib
目录中的 jar 文件放入plugin.path
指定的目录 - 可选:将
etc
中的配置文件用于启动或构建 JSON 配置
⚠️ 注意:lib
目录建议重命名为有意义的名称并移动到 $CONFLUENT_HOME/share/java
。
10. 总结
本教程中,我们介绍了 Kafka Connect 的安装与使用:
✅ 了解了 source 和 sink connector
✅ 掌握了单机与分布式模式
✅ 学习了数据转换器的使用
✅ 知道了如何获取和安装现成 connector
配置文件可从 GitHub 获取。