1. 概述

Apache Flink 是一个易于与 Java 集成的流处理框架。Apache Kafka 则是一个高可用、分布式的流处理系统。

本教程将介绍如何使用这两个技术构建一个 数据管道

2. 安装配置

关于 Kafka 的安装和配置,请参考 官方指南。安装完成后,可以使用以下命令创建两个新主题:flink_inputflink_output

bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic flink_output

bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic flink_input

为了演示方便,本文使用默认配置和默认端口。

Apache Flink 支持实时流处理,并且支持多种第三方系统作为数据源(source)或数据汇(sink)。

以下是 Flink 支持的一些常见连接器:

  • Apache Kafka(source/sink)✅
  • Apache Cassandra(sink)
  • Amazon Kinesis Streams(source/sink)
  • Elasticsearch(sink)
  • Hadoop FileSystem(sink)
  • RabbitMQ(source/sink)
  • Apache NiFi(source/sink)
  • Twitter Streaming API(source)

要在项目中引入 Flink,需添加以下 Maven 依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.16.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.16.1</version>
</dependency>

添加这些依赖后即可实现与 Kafka 的数据读写操作。当前最新版本可从 Maven Central 获取。

4. Kafka 字符串消费者

要使用 Flink 从 Kafka 消费数据,需要提供 topic 和 Kafka 地址。 同时还应指定 group id,用于记录 offset,避免每次从头开始消费。

我们可以通过一个静态方法简化 FlinkKafkaConsumer 的创建过程:

public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
  String topic, String kafkaAddress, String kafkaGroup ) {
 
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", kafkaAddress);
    props.setProperty("group.id", kafkaGroup);
    FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
      topic, new SimpleStringSchema(), props);

    return consumer;
}

该方法接收 topickafkaAddresskafkaGroup 参数,并返回一个以字符串形式消费指定 topic 的消费者实例。

其中类名中的数字 011 表示 Kafka 版本号。

5. Kafka 字符串生产者

要向 Kafka 发送数据,需要提供 Kafka 地址和目标 topic。 我们同样可以封装一个静态方法来创建不同 topic 的 producer:

public static FlinkKafkaProducer011<String> createStringProducer(
  String topic, String kafkaAddress){

    return new FlinkKafkaProducer011<>(kafkaAddress,
      topic, new SimpleStringSchema());
}

这个方法只需要 topickafkaAddress 两个参数,因为生产者不需要 group id。

6. 字符串流处理

有了可用的消费者和生产者之后,我们可以尝试对 Kafka 中的数据进行流处理,并将结果写回 Kafka。

完整的流处理函数列表可以参考 这里

在下面的例子中,我们将每条消息中的单词转为大写并写回 Kafka。

为此,我们需要自定义一个 MapFunction

public class WordsCapitalizer implements MapFunction<String, String> {
    @Override
    public String map(String s) {
        return s.toUpperCase();
    }
}

然后在流处理逻辑中使用它:

public static void capitalize() {
    String inputTopic = "flink_input";
    String outputTopic = "flink_output";
    String consumerGroup = "baeldung";
    String address = "localhost:9092";
    StreamExecutionEnvironment environment = StreamExecutionEnvironment
      .getExecutionEnvironment();
    FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
      inputTopic, address, consumerGroup);
    DataStream<String> stringInputStream = environment
      .addSource(flinkKafkaConsumer);

    FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(
      outputTopic, address);

    stringInputStream
      .map(new WordsCapitalizer())
      .addSink(flinkKafkaProducer);
}

✅ 应用会从 flink_input 主题读取数据,处理后写入 flink_output 主题。

上面展示了如何使用 Flink 处理字符串数据,但在实际场景中通常需要处理自定义对象。接下来我们将介绍如何实现这一点。

7. 自定义对象反序列化

以下类表示一条包含发送方、接收方等信息的简单消息:

@JsonSerialize
public class InputMessage {
    String sender;
    String recipient;
    LocalDateTime sentAt;
    String message;
}

之前我们使用 SimpleStringSchema 来反序列化 Kafka 消息为字符串,现在我们要直接将其反序列化为自定义对象。

为此需要实现自定义的 DeserializationSchema

public class InputMessageDeserializationSchema implements
  DeserializationSchema<InputMessage> {

    static ObjectMapper objectMapper = new ObjectMapper()
      .registerModule(new JavaTimeModule());

    @Override
    public InputMessage deserialize(byte[] bytes) throws IOException {
        return objectMapper.readValue(bytes, InputMessage.class);
    }

    @Override
    public boolean isEndOfStream(InputMessage inputMessage) {
        return false;
    }

    @Override
    public TypeInformation&lt;InputMessage&gt; getProducedType() {
        return TypeInformation.of(InputMessage.class);
    }
}

我们假设 Kafka 中的消息是 JSON 格式。由于包含 LocalDateTime 类型字段,需要注册 JavaTimeModule 以支持时间类型的映射。

⚠️ 注意:Flink 中的所有算子(如 schema 或 function)都会在作业启动时被序列化,因此不能包含不可序列化的字段。

类似的问题在 Spark 中也存在,一种常见的解决方案是将字段声明为 static,比如上面的 ObjectMapper。虽然不够优雅,但确实有效。

isEndOfStream 方法用于处理特殊场景,比如只处理到某个特定数据为止。在当前示例中不需要使用。

8. 自定义对象序列化

假设我们要实现自动备份功能,每天生成一份完整的消息备份,并为每个备份分配唯一 ID。

为此可创建如下类:

public class Backup {
    @JsonProperty("inputMessages")
    List<InputMessage> inputMessages;
    @JsonProperty("backupTimestamp")
    LocalDateTime backupTimestamp;
    @JsonProperty("uuid")
    UUID uuid;

    public Backup(List<InputMessage> inputMessages, 
      LocalDateTime backupTimestamp) {
        this.inputMessages = inputMessages;
        this.backupTimestamp = backupTimestamp;
        this.uuid = UUID.randomUUID();
    }
}

⚠️ 注意:UUID 的生成机制并不完美,存在重复风险,但对于本示例已足够。

我们要将 Backup 对象以 JSON 形式写入 Kafka,因此需要实现 SerializationSchema

public class BackupSerializationSchema
  implements SerializationSchema<Backup> {

    ObjectMapper objectMapper;
    Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);

    @Override
    public byte[] serialize(Backup backupMessage) {
        if(objectMapper == null) {
            objectMapper = new ObjectMapper()
              .registerModule(new JavaTimeModule());
        }
        try {
            return objectMapper.writeValueAsString(backupMessage).getBytes();
        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
            logger.error("Failed to parse JSON", e);
        }
        return new byte[0];
    }
}

9. 消息时间戳

为确保每天的备份只包含当天的消息,需要为每条消息打上时间戳。

Flink 提供三种时间语义:EventTimeProcessingTimeIngestionTime

在本例中,我们希望使用消息发送时间,因此选择 EventTime

要使用 EventTime需要实现 TimestampAssigner 来提取时间戳

public class InputMessageTimestampAssigner 
  implements AssignerWithPunctuatedWatermarks<InputMessage> {
 
    @Override
    public long extractTimestamp(InputMessage element, 
      long previousElementTimestamp) {
        ZoneId zoneId = ZoneId.systemDefault();
        return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
    }

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(InputMessage lastElement, 
      long extractedTimestamp) {
        return new Watermark(extractedTimestamp - 1500);
    }
}

我们将 LocalDateTime 转换为 Flink 所需的毫秒级时间戳。由于 toEpochSecond() 返回的是秒级时间戳,需要乘以 1000。

Flink 引入了 Watermark 概念,用于处理乱序数据。水印定义了允许的最大延迟时间,时间戳小于水印的数据将不会被处理。

10. 时间窗口划分

为确保备份只包含一天内的消息,可使用 timeWindowAll 方法将数据划分为时间窗口。

此外,还需将窗口内的消息聚合为 Backup 对象。

为此需实现自定义的 AggregateFunction

public class BackupAggregator 
  implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
 
    @Override
    public List<InputMessage> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<InputMessage> add(
      InputMessage inputMessage,
      List<InputMessage> inputMessages) {
        inputMessages.add(inputMessage);
        return inputMessages;
    }

    @Override
    public Backup getResult(List<InputMessage> inputMessages) {
        return new Backup(inputMessages, LocalDateTime.now());
    }

    @Override
    public List<InputMessage> merge(List<InputMessage> inputMessages,
      List<InputMessage> acc1) {
        inputMessages.addAll(acc1);
        return inputMessages;
    }
}

11. 备份聚合

完成时间戳分配和聚合函数实现后,就可以开始处理 Kafka 输入数据:

public static void createBackup () throws Exception {
    String inputTopic = "flink_input";
    String outputTopic = "flink_output";
    String consumerGroup = "baeldung";
    String kafkaAddress = "192.168.99.100:9092";
    StreamExecutionEnvironment environment
      = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer
      = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
    flinkKafkaConsumer.setStartFromEarliest();

    flinkKafkaConsumer.assignTimestampsAndWatermarks(
      new InputMessageTimestampAssigner());
    FlinkKafkaProducer011<Backup> flinkKafkaProducer
      = createBackupProducer(outputTopic, kafkaAddress);

    DataStream<InputMessage> inputMessagesStream
      = environment.addSource(flinkKafkaConsumer);

    inputMessagesStream
      .timeWindowAll(Time.hours(24))
      .aggregate(new BackupAggregator())
      .addSink(flinkKafkaProducer);

    environment.execute();
}

12. 总结

本文介绍了如何使用 Apache Flink 和 Apache Kafka 构建一个简单的数据管道。

示例代码可在 GitHub 查看。


原始标题:Building a Data Pipeline with Flink and Kafka