1. 概述

Apache Kafka 是一个可扩展、高性能、低延迟的平台,支持像消息系统一样读写数据流。在 Java 中使用 Kafka 相对简单,可以快速上手。

Spark StreamingApache Spark 平台的一部分,支持对数据流进行可扩展、高吞吐量、容错的处理。虽然 Spark 是用 Scala 编写的,但它提供了 Java API,便于我们开发。

Apache Cassandra 是一种分布式的宽列 NoSQL 数据库。我们之前的文章中已经介绍过 Cassandra 的使用

在本教程中,我们将结合这三者,构建一个 高可扩展、容错的实时数据管道,用于处理实时数据流。

2. 环境准备

在开始之前,我们需要在本地安装 Kafka、Spark 和 Cassandra,以便运行示例应用。后续我们将逐步展示如何使用这些技术构建一个 数据管道

为了简化操作,我们保留所有默认配置(包括端口),这样可以避免配置冲突,确保教程顺利运行。

2.1. Kafka

在本地安装 Kafka 非常简单,可以参考 官方文档。我们将使用 Kafka 2.1.0 版本。

⚠️ 注意:Kafka 依赖 Apache Zookeeper 才能运行。不过,本教程中我们会使用 Kafka 自带的单节点 Zookeeper 实例。

按照官方指南启动 Zookeeper 和 Kafka 后,我们可以创建一个名为 “messages” 的 topic:

$KAFKA_HOME$\bin\windows\kafka-topics.bat --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic messages

📌 上述脚本适用于 Windows 平台,Unix 系统也有对应脚本。

2.2. Spark

Spark 使用 Hadoop 的客户端库来访问 HDFS 和 YARN。因此,版本兼容性非常重要。幸运的是,Spark 官方下载页面 提供了预编译的 Hadoop 版本。本教程使用的是 “pre-built for Apache Hadoop 2.7 and later” 的 Spark 2.3.0。

解压后,可以使用自带脚本提交应用。后续我们将在 Spring Boot 项目中演示。

2.3. Cassandra

DataStax 提供了适用于不同平台(包括 Windows)的 Cassandra 社区版。可以按照 官方文档 安装。本教程使用 Cassandra 3.9.0。

安装并启动 Cassandra 后,使用 CQL Shell 创建 keyspace 和表:

CREATE KEYSPACE vocabulary
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    };
USE vocabulary;
CREATE TABLE words (word text PRIMARY KEY, count int);

📌 我们创建了一个名为 vocabulary 的 keyspace,以及一个包含 wordcount 两个字段的表 words

3. 依赖配置

我们通过 Maven 引入 Kafka 和 Spark 的依赖,这些依赖都可以从 Maven Central 获取:

将以下依赖添加到 pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.11</artifactId>
    <version>1.5.2</version>
</dependency>

📌 注意:部分依赖标记为 provided,因为它们将在运行时由 Spark 环境提供。

4. Spark Streaming 与 Kafka 集成策略

在继续之前,有必要了解一下 Spark 与 Kafka 的集成方式。

Kafka 在 0.8 和 0.10 版本之间引入了新的消费者 API,因此 Spark Streaming 提供了对应的两个版本集成包。选择时需注意版本兼容性。

4.1. Spark Streaming Kafka 0.8

0.8 版本是稳定版本,支持 Receiver-based 和 Direct 两种方式。具体细节可以参考 官方文档。该版本兼容 Kafka Broker 0.8.2.1 及以上。

4.2. Spark Streaming Kafka 0.10

0.10 版本目前处于实验阶段,仅支持 Direct 方式,并使用 Kafka 新的消费者 API。更多信息参考 官方文档。⚠️ 不兼容旧版本 Kafka Broker

本教程使用的是 0.10 版本,前文的依赖也对应于此。

5. 开发数据管道

我们将用 Java 编写一个 Spark 应用,从 Kafka 读取消息,统计词频,并写入 Cassandra。

数据流向如下:

Simple Data Pipeline 1

5.1. 初始化 JavaStreamingContext

JavaStreamingContext 是 Spark Streaming 应用的入口:

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("WordCountingApp");
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");

JavaStreamingContext streamingContext = new JavaStreamingContext(
  sparkConf, Durations.seconds(1));

5.2. 从 Kafka 获取 DStream

通过 JavaStreamingContext 连接 Kafka:

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("messages");

JavaInputDStream<ConsumerRecord<String, String>> messages = 
  KafkaUtils.createDirectStream(
    streamingContext, 
    LocationStrategies.PreferConsistent(), 
    ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));

📌 注意:我们提供了 key 和 value 的反序列化器。对于 String 类型,Spark 提供了默认实现;如需自定义类型,需提供自定义反序列化器。

我们获取的是 JavaInputDStream,它是 DStream(离散化流)的实现,Spark Streaming 的核心抽象,内部是一系列连续的 RDD。

5.3. 处理 DStream

对 DStream 执行一系列操作,统计词频:

JavaPairDStream<String, String> results = messages
  .mapToPair( 
      record -> new Tuple2<>(record.key(), record.value())
  );
JavaDStream<String> lines = results
  .map(
      tuple2 -> tuple2._2()
  );
JavaDStream<String> words = lines
  .flatMap(
      x -> Arrays.asList(x.split("\\s+")).iterator()
  );
JavaPairDStream<String, Integer> wordCounts = words
  .mapToPair(
      s -> new Tuple2<>(s, 1)
  ).reduceByKey(
      (i1, i2) -> i1 + i2
    );

5.4. 将处理结果写入 Cassandra

遍历处理后的 JavaPairDStream,将词频写入 Cassandra:

wordCounts.foreachRDD(
    javaRdd -> {
      Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
      for (String key : wordCountMap.keySet()) {
        List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));
        JavaRDD<Word> rdd = streamingContext.sparkContext().parallelize(wordList);
        javaFunctions(rdd).writerBuilder(
          "vocabulary", "words", mapToRow(Word.class)).saveToCassandra();
      }
    }
  );

5.5. 启动应用

由于是流处理应用,我们需要让它持续运行:

streamingContext.start();
streamingContext.awaitTermination();

6. 使用 Checkpoint 保存状态

在流处理中,跨批次保留状态非常有用。

比如,之前我们只统计了当前批次的词频。如果想统计累计词频,可以通过 Spark 的 checkpoint 机制 实现。

数据流如下:

Data Pipeline With Checkpoints 1

📌 注意:我们这里只用于会话状态保存,不提供容错。但 checkpoint 也可用于容错。

需要修改代码,设置 checkpoint 路径:

streamingContext.checkpoint("./.checkpoint");

📌 本地路径仅用于演示,生产环境应使用 HDFS、S3 等。

然后使用 mapWithState 累计词频:

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts
  .mapWithState(
    StateSpec.function( 
        (word, one, state) -> {
          int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
          Tuple2<String, Integer> output = new Tuple2<>(word, sum);
          state.update(sum);
          return output;
        }
      )
    );

⚠️ 注意:checkpoint 会带来延迟开销,需合理设置 checkpoint 间隔。

7. 理解 Kafka Offset

回顾之前设置的 Kafka 参数:

kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

📌 这表示我们不自动提交 offset,每次启动消费者时从最新位置读取。因此,应用只能消费运行期间的消息。

如果希望消费所有消息(包括应用未运行时的),需要手动管理 offset,这超出了本教程范围。

✅ Spark Streaming 通过 offset 管理可实现 “exactly once” 语义,即每条消息只处理一次。

8. 部署应用

使用 Spark 提供的 spark-submit 脚本部署应用:

$SPARK_HOME$\bin\spark-submit \
  --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \
  --master local[2] 
  \target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar

📌 注意:Maven 打包时,需包含非 provided 的依赖。

提交后,向 Kafka topic 发送消息,即可在 Cassandra 中看到累计词频。

9. 总结

本教程中,我们使用 Kafka、Spark Streaming 和 Cassandra 构建了一个简单的实时数据管道,并展示了如何使用 checkpoint 保持状态。

一如既往,示例代码可在 GitHub 获取。


原始标题:Building a Data Pipeline with Kafka, Spark Streaming and Cassandra | Baeldung