1. 概述
Apache Flink 是一个大数据处理框架,能让程序员以高效且可扩展的方式处理海量数据。
本文将介绍 Apache Flink Java API 中的核心概念和标准数据转换操作。该 API 的流式风格使其能轻松操作 Flink 的核心构造——分布式集合。
首先,我们将通过实现一个单词计数程序来了解 Flink 的 DataSet API 转换操作。然后简要介绍 Flink 的 DataStream API,它支持实时处理事件流。
2. Maven 依赖
首先添加以下 Maven 依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>1.16.1</version>
<scope>test</scope>
</dependency>
3. 核心 API 概念
使用 Flink 时需要了解几个关键概念:
- 所有 Flink 程序都在分布式数据集上执行转换操作。提供了多种数据转换函数,包括过滤、映射、连接、分组和聚合
- Flink 中的 sink 操作会触发流执行以产生程序结果,例如将结果保存到文件系统或打印到标准输出
- Flink 转换是惰性的,只有在调用 sink 操作时才会执行
- Apache Flink API 支持两种操作模式——批处理和实时处理。处理有限数据源时使用 DataSet API;处理无界实时数据流时使用 DataStream API
4. DataSet API 转换操作
Flink 程序的入口是 ExecutionEnvironment
类实例,它定义了程序的执行上下文。
创建执行环境:
ExecutionEnvironment env
= ExecutionEnvironment.getExecutionEnvironment();
⚠️ 注意:在本地机器运行时,程序会在本地 JVM 执行。若要在集群运行,需在集群机器安装 Apache Flink 并相应配置 ExecutionEnvironment
4.1 创建 DataSet
要执行数据转换,首先需要提供数据源:
DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);
可以从多种数据源创建 DataSet,如 Apache Kafka、CSV 文件、普通文件或任何其他数据源。
4.2 过滤与归约
创建 DataSet 实例后,可对其应用转换操作。例如过滤大于阈值的数字并求和:
int threshold = 30;
List<Integer> collect = amounts
.filter(a -> a > threshold)
.reduce((integer, t1) -> integer + t1)
.collect();
assertThat(collect.get(0)).isEqualTo(90);
✅ collect()
方法是 sink 操作,会触发实际的数据转换
4.3 映射
假设有一个 Person 对象的 DataSet:
private static class Person {
private int age;
private String name;
// 标准构造函数/getter/setter
}
创建数据集:
DataSet<Person> personDataSource = env.fromCollection(
Arrays.asList(
new Person(23, "Tom"),
new Person(75, "Michael")));
若只需提取每个对象的 age 字段,使用 map()
转换:
List<Integer> ages = personDataSource
.map(p -> p.age)
.collect();
assertThat(ages).hasSize(2);
assertThat(ages).contains(23, 75);
4.4 连接
当需要连接两个数据集时,可使用 join()
转换。创建用户交易和地址数据集:
Tuple3<Integer, String, String> address
= new Tuple3<>(1, "5th Avenue", "London");
DataSet<Tuple3<Integer, String, String>> addresses
= env.fromElements(address);
Tuple2<Integer, String> firstTransaction
= new Tuple2<>(1, "Transaction_1");
DataSet<Tuple2<Integer, String>> transactions
= env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));
两个元组的第一个字段是 Integer 类型的 id,我们将基于此字段连接。实现 KeySelector
接口:
private static class IdKeySelectorTransaction
implements KeySelector<Tuple2<Integer, String>, Integer> {
@Override
public Integer getKey(Tuple2<Integer, String> value) {
return value.f0;
}
}
private static class IdKeySelectorAddress
implements KeySelector<Tuple3<Integer, String, String>, Integer> {
@Override
public Integer getKey(Tuple3<Integer, String, String> value) {
return value.f0;
}
}
❌ 这里不能使用 lambda 表达式,因为 Flink 需要泛型类型信息
执行连接:
List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>>
joined = transactions.join(addresses)
.where(new IdKeySelectorTransaction())
.equalTo(new IdKeySelectorAddress())
.collect();
assertThat(joined).hasSize(1);
assertThat(joined).contains(new Tuple2<>(firstTransaction, address));
4.5 排序
对 Tuple2 集合排序:
Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
DataSet<Tuple2<Integer, String>> transactions = env.fromElements(
fourthPerson, secondPerson, thirdPerson, firstPerson);
按元组第一个字段排序:
List<Tuple2<Integer, String>> sorted = transactions
.sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
.collect();
assertThat(sorted)
.containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);
5. 单词计数
单词计数是展示大数据处理框架能力的经典案例。我们使用 Flink 实现这个功能。
首先创建 LineSplitter
类,将输入拆分为单词,并为每个单词生成键值对(单词, 1)。该类实现 FlatMapFunction
接口:
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
Stream.of(value.toLowerCase().split("\\W+"))
.filter(t -> t.length() > 0)
.forEach(token -> out.collect(new Tuple2<>(token, 1)));
}
}
通过 Collector
的 collect()
方法将数据推入处理管道。
最后按单词分组并对计数求和:
public static DataSet<Tuple2<String, Integer>> startWordCount(
ExecutionEnvironment env, List<String> lines) throws Exception {
DataSet<String> text = env.fromCollection(lines);
return text.flatMap(new LineSplitter())
.groupBy(0)
.aggregate(Aggregations.SUM, 1);
}
✅ 使用了三种 Flink 转换:flatMap()
、groupBy()
和 aggregate()
测试验证:
List<String> lines = Arrays.asList(
"This is a first sentence",
"This is a second sentence with a one word");
DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);
List<Tuple2<String, Integer>> collect = result.collect();
assertThat(collect).containsExactlyInAnyOrder(
new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),
new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),
new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));
6. DataStream API
6.1 创建 DataStream
Flink 通过 DataStream API 支持事件流处理。首先创建 StreamExecutionEnvironment
:
StreamExecutionEnvironment executionEnvironment
= StreamExecutionEnvironment.getExecutionEnvironment();
从字符串元素创建事件流:
DataStream<String> dataStream = executionEnvironment.fromElements(
"This is a first sentence",
"This is a second sentence with a one word");
应用转换操作:
SingleOutputStreamOperator<String> upperCase = text.map(String::toUpperCase);
触发执行(print()
是 sink 操作):
upperCase.print();
env.execute();
输出结果:
1> THIS IS A FIRST SENTENCE
2> THIS IS A SECOND SENTENCE WITH A ONE WORD
6.2 事件窗口
实时处理事件流时,可能需要将事件分组并在时间窗口上计算。假设事件包含编号和时间戳,允许最多 20 秒的乱序事件。
创建模拟事件流并定义时间戳提取器:
SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed
= env.fromElements(
new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor
<Tuple2<Integer, Long>>(Time.seconds(20)) {
@Override
public long extractTimestamp(Tuple2<Integer, Long> element) {
return element.f1 * 1000;
}
});
定义 5 秒滚动窗口并获取最大值:
SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.maxBy(0, true);
reduced.print();
输出结果(第二个事件因超时被丢弃):
1> (15,1491221519)
7. 总结
本文介绍了 Apache Flink 框架及其 API 的核心转换操作。我们使用 Flink 流畅的 DataSet API 实现了单词计数程序,并探讨了 DataStream API 的实时事件处理能力。
所有代码示例可在 GitHub 获取,这是一个 Maven 项目,可直接导入运行。