1. Overview

In this article, we'll look at how to integrate Apache Kafka with ElasticSearch using the Kafka Connect sink connector.

The Kafka project provides Kafka Connect, a powerful tool for integrating Kafka with external data stores seamlessly, without writing any extra code or custom applications.

2. Why Kafka Connect?

Kafka Connect offers a convenient way to stream data between Kafka and data stores such as ElasticSearch. Compared with writing a custom application that consumes from Kafka and writes into ElasticSearch, Kafka Connect has the edge because it was designed for scalability, fault tolerance, and manageability. Its main advantages are:

Scalability: it can run in distributed mode, with multiple workers sharing the load
Fault tolerance: it handles failures automatically and preserves data correctness and completeness, making the pipeline more robust
Self-service connectors: there's no need to develop custom integration components or services
Highly configurable: it's easy to set up and manage through simple configuration and a REST API, as shown in the sketch below
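
For example, the Kafka Connect REST API (exposed on localhost:8083 in the setup below) is all that's needed to inspect and manage connectors. A few illustrative calls, using a hypothetical connector name my-sink:

# list all registered connectors
curl -s http://localhost:8083/connectors

# check the status of one connector (the name my-sink is just an example)
curl -s http://localhost:8083/connectors/my-sink/status

# pause and resume it without redeploying anything
curl -s -X PUT http://localhost:8083/connectors/my-sink/pause
curl -s -X PUT http://localhost:8083/connectors/my-sink/resume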

3. Setting Up the Environment With Docker

Using Docker to deploy and manage the environment simplifies installation and avoids platform-specific dependency issues. All services use their official images.

We'll start the following services from a Docker Compose file: Kafka, Zookeeper, ElasticSearch, and Kafka Connect. We won't go into the details of setting up Kafka itself here.

First, let's create the Docker Compose file:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0
    environment:
      discovery.type: single-node
      xpack.security.enabled: "false"
    ports:
      - "9200:9200"
  kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    container_name: kafka-connect
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_GROUP_ID: kafka-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1

This configuration creates:

  • a Zookeeper service that manages the Kafka cluster configuration
  • a Kafka broker that handles topic data
  • an ElasticSearch instance (with security disabled for simplicity)
  • a Kafka Connect service (with a basic configuration intended only for local use)

⚠️ Note: the configuration above is only suitable for testing; a production environment needs fault tolerance and resilience built in.

Start the services:

# run in the background
docker compose up -d

Once the containers are up, we need to install the ElasticSearch sink connector manually:

docker exec -it kafka-connect bash -c \
  "confluent-hub install --no-prompt \
  confluentinc/kafka-connect-elasticsearch:latest"

Restart the Kafka Connect service so that the new plugin is picked up:

docker restart kafka-connect

Verify the installation:

curl -s http://localhost:8083/connector-plugins | jq .

The response should include io.confluent.connect.elasticsearch.ElasticsearchSinkConnector.
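
If jq is available, we can also narrow the output down to the ElasticSearch entry:

# keep only the ElasticSearch sink plugin from the plugin list
curl -s http://localhost:8083/connector-plugins \
  | jq '.[] | select(.class | contains("ElasticsearchSinkConnector"))'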

4. Quick Start

Let's now send our first message from Kafka to ElasticSearch. First, create the topic:

docker exec -it $(
  docker ps --filter "name=kafka" --format "{{.ID}}"
) bash -c \
  "kafka-topics --create --topic logs \
  --bootstrap-server kafka:9092 \
  --partitions 1 \
  --replication-factor 1"

Create the connector configuration file test-connector.json:

{
    "name": "elasticsearch-sink-connector-test",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "connection.url": "http://elasticsearch:9200",
        "tasks.max": "1",
        "topics": "logs",
        "key.ignore": "true",
        "schema.ignore": "true"
    }
}

Create the Kafka connector:

curl -X POST -H 'Content-Type: application/json' \
--data @test-connector.json \
http://localhost:8083/connectors

Check the connector status:

curl http://localhost:8083/connectors/elasticsearch-sink-connector-test/status
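
The response is JSON, so with jq we can extract just the connector and task states; both should report RUNNING:

curl -s http://localhost:8083/connectors/elasticsearch-sink-connector-test/status \
  | jq -r '.connector.state, .tasks[].state'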

Send some test messages:

docker exec -it $(docker ps --filter "name=kafka" --format "{{.ID}}") \
kafka-console-producer --broker-list kafka:9092 --topic logs

Enter the messages:

{"message": "Hello world", "timestamp": "2025-02-05T12:00:00Z"}
{"message": "Test Kafka Connector", "timestamp": "2025-02-05T13:00:00Z"}

Verify the data in ElasticSearch:

curl -X GET "http://localhost:9200/logs/_search?pretty"

The data flowed from Kafka to ElasticSearch automatically; all we needed to do was wire the topic to an index.
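
A quick way to confirm that both messages were indexed is to count the documents in the index (ElasticSearch may need a moment to refresh before the count reaches 2):

curl -s "http://localhost:9200/logs/_count?pretty"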

5. Advanced Scenarios With the Kafka Connect ElasticSearch Sink

Kafka connectors provide a powerful mechanism for integrating data stores with Kafka, and their rich configuration options cover a wide range of scenarios.

5.1. Handling Avro Messages

Avro is widely used thanks to its efficient serialization and schema evolution capabilities. ElasticSearch can detect field types automatically from the Avro schema.

First, add a Schema Registry service to the Docker Compose file:

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka:9092"
      SCHEMA_REGISTRY_HOST_NAME: "schema-registry"

After updating the Docker Compose file, start the new service:

docker compose up -d

Create the Avro topic:

docker exec -it $(
  docker ps --filter "name=kafka" --format "{{.ID}}"
) bash -c \
"kafka-topics --create \
  --topic avro_logs \
  --bootstrap-server kafka:9092 \
  --partitions 1 \
  --replication-factor 1"

Create the connector configuration avro-sink-config.json:

{
  "name": "avro-elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "_doc",
    "connection.url": "http://elasticsearch:9200",
    "tasks.max": "1",
    "topics": "avro_logs",
    "key.ignore": "true",
    "schema.ignore": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Key configuration options:

  • schema.ignore: false: use the message schema to build the ElasticSearch document mapping
  • value.converter: the converter for Avro-formatted values
  • value.converter.schema.registry.url: the Schema Registry address

Create the connector:

curl -X POST -H "Content-Type: application/json" \
--data @avro-sink-config.json \
http://localhost:8083/connectors

Send an Avro message:

docker exec -it $(
  docker ps --filter "name=schema-registry" --format "{{.ID}}"
) kafka-avro-console-producer \
  --broker-list kafka:9092 \
  --topic avro_logs \
  --property value.schema='{
   "type": "record",
   "name": "LogEntry",
   "fields": [
     {"name": "message", "type": "string"},
     {"name": "timestamp", "type": "long"}
   ]
 }'

Enter the test message:

{"message": "My Avro message", "timestamp": 1700000000}

Verify the data and the mapping in ElasticSearch:

curl -X GET "http://localhost:9200/avro_logs/_search?pretty"
curl -X GET "http://localhost:9200/avro_logs/_mapping"

Clean up the test resources:

curl -X DELETE "http://localhost:9200/avro_logs"
curl -X DELETE "http://localhost:8083/connectors/avro-elasticsearch-sink"

5.2. Timestamp Conversion

The following connector configuration, timestamp-transform-sink.json, converts Unix epoch timestamps to ISO-8601 strings:

{
    "name": "timestamp-transform-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "connection.url": "http://elasticsearch:9200",
        "tasks.max": "1",
        "topics": "epoch_logs",
        "key.ignore": "true",
        "schema.ignore": "true",
        "transforms": "TimestampConverter",
        "transforms.TimestampConverter.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.TimestampConverter.field": "timestamp",
        "transforms.TimestampConverter.target.type": "string",
        "transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ssZ"
    }
}

Key settings:

  • transforms: names the transformation
  • transforms.TimestampConverter.*: define which field is converted and the target type and format

Create the connector:

curl -X POST -H "Content-Type: application/json" \
--data @timestamp-transform-sink.json \
http://localhost:8083/connectors

Send a test message:

docker exec -it $(
  docker ps --filter "name=kafka" --format "{{.ID}}"
) kafka-console-producer \
  --broker-list kafka:9092 \
  --topic epoch_logs

Enter:

{"message": "Timestamp transformation", "timestamp": 1700000000000}

Verify the conversion result:

curl -X GET "http://localhost:9200/epoch_logs/_search?pretty"
curl -X GET "http://localhost:9200/epoch_logs/_mapping"

5.3. Error Handling and Logging

By default, the connector's errors.tolerance property is set to none, so processing stops as soon as an error occurs. In real-time processing scenarios, however, we often want to skip bad records and keep going.

Create a test topic:

docker exec -it $(
  docker ps --filter "name=kafka" --format "{{.ID}}"
) bash -c \
"kafka-topics --create \
  --topic test-error-handling \
  --bootstrap-server kafka:9092 \
  --partitions 1 \
  --replication-factor 1"

Configure the connector in error-handling-sink-config.json:

{
    "name": "error-handling-elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "connection.url": "http://elasticsearch:9200",
        "tasks.max": "1",
        "topics": "test-error-handling",
        "key.ignore": "true",
        "schema.ignore": "true",
        "behavior.on.malformed.documents": "warn",
        "behavior.on.error": "LOG",
        "errors.tolerance": "all",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true"
    }
}

Key settings:

  • behavior.on.malformed.documents: log invalid documents instead of stopping
  • errors.tolerance: all: keep processing the valid messages
  • errors.log.enable: enable error logging
  • errors.log.include.messages: include the offending messages in the log
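
If logging alone isn't enough, Kafka Connect sink connectors can also route failed records to a dead letter queue topic. Below is a minimal sketch of the same configuration extended with the DLQ properties; the topic name dlq-test-error-handling is our own choice, and for the rest of this section we stick with the simpler error-handling-sink-config.json above:

# sketch: a variant of the config with a dead letter queue enabled
cat > error-handling-dlq-sink-config.json <<'EOF'
{
  "name": "error-handling-elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "test-error-handling",
    "key.ignore": "true",
    "schema.ignore": "true",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.deadletterqueue.topic.name": "dlq-test-error-handling",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true"
  }
}
EOF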

Register the connector:

curl -X POST -H "Content-Type: application/json" \
--data @error-handling-sink-config.json \
http://localhost:8083/connectors

Send test messages, including one with bad data:

{"message": "Ok", "timestamp": "2025-02-08T12:00:00Z"}
{"message": "NOK", "timestamp": "invalid_timestamp"}
{"message": "Ok Again", "timestamp": "2025-02-08T13:00:00Z"}

Verify the data in ElasticSearch:

curl -X GET "http://localhost:9200/test-error-handling/_search?pretty"

Only the valid messages get indexed.

Check the error logs:

docker logs kafka-connect | grep "ERROR"

The log shows an error while processing the record at offset 1, but the connector keeps running.
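
We can double-check that the failure didn't stop the connector or its task:

# both the connector and its task should still report RUNNING
curl -s http://localhost:8083/connectors/error-handling-elasticsearch-sink/status \
  | jq '{connector: .connector.state, tasks: [.tasks[].state]}'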

5.4. Batching and Flush Tuning

Processing data streams at scale involves several performance-related parameters. The following settings directly affect efficiency and scalability:

Parameter | Default | Description
batch.size | 2000 | Records per batch (1 to 1,000,000)
bulk.size.bytes | 5 MB | Size of a bulk request (can go up to the GB range)
max.in.flight.requests | 5 | Concurrent requests (1 to 1000)
max.buffered.records | 20,000 | Buffered records (1 to 2,147,483,647)
linger.ms | 1 | Time to wait for a batch to fill (0 to 604,800,000 ms)
flush.timeout.ms | 3 minutes | Flush timeout (can be raised to hours)
flush.synchronously | false | Synchronous flush mode
max.retries | 5 | Maximum number of retries
retry.backoff.ms | 100 | Wait between retries (ms)
connection.compression | false | Compression on the connection
write.method | INSERT | Write method (INSERT or UPSERT)
read.timeout.ms | 3 minutes | Read timeout (can be raised to hours)
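
As an illustration, such settings can be applied by writing the connector configuration through the Connect REST API; the values below are a hypothetical starting point for a higher-throughput pipeline, not a recommendation:

# sketch: the quick-start connector rewritten with larger batches and compression
curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/elasticsearch-sink-connector-test/config \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "logs",
    "key.ignore": "true",
    "schema.ignore": "true",
    "batch.size": "4000",
    "linger.ms": "50",
    "max.in.flight.requests": "5",
    "max.buffered.records": "40000",
    "flush.timeout.ms": "300000",
    "connection.compression": "true"
  }'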

⚠️ Recommendation: for production, do capacity planning against the real workload and test different configuration combinations to optimize performance. See the official documentation for the full list of parameters.

6. Conclusion

In this guide, we built a near real-time data pipeline from Kafka to ElasticSearch using a Kafka Connect sink. The advanced scenarios showed how flexible the approach is when handling data transformations and different ingestion strategies, and we looked at the tuning options the connector offers for adjusting the streaming pipeline to actual needs.

