1. 概述

Apache Flink 是一个大数据处理框架,能让程序员以高效且可扩展的方式处理海量数据。

本文将介绍 Apache Flink Java API 中的核心概念和标准数据转换操作。该 API 的流式风格使其能轻松操作 Flink 的核心构造——分布式集合。

首先,我们将通过实现一个单词计数程序来了解 Flink 的 DataSet API 转换操作。然后简要介绍 Flink 的 DataStream API,它支持实时处理事件流。

2. Maven 依赖

首先添加以下 Maven 依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.16.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.16.1</version>
    <scope>test</scope>
</dependency>

3. 核心 API 概念

使用 Flink 时需要了解几个关键概念:

  • 所有 Flink 程序都在分布式数据集上执行转换操作。提供了多种数据转换函数,包括过滤、映射、连接、分组和聚合
  • Flink 中的 sink 操作会触发流执行以产生程序结果,例如将结果保存到文件系统或打印到标准输出
  • Flink 转换是惰性的,只有在调用 sink 操作时才会执行
  • Apache Flink API 支持两种操作模式——批处理和实时处理。处理有限数据源时使用 DataSet API;处理无界实时数据流时使用 DataStream API

4. DataSet API 转换操作

Flink 程序的入口是 ExecutionEnvironment 类实例,它定义了程序的执行上下文。

创建执行环境:

ExecutionEnvironment env
  = ExecutionEnvironment.getExecutionEnvironment();

⚠️ 注意:在本地机器运行时,程序会在本地 JVM 执行。若要在集群运行,需在集群机器安装 Apache Flink 并相应配置 ExecutionEnvironment

4.1 创建 DataSet

要执行数据转换,首先需要提供数据源:

DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);

可以从多种数据源创建 DataSet,如 Apache Kafka、CSV 文件、普通文件或任何其他数据源。

4.2 过滤与归约

创建 DataSet 实例后,可对其应用转换操作。例如过滤大于阈值的数字并求和:

int threshold = 30;
List<Integer> collect = amounts
  .filter(a -> a > threshold)
  .reduce((integer, t1) -> integer + t1)
  .collect();

assertThat(collect.get(0)).isEqualTo(90);

collect() 方法是 sink 操作,会触发实际的数据转换

4.3 映射

假设有一个 Person 对象的 DataSet

private static class Person {
    private int age;
    private String name;

    // 标准构造函数/getter/setter
}

创建数据集:

DataSet<Person> personDataSource = env.fromCollection(
  Arrays.asList(
    new Person(23, "Tom"),
    new Person(75, "Michael")));

若只需提取每个对象的 age 字段,使用 map() 转换:

List<Integer> ages = personDataSource
  .map(p -> p.age)
  .collect();

assertThat(ages).hasSize(2);
assertThat(ages).contains(23, 75);

4.4 连接

当需要连接两个数据集时,可使用 join() 转换。创建用户交易和地址数据集:

Tuple3<Integer, String, String> address
  = new Tuple3<>(1, "5th Avenue", "London");
DataSet<Tuple3<Integer, String, String>> addresses
  = env.fromElements(address);

Tuple2<Integer, String> firstTransaction 
  = new Tuple2<>(1, "Transaction_1");
DataSet<Tuple2<Integer, String>> transactions 
  = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

两个元组的第一个字段是 Integer 类型的 id,我们将基于此字段连接。实现 KeySelector 接口:

private static class IdKeySelectorTransaction 
  implements KeySelector<Tuple2<Integer, String>, Integer> {
    @Override
    public Integer getKey(Tuple2<Integer, String> value) {
        return value.f0;
    }
}

private static class IdKeySelectorAddress 
  implements KeySelector<Tuple3<Integer, String, String>, Integer> {
    @Override
    public Integer getKey(Tuple3<Integer, String, String> value) {
        return value.f0;
    }
}

这里不能使用 lambda 表达式,因为 Flink 需要泛型类型信息

执行连接:

List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>>
  joined = transactions.join(addresses)
  .where(new IdKeySelectorTransaction())
  .equalTo(new IdKeySelectorAddress())
  .collect();

assertThat(joined).hasSize(1);
assertThat(joined).contains(new Tuple2<>(firstTransaction, address));

4.5 排序

Tuple2 集合排序:

Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
DataSet<Tuple2<Integer, String>> transactions = env.fromElements(
  fourthPerson, secondPerson, thirdPerson, firstPerson);

按元组第一个字段排序:

List<Tuple2<Integer, String>> sorted = transactions
  .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
  .collect();

assertThat(sorted)
  .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);

5. 单词计数

单词计数是展示大数据处理框架能力的经典案例。我们使用 Flink 实现这个功能。

首先创建 LineSplitter 类,将输入拆分为单词,并为每个单词生成键值对(单词, 1)。该类实现 FlatMapFunction 接口:

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        Stream.of(value.toLowerCase().split("\\W+"))
          .filter(t -> t.length() > 0)
          .forEach(token -> out.collect(new Tuple2<>(token, 1)));
    }
}

通过 Collectorcollect() 方法将数据推入处理管道。

最后按单词分组并对计数求和:

public static DataSet<Tuple2<String, Integer>> startWordCount(
  ExecutionEnvironment env, List<String> lines) throws Exception {
    DataSet<String> text = env.fromCollection(lines);

    return text.flatMap(new LineSplitter())
      .groupBy(0)
      .aggregate(Aggregations.SUM, 1);
}

使用了三种 Flink 转换:flatMap()groupBy()aggregate()

测试验证:

List<String> lines = Arrays.asList(
  "This is a first sentence",
  "This is a second sentence with a one word");

DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);

List<Tuple2<String, Integer>> collect = result.collect();
 
assertThat(collect).containsExactlyInAnyOrder(
  new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),
  new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),
  new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));

6. DataStream API

6.1 创建 DataStream

Flink 通过 DataStream API 支持事件流处理。首先创建 StreamExecutionEnvironment

StreamExecutionEnvironment executionEnvironment
 = StreamExecutionEnvironment.getExecutionEnvironment();

从字符串元素创建事件流:

DataStream<String> dataStream = executionEnvironment.fromElements(
  "This is a first sentence", 
  "This is a second sentence with a one word");

应用转换操作:

SingleOutputStreamOperator<String> upperCase = text.map(String::toUpperCase);

触发执行(print() 是 sink 操作):

upperCase.print();
env.execute();

输出结果:

1> THIS IS A FIRST SENTENCE
2> THIS IS A SECOND SENTENCE WITH A ONE WORD

6.2 事件窗口

实时处理事件流时,可能需要将事件分组并在时间窗口上计算。假设事件包含编号和时间戳,允许最多 20 秒的乱序事件。

创建模拟事件流并定义时间戳提取器:

SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed
  = env.fromElements(
  new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
  new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor
      <Tuple2<Integer, Long>>(Time.seconds(20)) {
 
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element) {
          return element.f1 * 1000;
        }
    });

定义 5 秒滚动窗口并获取最大值:

SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  .maxBy(0, true);
reduced.print();

输出结果(第二个事件因超时被丢弃):

1> (15,1491221519)

7. 总结

本文介绍了 Apache Flink 框架及其 API 的核心转换操作。我们使用 Flink 流畅的 DataSet API 实现了单词计数程序,并探讨了 DataStream API 的实时事件处理能力。

所有代码示例可在 GitHub 获取,这是一个 Maven 项目,可直接导入运行。


原始标题:Introduction to Apache Flink with Java