1. 概述

Logstash 和 Kafka 都是处理实时数据流的强大工具。Kafka 作为分布式事件流平台表现卓越,而 Logstash 则是用于摄取、过滤和转发数据到各种输出的数据处理管道。

本教程将深入分析 Kafka 和 Logstash 的区别,并提供使用示例。

2. 环境准备

在对比 Logstash 和 Kafka 之前,请确保已安装以下必备组件并掌握相关基础知识:

Java 8 或更高版本
Logstash(属于 ELK 技术栈但可独立使用)
Kafka(需理解发布-订阅模型)

安装指南:

3. Logstash 深入解析

3.1 核心组件

Logstash 是 ELK 技术栈中的开源数据处理管道,用于收集、处理和转发多源数据。其核心组件协同工作实现数据收集、转换和输出:

  1. 输入插件(Inputs)
    从日志文件、数据库、Kafka 等消息队列或云服务获取数据,定义原始数据来源。

  2. 过滤器(Filters)
    处理和转换数据的核心组件:

    • Grok:解析非结构化数据
    • Mutate:修改字段
    • Date:时间戳格式化
    • 支持深度自定义和数据预处理
  3. 输出插件(Outputs)
    将处理后数据发送到 Elasticsearch、数据库、消息队列或本地文件,支持并行输出到多个接口。

  4. 编解码器(Codecs)
    编码/解码数据流(如 JSON 转结构化对象),作为数据摄入或输出时的微型插件。

  5. 管道(Pipelines)
    定义输入→过滤→输出的完整数据流,可创建多阶段复杂工作流。

3.2 实战示例

将日志文件处理为 JSON 格式输出:

创建输入文件 /tmp/example.log

2024-10-12 10:01:15 INFO User login successful
2024-10-12 10:05:32 ERROR Database connection failed
2024-10-12 10:10:45 WARN Disk space running low

执行 Logstash 命令:

$ sudo logstash -e '
input { 
  file { 
    path => "/tmp/example.log" 
    start_position => "beginning" 
    sincedb_path => "/dev/null" 
  } 
} 
filter { 
  grok { 
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}" }
  } 
  mutate {
    remove_field => ["log", "timestamp", "event", "@timestamp"]
  }
}
output { 
  file {
    path => "/tmp/processed-logs.json"
    codec => json_lines
  }
}'

配置解析:

  • 整个命令链构成一个管道
  • Grok 过滤器提取时间戳、日志级别和消息字段
  • Mutate 过滤器移除冗余字段
  • 输出阶段使用 JSON 编码

输出结果 /tmp/processed-logs.json

{"message":["2024-10-12 10:05:32 ERROR Database connection failed","Database connection failed"],"host":{"name":"baeldung"},"@version":"1"}
{"message":["2024-10-12 10:10:45 WARN Disk space running low","Disk space running low"],"host":{"name":"baeldung"},"@version":"1"}
{"message":["2024-10-12 10:01:15 INFO User login successful","User login successful"],"host":{"name":"baeldung"},"@version":"1"}

⚠️ 输出包含额外元数据(如 @version),可用于版本控制和下游系统兼容性保障。

4. Kafka 核心机制

4.1 关键组件

Apache Kafka 是构建实时数据管道和应用的开源分布式事件流平台。核心组件包括:

  1. 主题与分区(Topics and Partitions)
    消息按主题分类,每个主题分为多个分区实现并行处理。例如电商系统中订单、支付、用户活动可分属不同主题。

  2. 生产者与消费者(Producers and Consumers)

    • 生产者:向主题发布消息
    • 消费者:订阅主题并按分区顺序读取消息
    • 支持高吞吐量分布式消息传递
  3. 代理节点(Brokers)
    存储和管理主题分区的服务器集群,提供故障转移和高可用性保障。

  4. Kafka Streams 与 Kafka Connect

    • Streams:实时流处理库(如金融交易模式检测)
    • Connect:简化外部系统集成(数据库/云服务连接器)
  5. ZooKeeper 与 KRaft

    • 传统架构依赖 ZooKeeper 管理元数据和领导者选举
    • 新版支持 KRaft 协议实现去 ZooKeeper 化

4.2 实战演示

创建主题:

$ /bin/kafka-topics.sh \
  --create \
  --topic hello-world \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
Created topic hello-world.

发送消息:

$ /bin/kafka-console-producer.sh \
  --topic hello-world \
  --bootstrap-server localhost:9092 \
  <<< "Hello, World!"

消费消息:

$ /bin/kafka-console-consumer.sh \
  --topic hello-world \
  --from-beginning \
  --bootstrap-server localhost:9092
Hello, World!

5. 核心差异对比

5.1 Logstash 定位

专注于数据摄取、转换和分发,核心优势在于日志解析与数据增强。典型场景:

  • 收集多服务器日志
  • 提取时间戳/错误信息等关键字段
  • 转发至 Elasticsearch 进行监控分析

5.2 Kafka 定位

分布式流处理平台,擅长高吞吐、容错的实时数据流传输。典型场景:

  • 捕获电商用户行为事件
  • 实时流式处理(推荐引擎/欺诈检测)
  • 多系统数据分发

5.3 差异矩阵

特性 Logstash Kafka
核心用途 日志/事件数据处理管道 分布式实时消息流平台
架构模式 插件式管道(输入/过滤/输出) 集群式(生产者/消费者/代理)
消息保留 实时处理,不持久存储 可配置保留期,支持消息重放
数据摄取 多源输入(文件/数据库/日志等) 分布式高吞吐生产者写入
数据转换 强大(Grok/Mutate/GeoIP等过滤器) 有限(通常由下游系统处理)
传递保证 无内置传递语义 支持至少一次/至多一次/精确一次
集成重点 数据源与存储/监控系统对接 分布式流处理与分析平台集成
典型场景 集中日志管理/实时监控 事件驱动架构/流分析/数据管道

6. 协同工作模式

Logstash 与 Kafka 可无缝集成构建强健的数据处理管道:

6.1 Logstash 作为数据入口

  • 收集微服务日志
  • 应用过滤器提取关键信息
  • 转换结构化数据后写入 Kafka 主题

6.2 Kafka 作为数据中枢

  • 存储交易数据流
  • 支持多消费者并行处理:
    • 欺诈检测系统
    • 实时分析平台
    • 报表工具

6.3 协同优势

通过集成可实现: ✅ 高吞吐数据摄取(Logstash)
✅ 可靠消息传递(Kafka)
✅ 解耦数据生产与消费
✅ 弹性扩展架构

7. 总结

本文通过架构分析和实战示例对比了 Logstash 与 Kafka:

  • Logstash 专注数据转换与日志处理
  • Kafka 擅长分布式实时流传输
  • 两者可协同构建高性能数据管道

根据实际需求选择:

  • 需要复杂日志解析 → Logstash
  • 需要高吞吐事件流 → Kafka
  • 需要完整数据管道 → Logstash + Kafka

原始标题:Logstash vs. Kafka | Baeldung