1. 概述
Apache Kafka 是一个可扩展、高性能、低延迟的平台,支持像消息系统一样读写数据流。在 Java 中使用 Kafka 相对简单,可以快速上手。
Spark Streaming 是 Apache Spark 平台的一部分,支持对数据流进行可扩展、高吞吐量、容错的处理。虽然 Spark 是用 Scala 编写的,但它提供了 Java API,便于我们开发。
Apache Cassandra 是一种分布式的宽列 NoSQL 数据库。我们之前的文章中已经介绍过 Cassandra 的使用。
在本教程中,我们将结合这三者,构建一个 高可扩展、容错的实时数据管道,用于处理实时数据流。
2. 环境准备
在开始之前,我们需要在本地安装 Kafka、Spark 和 Cassandra,以便运行示例应用。后续我们将逐步展示如何使用这些技术构建一个 数据管道。
为了简化操作,我们保留所有默认配置(包括端口),这样可以避免配置冲突,确保教程顺利运行。
2.1. Kafka
在本地安装 Kafka 非常简单,可以参考 官方文档。我们将使用 Kafka 2.1.0 版本。
⚠️ 注意:Kafka 依赖 Apache Zookeeper 才能运行。不过,本教程中我们会使用 Kafka 自带的单节点 Zookeeper 实例。
按照官方指南启动 Zookeeper 和 Kafka 后,我们可以创建一个名为 “messages” 的 topic:
$KAFKA_HOME$\bin\windows\kafka-topics.bat --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic messages
📌 上述脚本适用于 Windows 平台,Unix 系统也有对应脚本。
2.2. Spark
Spark 使用 Hadoop 的客户端库来访问 HDFS 和 YARN。因此,版本兼容性非常重要。幸运的是,Spark 官方下载页面 提供了预编译的 Hadoop 版本。本教程使用的是 “pre-built for Apache Hadoop 2.7 and later” 的 Spark 2.3.0。
解压后,可以使用自带脚本提交应用。后续我们将在 Spring Boot 项目中演示。
2.3. Cassandra
DataStax 提供了适用于不同平台(包括 Windows)的 Cassandra 社区版。可以按照 官方文档 安装。本教程使用 Cassandra 3.9.0。
安装并启动 Cassandra 后,使用 CQL Shell 创建 keyspace 和表:
CREATE KEYSPACE vocabulary
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};
USE vocabulary;
CREATE TABLE words (word text PRIMARY KEY, count int);
📌 我们创建了一个名为 vocabulary
的 keyspace,以及一个包含 word
和 count
两个字段的表 words
。
3. 依赖配置
我们通过 Maven 引入 Kafka 和 Spark 的依赖,这些依赖都可以从 Maven Central 获取:
将以下依赖添加到 pom.xml
:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.11</artifactId>
<version>1.5.2</version>
</dependency>
📌 注意:部分依赖标记为 provided
,因为它们将在运行时由 Spark 环境提供。
4. Spark Streaming 与 Kafka 集成策略
在继续之前,有必要了解一下 Spark 与 Kafka 的集成方式。
Kafka 在 0.8 和 0.10 版本之间引入了新的消费者 API,因此 Spark Streaming 提供了对应的两个版本集成包。选择时需注意版本兼容性。
4.1. Spark Streaming Kafka 0.8
0.8 版本是稳定版本,支持 Receiver-based 和 Direct 两种方式。具体细节可以参考 官方文档。该版本兼容 Kafka Broker 0.8.2.1 及以上。
4.2. Spark Streaming Kafka 0.10
0.10 版本目前处于实验阶段,仅支持 Direct 方式,并使用 Kafka 新的消费者 API。更多信息参考 官方文档。⚠️ 不兼容旧版本 Kafka Broker。
本教程使用的是 0.10 版本,前文的依赖也对应于此。
5. 开发数据管道
我们将用 Java 编写一个 Spark 应用,从 Kafka 读取消息,统计词频,并写入 Cassandra。
数据流向如下:
5.1. 初始化 JavaStreamingContext
JavaStreamingContext
是 Spark Streaming 应用的入口:
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("WordCountingApp");
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
JavaStreamingContext streamingContext = new JavaStreamingContext(
sparkConf, Durations.seconds(1));
5.2. 从 Kafka 获取 DStream
通过 JavaStreamingContext
连接 Kafka:
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("messages");
JavaInputDStream<ConsumerRecord<String, String>> messages =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
📌 注意:我们提供了 key 和 value 的反序列化器。对于 String 类型,Spark 提供了默认实现;如需自定义类型,需提供自定义反序列化器。
我们获取的是 JavaInputDStream
,它是 DStream(离散化流)的实现,Spark Streaming 的核心抽象,内部是一系列连续的 RDD。
5.3. 处理 DStream
对 DStream 执行一系列操作,统计词频:
JavaPairDStream<String, String> results = messages
.mapToPair(
record -> new Tuple2<>(record.key(), record.value())
);
JavaDStream<String> lines = results
.map(
tuple2 -> tuple2._2()
);
JavaDStream<String> words = lines
.flatMap(
x -> Arrays.asList(x.split("\\s+")).iterator()
);
JavaPairDStream<String, Integer> wordCounts = words
.mapToPair(
s -> new Tuple2<>(s, 1)
).reduceByKey(
(i1, i2) -> i1 + i2
);
5.4. 将处理结果写入 Cassandra
遍历处理后的 JavaPairDStream
,将词频写入 Cassandra:
wordCounts.foreachRDD(
javaRdd -> {
Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
for (String key : wordCountMap.keySet()) {
List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));
JavaRDD<Word> rdd = streamingContext.sparkContext().parallelize(wordList);
javaFunctions(rdd).writerBuilder(
"vocabulary", "words", mapToRow(Word.class)).saveToCassandra();
}
}
);
5.5. 启动应用
由于是流处理应用,我们需要让它持续运行:
streamingContext.start();
streamingContext.awaitTermination();
6. 使用 Checkpoint 保存状态
在流处理中,跨批次保留状态非常有用。
比如,之前我们只统计了当前批次的词频。如果想统计累计词频,可以通过 Spark 的 checkpoint 机制 实现。
数据流如下:
📌 注意:我们这里只用于会话状态保存,不提供容错。但 checkpoint 也可用于容错。
需要修改代码,设置 checkpoint 路径:
streamingContext.checkpoint("./.checkpoint");
📌 本地路径仅用于演示,生产环境应使用 HDFS、S3 等。
然后使用 mapWithState
累计词频:
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts
.mapWithState(
StateSpec.function(
(word, one, state) -> {
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
}
)
);
⚠️ 注意:checkpoint 会带来延迟开销,需合理设置 checkpoint 间隔。
7. 理解 Kafka Offset
回顾之前设置的 Kafka 参数:
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
📌 这表示我们不自动提交 offset,每次启动消费者时从最新位置读取。因此,应用只能消费运行期间的消息。
如果希望消费所有消息(包括应用未运行时的),需要手动管理 offset,这超出了本教程范围。
✅ Spark Streaming 通过 offset 管理可实现 “exactly once” 语义,即每条消息只处理一次。
8. 部署应用
使用 Spark 提供的 spark-submit
脚本部署应用:
$SPARK_HOME$\bin\spark-submit \
--class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \
--master local[2]
\target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar
📌 注意:Maven 打包时,需包含非 provided 的依赖。
提交后,向 Kafka topic 发送消息,即可在 Cassandra 中看到累计词频。
9. 总结
本教程中,我们使用 Kafka、Spark Streaming 和 Cassandra 构建了一个简单的实时数据管道,并展示了如何使用 checkpoint 保持状态。
一如既往,示例代码可在 GitHub 获取。