1. 概述
本教程将带你了解 Apache Calcite。这是一个强大的数据管理框架,适用于各种数据访问场景。Calcite 专注于从任何数据源检索数据,而非存储数据。此外,其查询优化能力能实现更快、更高效的数据检索。
让我们深入探讨,先从 Apache Calcite 的典型应用场景开始。
2. Apache Calcite 应用场景
凭借其核心能力,Apache Calcite 可在以下场景中大显身手:
构建新数据库的查询引擎通常需要数年时间。但 Calcite 能让你立即上手,提供开箱即用的可扩展 SQL 解析器、验证器和优化器。它已被用于构建 HerdDB、Apache Druid、MapD 等众多数据库。
由于 Calcite 能集成多种数据库,它被广泛用于构建数据仓库和商业智能工具,例如 Apache Kyline、Apache Wayang、阿里巴巴 MaxCompute 等。
Calcite 也是流处理平台的核心组件,如 Apache Kafka、Apache Apex 和 Flink,助力构建实时数据展示与分析工具。
3. 任何数据,任何地方
Apache Calcite 提供现成的适配器(adapters)用于集成第三方数据源,包括 Cassandra、Elasticsearch、MongoDB 等。
让我们深入探讨这个特性。
3.1. 核心类解析
Apache Calcite 提供了强大的数据检索框架,该框架可扩展,因此也能创建自定义适配器。以下是几个关键 Java 类:
Calcite 适配器提供了如 ElasticsearchSchemaFactory
、MongoSchemaFactory
、FileSchemaFactory
等类,它们都实现了 SchemaFactory
接口。**SchemaFactory
通过创建在 JSON/YAML 模型文件中定义的虚拟 Schema
,以统一方式连接底层数据源**。
3.2. CSV 适配器实战
现在看个例子:用 SQL 查询读取 CSV 文件数据。首先在 pom.xml
中添加文件适配器所需的 Maven 依赖:
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.36.0</version>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-file</artifactId>
<version>1.36.0</version>
</dependency>
接下来在 model.json
中定义模型:
{
"version": "1.0",
"defaultSchema": "TRADES",
"schemas": [
{
"name": "TRADES",
"type": "custom",
"factory": "org.apache.calcite.adapter.file.FileSchemaFactory",
"operand": {
"directory": "trades"
}
}
]
}
model.json
中指定的 FileSchemaFactory
会扫描 trades
目录下的 CSV 文件,并创建虚拟的 TRADES
schema。随后,该目录下的 CSV 文件会被视为数据表。
在演示文件适配器前,先看看待查询的 trade.csv
文件:
tradeid:int,product:string,qty:int
232312123,"RFTXC",100
232312124,"RFUXC",200
232312125,"RFSXC",1000
该 CSV 文件包含三列:tradeid
、product
和 qty
,列头同时指定了数据类型。文件中共有三条交易记录。
最后看如何通过 Calcite 适配器获取数据:
@Test
void whenCsvSchema_thenQuerySuccess() throws SQLException {
Properties info = new Properties();
info.put("model", getPath("model.json"));
try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info);) {
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from trades.trade");
assertEquals(3, resultSet.getMetaData().getColumnCount());
List<Integer> tradeIds = new ArrayList<>();
while (resultSet.next()) {
tradeIds.add(resultSet.getInt("tradeid"));
}
assertEquals(3, tradeIds.size());
}
}
Calcite 适配器通过 model
属性创建模拟文件系统的虚拟 schema,然后使用标准 JDBC 语义从 trade.csv
获取记录。
文件适配器不仅能读 CSV 文件,还支持 HTML 和 JSON 文件。对于高级 CSV 处理场景,Calcite 还提供了专用的 CSVSchemaFactory
。
3.3. Java 对象的内存 SQL 操作
类似 CSV 适配器,再看个例子:用 Apache Calcite 对 Java 对象执行 SQL 查询。
假设 CompanySchema
类中有 Employee
和 Department
两个数组:
public class CompanySchema {
public Employee[] employees;
public Department[] departments;
}
Employee
类定义如下:
public class Employee {
public String name;
public String id;
public String deptId;
public Employee(String name, String id, String deptId) {
this.name = name;
this.id = id;
this.deptId = deptId;
}
}
类似地定义 Department
类:
public class Department {
public String deptId;
public String deptName;
public Department(String deptId, String deptName) {
this.deptId = deptId;
this.deptName = deptName;
}
}
假设有三个部门:财务部、市场部和人力资源部。现在对 CompanySchema
对象执行查询,统计各部门员工数:
@Test
void whenQueryEmployeesObject_thenSuccess() throws SQLException {
Properties info = new Properties();
info.setProperty("lex", "JAVA");
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = calciteConnection.getRootSchema();
Schema schema = new ReflectiveSchema(companySchema);
rootSchema.add("company", schema);
Statement statement = calciteConnection.createStatement();
String query = "select dept.deptName, count(emp.id) "
+ "from company.employees as emp "
+ "join company.departments as dept "
+ "on (emp.deptId = dept.deptId) "
+ "group by dept.deptName";
assertDoesNotThrow(() -> {
ResultSet resultSet = statement.executeQuery(query);
while (resultSet.next()) {
logger.info("Dept Name:" + resultSet.getString(1)
+ " No. of employees:" + resultSet.getInt(2));
}
});
}
有趣的是,这段代码能正常运行并返回结果。方法中,Calcite 的 ReflectiveSchema
类为 CompanySchema
对象创建 Schema,然后执行 SQL 查询并通过标准 JDBC 语义获取记录。
这个例子证明:无论数据源是什么,Calcite 都能用 SQL 语句从任何地方获取数据。
4. 查询处理机制
查询处理是 Apache Calcite 的核心功能。
标准 JDBC 驱动或 SQL 客户端直接在数据库上执行查询。而 Apache Calcite 在解析和验证查询后,会智能优化查询以提高执行效率,节省资源并提升性能。
4.1. 查询处理步骤拆解
Calcite 提供了标准化的查询处理组件:
这些组件均可扩展以满足特定数据库需求。下面详细解析各步骤。
4.2. SQL 解析与验证
解析阶段,解析器将 SQL 查询转换为树状结构 AST(抽象语法树)。
假设针对 Teacher
和 Department
表执行以下 SQL:
Select Teacher.name, Department.name
From Teacher join
Department On (Department.deptid = Teacher.deptid)
Where Department.name = 'Science'
首先解析器将查询转换为 AST,并进行基础语法验证:
接着 验证器对节点进行语义验证:
✅ 验证函数和操作符
✅ 对照数据库目录验证表、列等数据库对象
4.3. 关系表达式构建器
验证通过后,关系表达式构建器使用通用关系操作符将语法树转换为:
-
LogicalTableScan
:从表读取数据 -
LogicalFilter
:按条件筛选行 -
LogicalProject
:选择特定列 -
LogicalJoin
:基于匹配值合并两个表的行
基于前述 AST,生成的逻辑关系表达式如下:
LogicalProject(
projects=[
$0.name AS name0,
$1.name AS name1
],
input=LogicalFilter(
condition=[
($1.name = 'Science')
],
input=LogicalJoin(
condition=[
($0.deptid = $1.deptid)
],
left=LogicalTableScan(table=[[Teacher]]),
right=LogicalTableScan(table=[[Department]])
)
)
)
表达式中 $0
和 $1
分别代表 Teacher
和 Department
表。本质上这是数学表达式,描述获取结果所需的操作,但不包含执行相关信息。
4.4. 查询优化器
Calcite 优化器对关系表达式应用优化策略,常见优化包括:
⚠️ 谓词下推:将过滤条件尽可能推近数据源,减少数据传输量
⚠️ 连接重排序:调整连接顺序以减少中间结果集
⚠️ 投影下推:下推投影操作,避免处理无关列
⚠️ 索引利用:识别并利用索引加速数据检索
4.5. 查询规划器、生成器与执行器
优化完成后,Calcite 查询规划器创建执行计划。执行计划指定查询引擎获取和处理数据的确切步骤,也称为后端查询引擎的物理计划。
然后 Calcite 查询生成器生成特定执行引擎的代码。
最后执行器连接数据库执行最终查询。
5. 总结
本文探讨了 Apache Calcite 的核心能力:它能为数据库快速提供标准化的 SQL 解析器、验证器和优化器。这让厂商免于耗时数年开发查询引擎,转而专注于后端存储实现。此外,Calcite 的现成适配器简化了多数据库连接,助力构建统一集成接口。
通过 Calcite,数据库开发者能加速产品上市,交付强大且灵活的 SQL 功能。
本文代码可在 GitHub 获取。