1. 简介
本文将带你入门 Hazelcast Jet —— 由 Hazelcast 公司提供的分布式数据处理引擎,底层基于 Hazelcast IMDG 构建。
如果你对 Hazelcast IMDG 还不熟悉,可以先阅读我们的另一篇文章《Java 中使用 Hazelcast》快速上手。
2. 什么是 Hazelcast Jet?
Hazelcast Jet 是一个以流式处理为核心的分布式计算引擎。它不仅能处理 Kafka 这类实时数据流,也能高效处理数据库或文件中的静态数据。
✅ 核心能力之一是支持对无限数据流进行聚合分析。它是通过“窗口(windowing)”机制实现的:将连续的数据流切分为多个时间或数量区间,再对每个区间独立聚合。
Jet 支持集群部署,提交任务后,集群中所有节点会自动协同处理数据。每个节点负责一部分数据分片,天然具备横向扩展能力,轻松应对高吞吐场景。
典型应用场景包括:
- 实时流处理(如日志分析、监控告警)
- 高速批处理(替代传统 MapReduce)
- 分布式执行 Java 8 Stream 操作
- 微服务架构中的轻量级数据管道
⚠️ 注意:Jet 不是通用消息队列或存储系统,而是专注“计算”的引擎。数据源和结果通常对接外部系统(如 Kafka、HDFS、Map)。
3. 环境搭建
使用 Maven 的项目只需引入一个依赖即可:
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-jet</artifactId>
<version>4.2</version>
</dependency>
这个约 10MB 的 JAR 包已包含运行 Jet 所需的全部组件,开箱即用。
📌 最新版本可参考 Maven Central
4. 示例应用:分布式词频统计
我们通过一个实际例子来理解 Jet 的工作流程:输入若干句子和一个目标词,统计该词出现的总次数。
4.1 构建 Pipeline
Pipeline 是 Jet 应用的核心抽象,数据处理流程遵循三步走:
- ✅ 从 Source 读取数据
- 🔄 进行转换(filter、map、aggregate 等)
- ✅ 写入 Sink
我们的 Pipeline 实现如下:
private Pipeline createPipeLine() {
Pipeline p = Pipeline.create();
p.readFrom(Sources.<String>list(LIST_NAME))
.flatMap(word -> traverseArray(word.toLowerCase().split("\\W+")))
.filter(word -> !word.isEmpty())
.groupingKey(wholeItem())
.aggregate(counting())
.writeTo(Sinks.map(MAP_NAME));
return p;
}
各阶段说明:
阶段 | 操作 | 说明 |
---|---|---|
readFrom |
读取分布式 List | 数据源名称为 LIST_NAME |
flatMap |
拆分句子为单词 | 使用正则 \W+ 按非单词字符分割 |
filter |
过滤空字符串 | 避免空格或标点产生空词 |
groupingKey + aggregate |
分组计数 | 按单词本身分组,使用内置 counting() 聚合器 |
writeTo |
输出到分布式 Map | 结果存入 MAP_NAME 映射 |
✅
wholeItem()
是 Jet 提供的便捷函数,表示以元素自身作为分组键。
4.2 提交 Job 并获取结果
Pipeline 定义好后,需创建 Job 实例来执行:
public Long countWord(List<String> sentences, String word) {
long count = 0;
JetInstance jet = Jet.newJetInstance();
try {
List<String> textList = jet.getList(LIST_NAME);
textList.addAll(sentences);
Pipeline p = createPipeLine();
jet.newJob(p).join();
Map<String, Long> counts = jet.getMap(MAP_NAME);
count = counts.get(word);
} finally {
Jet.shutdownAll();
}
return count;
}
关键点解析:
Jet.newJetInstance()
:启动本地 Jet 节点,用于开发测试jet.getList()
/jet.getMap()
:获取分布式数据结构,自动在集群中共享jet.newJob(p).join()
:提交任务并同步阻塞等待完成。若任务失败会抛出异常Jet.shutdownAll()
:必须显式调用!否则 Jet 内部线程不退出,JVM 无法正常终止
❌ 踩坑提醒:忘记
shutdownAll()
是新手常见问题,会导致单元测试卡住或应用无法退出。
4.3 单元测试验证
@Test
public void whenGivenSentencesAndWord_ThenReturnCountOfWord() {
List<String> sentences = new ArrayList<>();
sentences.add("The first second was alright, but the second second was tough.");
WordCounter wordCounter = new WordCounter();
long countSecond = wordCounter.countWord(sentences, "second");
assertEquals(3, countSecond);
}
✅ 测试通过:句子中 "second" 出现了 3 次(注意大小写已统一处理)
5. 总结
Hazelcast Jet 提供了一套简洁高效的 API,让开发者能轻松构建分布式数据处理任务。其核心优势在于:
- 架构轻量,依赖少
- 支持批处理与流处理统一模型
- 与 Hazelcast IMDG 深度集成,状态管理方便
- 易于上手,适合微服务场景下的实时计算需求
如需深入了解 Jet 的高级特性(如容错、背压、复杂窗口等),建议查阅官方文档:Hazelcast Jet 用户手册
💡 示例代码已托管至 GitHub:https://github.com/dev-example/tutorials/tree/master/hazelcast