1. 简介
ksqlDB 是构建在 Apache Kafka 和 Kafka Streams 之上的实时事件流数据库。它将强大的流处理能力与关系型数据库模型结合,并通过 SQL 语法提供操作接口。
本教程将带你深入理解 ksqlDB 的核心概念,并通过构建一个实战应用来展示其典型应用场景。
2. 核心概念
作为事件流数据库,ksqlDB 的核心抽象是流(Stream)和表(Table)。本质上,它们都是可实时转换和处理的数据集合。
流处理能对这些无界的事件流进行持续计算。我们可以通过 SQL 对集合进行转换、过滤、聚合和连接操作,从而生成新集合或物化视图。当新事件不断到达时,这些集合和视图会持续更新,提供实时数据。
最终,查询会发布各种流处理操作的结果。ksqlDB 查询同时支持异步实时应用流和同步请求/响应流(类似传统数据库)。
3. 环境搭建
为演示 ksqlDB 的实际应用,我们将构建一个事件驱动的 Java 应用,用于聚合和查询来自多个传感器的无界数据流。
核心需求是:当特定时间段内传感器读数的平均值超过阈值时触发告警。关键要求是提供实时信息,可用于构建仪表盘或告警系统。
我们将使用 ksqlDB Java 客户端与服务器交互,创建表、聚合查询并执行各类查询操作。
3.1. Docker 环境
ksqlDB 运行在 Kafka 之上,我们将使用 Docker Compose 启动 Kafka 组件、ksqlDB 服务器和 CLI 客户端:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.3.2
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.3.2
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:9092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.3.2
container_name: ksqldb-cli
depends_on:
- broker
- ksqldb-server
entrypoint: /bin/sh
tty: true
我们还会在 Java 应用中使用该配置文件,通过 Testcontainers 框架为集成测试启动环境。
首先启动整个栈:
docker-compose up -d
服务启动后,连接交互式 CLI(用于测试和服务器交互):
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
设置 ksqlDB 从每个主题的最早位置开始消费:
SET 'auto.offset.reset' = 'earliest';
3.2. 项目依赖
本项目主要通过 Java 客户端与 ksqlDB 交互。我们使用 Confluent Platform (CP) 版本的 ksqlDB,需在 POM 文件中添加 CP Maven 仓库:
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
添加 客户端依赖:
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-api-client</artifactId>
<version>6.2.0</version>
</dependency>
4. 实时数据聚合
本节将创建应用所需的实时聚合物化视图。
4.1. 创建流
在 Kafka 中,主题存储事件集合。类似地,ksqlDB 中的流(Stream)表示事件,底层由 Kafka 主题支撑。
创建存储传感器数据的流:
CREATE STREAM readings (
sensor_id VARCHAR KEY,
timestamp VARCHAR,
value DOUBLE
) WITH (
KAFKA_TOPIC = 'readings',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'timestamp',
TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss'
);
ksqlDB 创建 readings
主题并以 JSON 格式存储流数据。由于事件代表时序数据,每个读数需包含事件时间戳。timestamp
字段按指定格式存储时间,确保 ksqlDB 对时间相关操作和乱序事件使用事件时间语义。
创建 Client 实例并执行 SQL 语句:
ClientOptions options = ClientOptions.create()
.setHost("localhost")
.setPort(8088);
Client client = Client.create(options);
Map<String, Object> properties = new HashMap<>();
properties.put("auto.offset.reset", "earliest");
client.executeStatement(
"CREATE STREAM readings (" +
" sensor_id VARCHAR KEY," +
" timestamp VARCHAR," +
" value DOUBLE" +
") WITH (" +
" KAFKA_TOPIC = 'readings'," +
" VALUE_FORMAT = 'JSON'," +
" TIMESTAMP = 'timestamp'," +
" TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss'" +
");",
properties
).get();
与 CLI 操作类似,设置 auto.offset.reset
为 "earliest",确保在无 Kafka 偏移量时从最早位置读取主题数据。
executeStatement
方法属于客户端的异步 API,在向服务器发送请求前立即返回 CompletableFuture
。调用代码可选择阻塞等待完成(调用 get
或 join
)或执行其他非阻塞操作。
4.2. 创建物化视图
有了底层事件流后,我们从 readings
流派生 alerts
表。持久化查询(或物化视图)在服务器上持续运行,处理源流或表中的事件。
需求是:当单个传感器在 30 分钟窗口内的平均读数超过 25 时触发告警:
CREATE TABLE alerts WITH (
KAFKA_TOPIC = 'alerts',
VALUE_FORMAT = 'JSON'
) AS
SELECT
sensor_id,
TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS window_start,
TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss') AS window_end,
AVG(value) AS avg_value
FROM readings
WINDOW TUMBLING (SIZE 30 MINUTES)
GROUP BY sensor_id
HAVING AVG(value) > 25;
查询使用 30 分钟的滚动窗口按传感器聚合事件,并用 TIMESTAMPTOSTRING
函数转换 UNIX 时间戳为可读格式。
关键点:物化视图仅在新事件成功集成到聚合函数后更新数据。
使用客户端异步执行语句创建物化视图:
client.executeStatement(
"CREATE TABLE alerts WITH (" +
" KAFKA_TOPIC = 'alerts'," +
" VALUE_FORMAT = 'JSON'" +
") AS " +
"SELECT " +
" sensor_id, " +
" TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS window_start, " +
" TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss') AS window_end, " +
" AVG(value) AS avg_value " +
"FROM readings " +
"WINDOW TUMBLING (SIZE 30 MINUTES) " +
"GROUP BY sensor_id " +
"HAVING AVG(value) > 25;",
properties
).get();
创建后,视图以增量方式更新,这是实现高效实时查询的关键。
4.3. 插入示例数据
运行查询前,生成 10 分钟间隔的传感器读数样本事件:
List<CompletableFuture<Void>> insertFutures = new ArrayList<>();
insertFutures.add(client.insertInto("readings",
new KsqlObject()
.put("sensor_id", "sensor-1")
.put("timestamp", "2023-01-01 00:00:00")
.put("value", 30.0)
));
insertFutures.add(client.insertInto("readings",
new KsqlObject()
.put("sensor_id", "sensor-1")
.put("timestamp", "2023-01-01 00:10:00")
.put("value", 20.0)
));
insertFutures.add(client.insertInto("readings",
new KsqlObject()
.put("sensor_id", "sensor-1")
.put("timestamp", "2023-01-01 00:20:00")
.put("value", 25.0)
));
CompletableFuture.allOf(
insertFutures.toArray(new CompletableFuture[0])
).get();
使用 KsqlObject
为流列提供键值映射,将所有插入操作合并到单个 Future
中,在所有底层 CompletableFuture
完成时结束。
5. 查询数据
查询允许应用获取物化视图数据,分为两类:
5.1. 推送查询(Push Query)
此类查询将持续更新的结果流推送给客户端。特别适合异步应用流,使客户端能实时响应新信息。
与持久化查询不同,服务器不将此类查询结果存储在 Kafka 主题中。因此应保持查询简单,将复杂操作移至持久化查询。
创建推送查询订阅 alerts
物化视图的结果:
SELECT * FROM alerts EMIT CHANGES;
注意 EMIT CHANGES
子句,它将所有变更推送给客户端。由于查询无限制,将持续流式传输结果直到终止。
订阅查询结果获取流数据:
StreamedQueryResult streamedResult = client.streamQuery(
"SELECT * FROM alerts EMIT CHANGES;",
properties
).get();
streamedResult.subscribe(new Subscriber<Row>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Row row) {
Alert alert = new Alert(
row.getString("sensor_id"),
row.getString("window_start"),
row.getString("window_end"),
row.getDouble("avg_value")
);
System.out.println("Received alert: " + alert);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
System.err.println("Error: " + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("Query completed");
}
});
调用 streamQuery
方法返回 StreamedQueryResult
用于获取流数据。该类扩展了 Reactive Streams 的 Publisher
接口,因此可通过响应式Subscriber异步消费结果。示例中的订阅者是简单的 Reactive Streams 实现,接收 ksqlDB 行数据并转换为 Alert
POJO。
使用 Compose 文件和 Testcontainers 的 DockerComposeContainer 测试:
@ClassRule
public static DockerComposeContainer environment =
new DockerComposeContainer(new File("docker-compose.yml"));
@Test
public void whenInsertReadings_thenShouldReceiveAlerts() throws Exception {
// 插入测试数据(代码略)
// 验证订阅者接收到告警(代码略)
}
启动完整 ksqlDB 环境进行集成测试:向流插入样本数据,ksqlDB 执行窗口聚合,最后验证订阅者按预期消费最新告警。
5.2. 拉取查询(Pull Query)
与推送查询相反,拉取查询检索非动态更新的数据,类似传统 RDBMS。此类查询立即返回有限结果集,适合同步请求/响应应用流。
示例:查询特定传感器触发的所有告警:
SELECT * FROM alerts WHERE sensor_id = 'sensor-1';
与推送查询不同,该查询返回执行时物化视图的所有可用数据,适合查询物化视图的当前状态。
5.3. 其他操作
客户端 API 文档 提供了其他操作详情,如描述源、列出流/表/主题、终止查询等。
6. 总结
本文介绍了支撑 ksqlDB 作为高效事件流数据库的流、表和查询核心概念。
通过简洁可组合的 SQL 构建块,我们实现了简单的响应式应用。还展示了如何使用 Java 客户端创建流和表,对物化视图执行查询并获取实时数据。
完整源代码请访问 GitHub。