1. 简介

ksqlDB 是构建在 Apache KafkaKafka Streams 之上的实时事件流数据库。它将强大的流处理能力与关系型数据库模型结合,并通过 SQL 语法提供操作接口。

本教程将带你深入理解 ksqlDB 的核心概念,并通过构建一个实战应用来展示其典型应用场景。

2. 核心概念

作为事件流数据库,ksqlDB 的核心抽象是流(Stream)和表(Table)。本质上,它们都是可实时转换和处理的数据集合。

流处理能对这些无界的事件流进行持续计算。我们可以通过 SQL 对集合进行转换、过滤、聚合和连接操作,从而生成新集合或物化视图。当新事件不断到达时,这些集合和视图会持续更新,提供实时数据。

最终,查询会发布各种流处理操作的结果。ksqlDB 查询同时支持异步实时应用流和同步请求/响应流(类似传统数据库)

3. 环境搭建

为演示 ksqlDB 的实际应用,我们将构建一个事件驱动的 Java 应用,用于聚合和查询来自多个传感器的无界数据流。

核心需求是:当特定时间段内传感器读数的平均值超过阈值时触发告警。关键要求是提供实时信息,可用于构建仪表盘或告警系统。

我们将使用 ksqlDB Java 客户端与服务器交互,创建表、聚合查询并执行各类查询操作。

3.1. Docker 环境

ksqlDB 运行在 Kafka 之上,我们将使用 Docker Compose 启动 Kafka 组件、ksqlDB 服务器和 CLI 客户端:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.3.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.3.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

我们还会在 Java 应用中使用该配置文件,通过 Testcontainers 框架为集成测试启动环境。

首先启动整个栈:

docker-compose up -d

服务启动后,连接交互式 CLI(用于测试和服务器交互):

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

设置 ksqlDB 从每个主题的最早位置开始消费:

SET 'auto.offset.reset' = 'earliest';

3.2. 项目依赖

本项目主要通过 Java 客户端与 ksqlDB 交互。我们使用 Confluent Platform (CP) 版本的 ksqlDB,需在 POM 文件中添加 CP Maven 仓库

<repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
</repository>

添加 客户端依赖

<dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-api-client</artifactId>
    <version>6.2.0</version>
</dependency>

4. 实时数据聚合

本节将创建应用所需的实时聚合物化视图。

4.1. 创建流

在 Kafka 中,主题存储事件集合。类似地,ksqlDB 中的流(Stream)表示事件,底层由 Kafka 主题支撑

创建存储传感器数据的流:

CREATE STREAM readings (
    sensor_id VARCHAR KEY,
    timestamp VARCHAR,
    value DOUBLE
) WITH (
    KAFKA_TOPIC = 'readings',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP = 'timestamp',
    TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss'
);

ksqlDB 创建 readings 主题并以 JSON 格式存储流数据。由于事件代表时序数据,每个读数需包含事件时间戳。timestamp 字段按指定格式存储时间,确保 ksqlDB 对时间相关操作和乱序事件使用事件时间语义。

创建 Client 实例并执行 SQL 语句:

ClientOptions options = ClientOptions.create()
    .setHost("localhost")
    .setPort(8088);

Client client = Client.create(options);

Map<String, Object> properties = new HashMap<>();
properties.put("auto.offset.reset", "earliest");

client.executeStatement(
    "CREATE STREAM readings (" +
    "  sensor_id VARCHAR KEY," +
    "  timestamp VARCHAR," +
    "  value DOUBLE" +
    ") WITH (" +
    "  KAFKA_TOPIC = 'readings'," +
    "  VALUE_FORMAT = 'JSON'," +
    "  TIMESTAMP = 'timestamp'," +
    "  TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss'" +
    ");",
    properties
).get();

与 CLI 操作类似,设置 auto.offset.reset 为 "earliest",确保在无 Kafka 偏移量时从最早位置读取主题数据。

executeStatement 方法属于客户端的异步 API,在向服务器发送请求前立即返回 CompletableFuture。调用代码可选择阻塞等待完成(调用 getjoin)或执行其他非阻塞操作。

4.2. 创建物化视图

有了底层事件流后,我们从 readings 流派生 alerts 表。持久化查询(或物化视图)在服务器上持续运行,处理源流或表中的事件

需求是:当单个传感器在 30 分钟窗口内的平均读数超过 25 时触发告警:

CREATE TABLE alerts WITH (
    KAFKA_TOPIC = 'alerts',
    VALUE_FORMAT = 'JSON'
) AS
SELECT
    sensor_id,
    TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS window_start,
    TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss') AS window_end,
    AVG(value) AS avg_value
FROM readings
WINDOW TUMBLING (SIZE 30 MINUTES)
GROUP BY sensor_id
HAVING AVG(value) > 25;

查询使用 30 分钟的滚动窗口按传感器聚合事件,并用 TIMESTAMPTOSTRING 函数转换 UNIX 时间戳为可读格式。

关键点:物化视图仅在新事件成功集成到聚合函数后更新数据

使用客户端异步执行语句创建物化视图:

client.executeStatement(
    "CREATE TABLE alerts WITH (" +
    "  KAFKA_TOPIC = 'alerts'," +
    "  VALUE_FORMAT = 'JSON'" +
    ") AS " +
    "SELECT " +
    "  sensor_id, " +
    "  TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS window_start, " +
    "  TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss') AS window_end, " +
    "  AVG(value) AS avg_value " +
    "FROM readings " +
    "WINDOW TUMBLING (SIZE 30 MINUTES) " +
    "GROUP BY sensor_id " +
    "HAVING AVG(value) > 25;",
    properties
).get();

创建后,视图以增量方式更新,这是实现高效实时查询的关键。

4.3. 插入示例数据

运行查询前,生成 10 分钟间隔的传感器读数样本事件:

List<CompletableFuture<Void>> insertFutures = new ArrayList<>();

insertFutures.add(client.insertInto("readings",
    new KsqlObject()
        .put("sensor_id", "sensor-1")
        .put("timestamp", "2023-01-01 00:00:00")
        .put("value", 30.0)
));

insertFutures.add(client.insertInto("readings",
    new KsqlObject()
        .put("sensor_id", "sensor-1")
        .put("timestamp", "2023-01-01 00:10:00")
        .put("value", 20.0)
));

insertFutures.add(client.insertInto("readings",
    new KsqlObject()
        .put("sensor_id", "sensor-1")
        .put("timestamp", "2023-01-01 00:20:00")
        .put("value", 25.0)
));

CompletableFuture.allOf(
    insertFutures.toArray(new CompletableFuture[0])
).get();

使用 KsqlObject 为流列提供键值映射,将所有插入操作合并到单个 Future 中,在所有底层 CompletableFuture 完成时结束。

5. 查询数据

查询允许应用获取物化视图数据,分为两类:

5.1. 推送查询(Push Query)

此类查询将持续更新的结果流推送给客户端。特别适合异步应用流,使客户端能实时响应新信息。

与持久化查询不同,服务器不将此类查询结果存储在 Kafka 主题中。因此应保持查询简单,将复杂操作移至持久化查询

创建推送查询订阅 alerts 物化视图的结果:

SELECT * FROM alerts EMIT CHANGES;

注意 EMIT CHANGES 子句,它将所有变更推送给客户端。由于查询无限制,将持续流式传输结果直到终止。

订阅查询结果获取流数据:

StreamedQueryResult streamedResult = client.streamQuery(
    "SELECT * FROM alerts EMIT CHANGES;",
    properties
).get();

streamedResult.subscribe(new Subscriber<Row>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Row row) {
        Alert alert = new Alert(
            row.getString("sensor_id"),
            row.getString("window_start"),
            row.getString("window_end"),
            row.getDouble("avg_value")
        );
        System.out.println("Received alert: " + alert);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        System.err.println("Error: " + t.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Query completed");
    }
});

调用 streamQuery 方法返回 StreamedQueryResult 用于获取流数据。该类扩展了 Reactive StreamsPublisher 接口,因此可通过响应式Subscriber异步消费结果。示例中的订阅者是简单的 Reactive Streams 实现,接收 ksqlDB 行数据并转换为 Alert POJO。

使用 Compose 文件和 Testcontainers 的 DockerComposeContainer 测试:

@ClassRule
public static DockerComposeContainer environment =
    new DockerComposeContainer(new File("docker-compose.yml"));

@Test
public void whenInsertReadings_thenShouldReceiveAlerts() throws Exception {
    // 插入测试数据(代码略)
    // 验证订阅者接收到告警(代码略)
}

启动完整 ksqlDB 环境进行集成测试:向流插入样本数据,ksqlDB 执行窗口聚合,最后验证订阅者按预期消费最新告警。

5.2. 拉取查询(Pull Query)

与推送查询相反,拉取查询检索非动态更新的数据,类似传统 RDBMS。此类查询立即返回有限结果集,适合同步请求/响应应用流

示例:查询特定传感器触发的所有告警:

SELECT * FROM alerts WHERE sensor_id = 'sensor-1';

与推送查询不同,该查询返回执行时物化视图的所有可用数据,适合查询物化视图的当前状态。

5.3. 其他操作

客户端 API 文档 提供了其他操作详情,如描述源、列出流/表/主题、终止查询等。

6. 总结

本文介绍了支撑 ksqlDB 作为高效事件流数据库的流、表和查询核心概念。

通过简洁可组合的 SQL 构建块,我们实现了简单的响应式应用。还展示了如何使用 Java 客户端创建流和表,对物化视图执行查询并获取实时数据。

完整源代码请访问 GitHub


原始标题:Introduction to ksqlDB