1. 简介

Parallel-collectors 是一个轻量级 Java 库,它提供了一组基于 Stream API 的 Collector 实现,用于实现并行处理数据流。与标准的并行 Stream 不同,它更适合处理 I/O 密集型任务,而不是仅限于 CPU 密集型任务。

2. Maven 依赖

要使用该库,只需要在 Maven 的 pom.xml 文件中添加如下依赖:

<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>2.6.0</version>
</dependency>

如果是 Gradle 项目,添加如下一行即可:

implementation 'com.pivovarit:parallel-collectors:2.6.0'

最新版本可以在这里查看:Maven Central

3. 并行 Stream 的局限性

Java 8 引入的并行 Stream 是一个亮点,但它的设计初衷是为 CPU 密集型任务服务的。

其背后使用的是 JVM 全局共享的 ForkJoinPool,这意味着所有并行 Stream 都会共享这个线程池资源。这带来了几个问题:

✅ 优点:

  • 简单易用,开箱即用

❌ 缺点:

  • 无法自定义线程池
  • 所有并行 Stream 共享线程池,容易造成资源争用
  • 对于 I/O 操作(如网络请求、数据库查询)不友好,容易导致线程阻塞

举个例子:

List<Integer> ids = Arrays.asList(1, 2, 3);
List<String> results = ids.parallelStream()
  .map(i -> fetchById(i)) // 每次调用耗时 1s
  .collect(Collectors.toList());

System.out.println(results); // [user-1, user-2, user-3]

虽然看起来能并行执行,但如果同时运行多个阻塞任务,很容易耗尽线程池资源,造成延迟飙升。

⚠️ 因此,推荐做法是:为不同任务创建独立的线程池,避免相互干扰

虽然可以通过某些技巧为 Parallel Stream 自定义 ForkJoinPool,但这种方式依赖未公开 API,直到 JDK10 才修复部分问题。详见:JDK8190974

4. Parallel Collectors 实战

Parallel Collectors 本质是 Stream API 的 Collector 实现,但它在 collect 阶段实现并行处理。

主要入口类是 ParallelCollectors,它与标准的 Collectors 类似,提供了统一的 API 入口。

我们来看一个等效替代 Parallel Stream 的例子:

ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

CompletableFuture<List<String>> results = ids.stream()
  .collect(ParallelCollectors.parallel(i -> fetchById(i), Collectors.toList(), executor, 4));

System.out.println(results.join()); // [user-1, user-2, user-3]

✅ 优势:

  • 可自定义线程池
  • 可指定并行度
  • 返回结果封装在 CompletableFuture 中,非阻塞

❌ 传统 Parallel Stream 做不到这些。

4.1 使用标准 Collector 并行收集

如果你希望并行处理并收集为 List、Set 等结构,可以直接使用:

List<Integer> ids = Arrays.asList(1, 2, 3);

CompletableFuture<List<String>> results = ids.stream()
  .collect(ParallelCollectors.parallel(i -> fetchById(i), Collectors.toList(), executor, 4));

assertThat(results.join()).containsExactly("user-1", "user-2", "user-3");

4.2 收集为 Stream

如果不想立即收集为具体集合,而是希望得到一个 Stream,可以这样写:

List<Integer> ids = Arrays.asList(1, 2, 3);

CompletableFuture<Stream<String>> results = ids.stream()
  .collect(ParallelCollectors.parallel(i -> fetchById(i), executor, 4));

assertThat(results.join()).containsExactly("user-1", "user-2", "user-3");

4.3 parallelToStream():按完成顺序返回结果

如果你不介意阻塞主线程,并且希望按完成顺序获取结果,可以使用:

List<Integer> ids = Arrays.asList(1, 2, 3);

Stream<String> result = ids.stream()
  .collect(ParallelCollectors.parallelToStream(i -> fetchByIdWithRandomDelay(i), executor, 4));

assertThat(result).contains("user-1", "user-2", "user-3");

⚠️ 注意:由于加入了随机延迟,每次执行结果顺序可能不同,因此测试时使用 contains() 而不是 containsExactly()

4.4 parallelToOrderedStream():保持原始顺序

如果你希望结果顺序与原始输入顺序一致,使用:

List<Integer> ids = Arrays.asList(1, 2, 3);

Stream<String> result = ids.stream()
  .collect(ParallelCollectors.parallelToOrderedStream(
      ParallelCollectorsUnitTest::fetchByIdWithRandomDelay, executor, 4));

assertThat(result).containsExactly("user-1", "user-2", "user-3");

✅ 优点:保持顺序
❌ 缺点:性能略差于 parallelToStream()

5. 局限性

虽然 Parallel Collectors 功能强大,但也有几个需要注意的限制:

5.1 不支持无限流

即使使用了短路操作(如 limit、findFirst),也无法处理无限流。这是 Stream API 内部机制决定的,因为 Collector 被视为非短路操作,必须处理完所有元素才能结束。

5.2 短路操作不中断其他任务

比如使用 findFirst(),即使找到了目标元素,也不会中断其他正在执行的任务。这是因为 CompletableFuture 本身不支持中断机制。

⚠️ 这点在实际使用中需要注意,避免资源浪费或响应延迟。

6. 总结

Parallel Collectors 提供了一种更灵活、可控的并行处理方式,适用于 I/O 密集型任务,同时支持:

✅ 自定义线程池
✅ 自定义并行度
✅ 返回 CompletableFuture 或 Stream
✅ 支持顺序/非顺序结果处理

对于需要精细控制线程资源、避免线程争用的场景,Parallel Collectors 是比 Parallel Stream 更优的选择。

完整示例代码已上传至 GitHub:示例项目地址(示例链接可根据实际项目替换)


原始标题:Guide to Java Parallel Collectors Library | Baeldung