1. 简介

本文将带你入门 Hazelcast Jet —— 由 Hazelcast 公司提供的分布式数据处理引擎,底层基于 Hazelcast IMDG 构建。

如果你对 Hazelcast IMDG 还不熟悉,可以先阅读我们的另一篇文章《Java 中使用 Hazelcast》快速上手。

2. 什么是 Hazelcast Jet?

Hazelcast Jet 是一个以流式处理为核心的分布式计算引擎。它不仅能处理 Kafka 这类实时数据流,也能高效处理数据库或文件中的静态数据。

✅ 核心能力之一是支持对无限数据流进行聚合分析。它是通过“窗口(windowing)”机制实现的:将连续的数据流切分为多个时间或数量区间,再对每个区间独立聚合。

Jet 支持集群部署,提交任务后,集群中所有节点会自动协同处理数据。每个节点负责一部分数据分片,天然具备横向扩展能力,轻松应对高吞吐场景。

典型应用场景包括:

  • 实时流处理(如日志分析、监控告警)
  • 高速批处理(替代传统 MapReduce)
  • 分布式执行 Java 8 Stream 操作
  • 微服务架构中的轻量级数据管道

⚠️ 注意:Jet 不是通用消息队列或存储系统,而是专注“计算”的引擎。数据源和结果通常对接外部系统(如 Kafka、HDFS、Map)。

3. 环境搭建

使用 Maven 的项目只需引入一个依赖即可:

<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet</artifactId>
    <version>4.2</version>
</dependency>

这个约 10MB 的 JAR 包已包含运行 Jet 所需的全部组件,开箱即用。

📌 最新版本可参考 Maven Central

4. 示例应用:分布式词频统计

我们通过一个实际例子来理解 Jet 的工作流程:输入若干句子和一个目标词,统计该词出现的总次数。

4.1 构建 Pipeline

Pipeline 是 Jet 应用的核心抽象,数据处理流程遵循三步走:

  1. ✅ 从 Source 读取数据
  2. 🔄 进行转换(filter、map、aggregate 等)
  3. ✅ 写入 Sink

我们的 Pipeline 实现如下:

private Pipeline createPipeLine() {
    Pipeline p = Pipeline.create();
    p.readFrom(Sources.<String>list(LIST_NAME))
      .flatMap(word -> traverseArray(word.toLowerCase().split("\\W+")))
      .filter(word -> !word.isEmpty())
      .groupingKey(wholeItem())
      .aggregate(counting())
      .writeTo(Sinks.map(MAP_NAME));
    return p;
}

各阶段说明:

阶段 操作 说明
readFrom 读取分布式 List 数据源名称为 LIST_NAME
flatMap 拆分句子为单词 使用正则 \W+ 按非单词字符分割
filter 过滤空字符串 避免空格或标点产生空词
groupingKey + aggregate 分组计数 按单词本身分组,使用内置 counting() 聚合器
writeTo 输出到分布式 Map 结果存入 MAP_NAME 映射

wholeItem() 是 Jet 提供的便捷函数,表示以元素自身作为分组键。

4.2 提交 Job 并获取结果

Pipeline 定义好后,需创建 Job 实例来执行:

public Long countWord(List<String> sentences, String word) {
    long count = 0;
    JetInstance jet = Jet.newJetInstance();
    try {
        List<String> textList = jet.getList(LIST_NAME);
        textList.addAll(sentences);
        Pipeline p = createPipeLine();
        jet.newJob(p).join();
        Map<String, Long> counts = jet.getMap(MAP_NAME);
        count = counts.get(word);
    } finally {
        Jet.shutdownAll();
    }
    return count;
}

关键点解析:

  • Jet.newJetInstance():启动本地 Jet 节点,用于开发测试
  • jet.getList() / jet.getMap():获取分布式数据结构,自动在集群中共享
  • jet.newJob(p).join():提交任务并同步阻塞等待完成。若任务失败会抛出异常
  • Jet.shutdownAll():必须显式调用!否则 Jet 内部线程不退出,JVM 无法正常终止

❌ 踩坑提醒:忘记 shutdownAll() 是新手常见问题,会导致单元测试卡住或应用无法退出。

4.3 单元测试验证

@Test
public void whenGivenSentencesAndWord_ThenReturnCountOfWord() {
    List<String> sentences = new ArrayList<>();
    sentences.add("The first second was alright, but the second second was tough.");
    WordCounter wordCounter = new WordCounter();
    long countSecond = wordCounter.countWord(sentences, "second");
    assertEquals(3, countSecond);
}

✅ 测试通过:句子中 "second" 出现了 3 次(注意大小写已统一处理)

5. 总结

Hazelcast Jet 提供了一套简洁高效的 API,让开发者能轻松构建分布式数据处理任务。其核心优势在于:

  • 架构轻量,依赖少
  • 支持批处理与流处理统一模型
  • 与 Hazelcast IMDG 深度集成,状态管理方便
  • 易于上手,适合微服务场景下的实时计算需求

如需深入了解 Jet 的高级特性(如容错、背压、复杂窗口等),建议查阅官方文档:Hazelcast Jet 用户手册

💡 示例代码已托管至 GitHub:https://github.com/dev-example/tutorials/tree/master/hazelcast


原始标题:Introduction to Hazelcast Jet