1. 简介
Parallel-collectors 是一个轻量级 Java 库,它提供了一组基于 Stream API 的 Collector 实现,用于实现并行处理数据流。与标准的并行 Stream 不同,它更适合处理 I/O 密集型任务,而不是仅限于 CPU 密集型任务。
2. Maven 依赖
要使用该库,只需要在 Maven 的 pom.xml
文件中添加如下依赖:
<dependency>
<groupId>com.pivovarit</groupId>
<artifactId>parallel-collectors</artifactId>
<version>2.6.0</version>
</dependency>
如果是 Gradle 项目,添加如下一行即可:
implementation 'com.pivovarit:parallel-collectors:2.6.0'
最新版本可以在这里查看:Maven Central
3. 并行 Stream 的局限性
Java 8 引入的并行 Stream 是一个亮点,但它的设计初衷是为 CPU 密集型任务服务的。
其背后使用的是 JVM 全局共享的 ForkJoinPool,这意味着所有并行 Stream 都会共享这个线程池资源。这带来了几个问题:
✅ 优点:
- 简单易用,开箱即用
❌ 缺点:
- 无法自定义线程池
- 所有并行 Stream 共享线程池,容易造成资源争用
- 对于 I/O 操作(如网络请求、数据库查询)不友好,容易导致线程阻塞
举个例子:
List<Integer> ids = Arrays.asList(1, 2, 3);
List<String> results = ids.parallelStream()
.map(i -> fetchById(i)) // 每次调用耗时 1s
.collect(Collectors.toList());
System.out.println(results); // [user-1, user-2, user-3]
虽然看起来能并行执行,但如果同时运行多个阻塞任务,很容易耗尽线程池资源,造成延迟飙升。
⚠️ 因此,推荐做法是:为不同任务创建独立的线程池,避免相互干扰。
虽然可以通过某些技巧为 Parallel Stream 自定义 ForkJoinPool,但这种方式依赖未公开 API,直到 JDK10 才修复部分问题。详见:JDK8190974
4. Parallel Collectors 实战
Parallel Collectors 本质是 Stream API 的 Collector 实现,但它在 collect 阶段实现并行处理。
主要入口类是 ParallelCollectors
,它与标准的 Collectors
类似,提供了统一的 API 入口。
我们来看一个等效替代 Parallel Stream 的例子:
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Integer> ids = Arrays.asList(1, 2, 3);
CompletableFuture<List<String>> results = ids.stream()
.collect(ParallelCollectors.parallel(i -> fetchById(i), Collectors.toList(), executor, 4));
System.out.println(results.join()); // [user-1, user-2, user-3]
✅ 优势:
- 可自定义线程池
- 可指定并行度
- 返回结果封装在
CompletableFuture
中,非阻塞
❌ 传统 Parallel Stream 做不到这些。
4.1 使用标准 Collector 并行收集
如果你希望并行处理并收集为 List、Set 等结构,可以直接使用:
List<Integer> ids = Arrays.asList(1, 2, 3);
CompletableFuture<List<String>> results = ids.stream()
.collect(ParallelCollectors.parallel(i -> fetchById(i), Collectors.toList(), executor, 4));
assertThat(results.join()).containsExactly("user-1", "user-2", "user-3");
4.2 收集为 Stream
如果不想立即收集为具体集合,而是希望得到一个 Stream,可以这样写:
List<Integer> ids = Arrays.asList(1, 2, 3);
CompletableFuture<Stream<String>> results = ids.stream()
.collect(ParallelCollectors.parallel(i -> fetchById(i), executor, 4));
assertThat(results.join()).containsExactly("user-1", "user-2", "user-3");
4.3 parallelToStream()
:按完成顺序返回结果
如果你不介意阻塞主线程,并且希望按完成顺序获取结果,可以使用:
List<Integer> ids = Arrays.asList(1, 2, 3);
Stream<String> result = ids.stream()
.collect(ParallelCollectors.parallelToStream(i -> fetchByIdWithRandomDelay(i), executor, 4));
assertThat(result).contains("user-1", "user-2", "user-3");
⚠️ 注意:由于加入了随机延迟,每次执行结果顺序可能不同,因此测试时使用 contains()
而不是 containsExactly()
。
4.4 parallelToOrderedStream()
:保持原始顺序
如果你希望结果顺序与原始输入顺序一致,使用:
List<Integer> ids = Arrays.asList(1, 2, 3);
Stream<String> result = ids.stream()
.collect(ParallelCollectors.parallelToOrderedStream(
ParallelCollectorsUnitTest::fetchByIdWithRandomDelay, executor, 4));
assertThat(result).containsExactly("user-1", "user-2", "user-3");
✅ 优点:保持顺序
❌ 缺点:性能略差于 parallelToStream()
5. 局限性
虽然 Parallel Collectors 功能强大,但也有几个需要注意的限制:
5.1 不支持无限流
即使使用了短路操作(如 limit、findFirst),也无法处理无限流。这是 Stream API 内部机制决定的,因为 Collector 被视为非短路操作,必须处理完所有元素才能结束。
5.2 短路操作不中断其他任务
比如使用 findFirst()
,即使找到了目标元素,也不会中断其他正在执行的任务。这是因为 CompletableFuture
本身不支持中断机制。
⚠️ 这点在实际使用中需要注意,避免资源浪费或响应延迟。
6. 总结
Parallel Collectors 提供了一种更灵活、可控的并行处理方式,适用于 I/O 密集型任务,同时支持:
✅ 自定义线程池
✅ 自定义并行度
✅ 返回 CompletableFuture 或 Stream
✅ 支持顺序/非顺序结果处理
对于需要精细控制线程资源、避免线程争用的场景,Parallel Collectors 是比 Parallel Stream 更优的选择。
完整示例代码已上传至 GitHub:示例项目地址(示例链接可根据实际项目替换)