1. 概述

Apache Kafka® 是一个分布式流处理平台。在之前的教程中,我们探讨了如何使用 Spring 实现 Kafka 的生产者和消费者

本教程将聚焦于 Kafka Connect 的使用。

我们将了解:

  • Kafka Connect 的不同类型
  • Kafka Connect 的功能与运行模式
  • 使用配置文件和 REST API 来管理 Connectors

2. Kafka Connect 与 Kafka Connectors 基础

Kafka Connect 是一个用于连接 Kafka 与外部系统的框架,这些外部系统包括数据库、键值存储、搜索引擎和文件系统等,它通过所谓的 Connectors 来实现连接。

Kafka Connectors 是即用型组件,它们可以帮我们将数据从外部系统导入到 Kafka 的 topic 中,也可以将 Kafka topic 中的数据导出到外部系统。我们可以使用现成的 connector 实现常见的数据源和目标系统,也可以自己开发 connector。

  • Source Connector 用于从外部系统收集数据。这些系统可以是数据库、流表、消息代理等。比如,它也可以从应用服务器中采集指标数据并发送到 Kafka topic,以便进行低延迟的流处理。
  • Sink Connector 用于将 Kafka topic 中的数据传输到其他系统,比如 Elasticsearch、Hadoop 或任意数据库。

一些 connector 由社区维护,另一些则由 Confluent 或其合作伙伴支持。实际上,大多数主流系统如 S3、JDBC、Cassandra 等都有现成的 connector 可用。

3. 功能特性

Kafka Connect 的主要功能包括:

连接外部系统与 Kafka 的框架 —— 简化了 connector 的开发、部署和管理
支持分布式和单机模式 —— 既能用于大规模生产集群,也适合开发、测试和小规模部署
REST 接口 —— 可通过 REST API 管理 connector
自动 offset 管理 —— 自动处理 offset 提交,避免手动实现这一易错环节
默认支持分布式与可扩展性 —— 利用 Kafka 的 group 管理协议,可动态增减 worker 节点以扩展集群
流批一体集成 —— 结合 Kafka 的能力,成为连接流式与批处理系统的桥梁
支持消息转换(Transformations) —— 允许对单条消息进行轻量级修改

4. 安装准备

我们推荐使用 Confluent Platform 而不是原生 Kafka 发行版。Confluent Platform 是 Kafka 背后公司 Confluent 提供的 Kafka 发行版,它包含了额外的工具、客户端和一些预构建的 Connectors。

对于我们的使用场景,开源版本就足够了,可以从 Confluent 官网 下载。

5. Kafka Connect 快速入门

我们将使用 Kafka Connect 最基础的两个 connector:文件 source connector 和文件 sink connector

Confluent Platform 默认包含了这两个 connector 以及参考配置文件。

5.1. Source Connector 配置

source connector 的参考配置文件路径为:
$CONFLUENT_HOME/etc/kafka/connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt

这个配置中包含所有 source connector 的通用属性:

  • name:用户自定义的 connector 实例名称
  • connector.class:指定 connector 的实现类
  • tasks.max:指定并行运行的 connector 实例数
  • topic:指定 connector 输出的目标 topic

此外,该配置还包含一个 connector 特有的属性:

  • file:指定 connector 读取的源文件路径

为了运行这个配置,我们先创建一个测试文件:

echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt

⚠️ 注意:工作目录应为 $CONFLUENT_HOME

5.2. Sink Connector 配置

sink connector 的参考配置文件路径为:
$CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

配置内容与 source connector 类似,但:

  • connector.class 指定的是 sink connector 实现
  • file 指定了数据写入的目标文件路径

5.3. Worker 配置

接下来,我们需要配置 Connect worker,它将协调 source 和 sink connector 的工作。

使用配置文件:$CONFLUENT_HOME/etc/kafka/connect-standalone.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/share/java

关键参数说明:

  • plugin.path:指定 connector 实现所在的路径(可为多个)
  • bootstrap.servers:Kafka broker 地址
  • key.converter / value.converter:数据序列化/反序列化方式
  • offset.storage.file.filename单机模式下存储 offset 的文件路径
  • offset.flush.interval.ms:worker 提交 offset 的间隔时间

完整参数列表可参考 官方文档

5.4. 单机模式运行 Kafka Connect

启动命令如下:

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

查看 topic 中的内容:

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning

输出内容为:

{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

检查 $CONFLUENT_HOME 目录下的 test.sink.txt 文件:

cat $CONFLUENT_HOME/test.sink.txt
foo
bar

✅ sink connector 提取了 payload 内容并写入文件。

如果我们在 test.txt 中添加更多内容:

echo "baz" >> $CONFLUENT_HOME/test.txt

⚠️ 注意:必须以换行符结尾,否则最后一行不会被读取。

完成后,停止进程,准备进入分布式模式。

6. Connect 的 REST API

之前我们通过命令行传入配置文件来启动 Connect。但 Connect 本质上是作为服务运行的,因此它也提供了 REST API。

默认接口地址为:http://localhost:8083,常用接口如下:

  • GET /connectors – 获取所有 connector 列表
  • GET /connectors/{name} – 获取指定 connector 详情
  • POST /connectors – 创建新 connector,请求体为 JSON 格式(包含 name 和 config 字段)
  • GET /connectors/{name}/status – 获取 connector 状态(运行中、失败、暂停等)
  • DELETE /connectors/{name} – 删除 connector
  • GET /connector-plugins – 获取集群中安装的 connector 插件列表

完整接口列表参考 官方文档

✅ 下面我们将通过 REST API 来创建 connector。

7. Kafka Connect 分布式模式

单机模式适用于开发和测试,但若要充分利用 Kafka 的分布式特性,必须使用分布式模式

在分布式模式下,connector 的配置和元数据存储在 Kafka topic 中,worker 节点是无状态的。

7.1. 启动 Connect

参考配置文件路径:
$CONFLUENT_HOME/etc/kafka/connect-distributed.properties

主要参数与单机模式类似,区别如下:

  • group.id:Connect 集群的组名(不能与消费者组名重复)
  • offset.storage.topicconfig.storage.topicstatus.storage.topic:分别用于存储 offset、配置和状态的 topic

完整参数参考 官方文档

启动命令:

$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

7.2. 使用 REST API 添加 Connector

在分布式模式下,我们不再通过命令行传入配置文件,而是使用 REST API。

创建 source connector:

JSON 文件 connect-file-source.json 内容如下:

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-distributed.txt",
        "topic": "connect-distributed"
    }
}

发送 POST 请求:

curl -d @"$CONFLUENT_HOME/connect-file-source.json" \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

创建 sink connector:

JSON 文件 connect-file-sink.json 内容如下:

{
    "name": "local-file-sink",
    "config": {
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test-distributed.sink.txt",
        "topics": "connect-distributed"
    }
}

发送 POST 请求:

curl -d @$CONFLUENT_HOME/connect-file-sink.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

验证是否生效:

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-distributed --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

检查 sink 文件:

cat $CONFLUENT_HOME/test-distributed.sink.txt
foo
bar

✅ 清理资源:

curl -X DELETE http://localhost:8083/connectors/local-file-source
curl -X DELETE http://localhost:8083/connectors/local-file-sink

8. 数据转换(Transformations)

8.1. 支持的转换器

Transformations 可以对单条消息进行轻量级修改,支持的内置转换包括:

InsertField – 添加字段(静态数据或元数据)
ReplaceField – 过滤或重命名字段
MaskField – 用空值替换字段(如 0 或空字符串)
HoistField – 将整个事件包装为 struct 或 map 中的一个字段
ExtractField – 从 struct 或 map 中提取特定字段
SetSchemaMetadata – 修改 schema 名称或版本
TimestampRouter – 基于时间戳修改 topic 名称
RegexRouter – 使用正则表达式修改 topic 名称

配置格式:

  • transforms – 转换器别名列表(逗号分隔)
  • transforms.$alias.type – 转换器类名
  • transforms.$alias.$transformationSpecificConfig – 转换器特定配置

8.2. 应用转换器

我们来配置两个转换:

  1. 将整条消息包装为 JSON struct
  2. 添加一个字段

首先修改 connect-distributed.properties

key.converter.schemas.enable=false
value.converter.schemas.enable=false

重启 Connect:

$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

JSON 配置文件 connect-file-source-transform.json 内容如下:

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-transformation.txt",
        "topic": "connect-transformation",
        "transforms": "MakeMap,InsertSource",
        "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.MakeMap.field": "line",
        "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertSource.static.field": "data_source",
        "transforms.InsertSource.static.value": "test-file-source"
    }
}

发送 POST 请求:

curl -d @$CONFLUENT_HOME/connect-file-source-transform.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

写入测试数据:

echo -e "Foo\nBar" > $CONFLUENT_HOME/test-transformation.txt

查看 topic 输出:

{"line":"Foo","data_source":"test-file-source"}
{"line":"Bar","data_source":"test-file-source"}

✅ 转换成功。

9. 使用现成的 Connectors

9.1. 获取 Connectors 的途径

✅ Apache Kafka 自带基础 connector(文件和控制台)
✅ Confluent Platform 包含更多 connector(ElasticSearch、HDFS、JDBC、S3)
Confluent Hub – Kafka connector 的“应用商店”
Confluent Connectors 页面 – 包含更多社区 connector
✅ 第三方厂商(如 Landoop)提供开源 connector 集合

9.2. 从 Confluent Hub 安装

企业版 Confluent 提供安装脚本(开源版无):

$CONFLUENT_HOME/bin/confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview

9.3. 手动安装 Connector

若使用开源版或找不到所需 connector,可手动安装:

  1. 下载并解压 connector 包
  2. lib 目录中的 jar 文件放入 plugin.path 指定的目录
  3. 可选:将 etc 中的配置文件用于启动或构建 JSON 配置

⚠️ 注意:lib 目录建议重命名为有意义的名称并移动到 $CONFLUENT_HOME/share/java

10. 总结

本教程中,我们介绍了 Kafka Connect 的安装与使用:

✅ 了解了 source 和 sink connector
✅ 掌握了单机与分布式模式
✅ 学习了数据转换器的使用
✅ 知道了如何获取和安装现成 connector

配置文件可从 GitHub 获取。


原始标题:Introduction to Kafka Connectors | Baeldung