1. 概述
本文将介绍基于 Akka actor 框架构建的 akka-streams 库,该库遵循 响应式流宣言。Akka Streams API 允许我们通过独立步骤轻松组合数据转换流。
所有处理均以响应式、非阻塞和异步方式完成,确保高吞吐量和低延迟。
2. Maven 依赖
在 pom.xml
中添加以下依赖:
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.11</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-testkit_2.11</artifactId>
<version>2.5.2</version>
</dependency>
3. Akka Streams API 核心概念
✅ 核心组件需重点掌握:
- Source:流处理的入口点
可通过多种方式创建,例如:single()
方法从单个String
创建- 从
Iterable
元素集合创建
- Flow:核心处理单元
每个Flow
实例有且仅有一个输入和一个输出值 - Materializer:用于执行副作用操作
常见用法是传递NotUsed
别名表示无副作用 - Sink:终端操作
触发整个Flow
的计算执行(如数据存储、HTTP 请求等)
4. 创建 Akka Streams 的 Flow
4.1 使用 Flow 解析输入
创建 DataImporter
类:
public class DataImporter {
private ActorSystem actorSystem;
// 标准构造器和 getter...
}
添加 parseLine
方法(注意:这里使用 Java Stream API 解析字符串):
private List<Integer> parseLine(String line) {
String[] fields = line.split(";");
return Arrays.stream(fields)
.map(Integer::parseInt)
.collect(Collectors.toList());
}
创建输入解析 Flow:
private Flow<String, Integer, NotUsed> parseContent() {
return Flow.of(String.class)
.mapConcat(this::parseLine);
}
⚠️ 关键点:
使用 mapConcat()
(等同于 Java 8 的 flatMap()
)将 List<Integer>
展平为 Integer
流,避免后续处理步骤处理集合类型。
4.2 使用 Flow 执行计算
实现分组求平均逻辑:
private Flow<Integer, Double, NotUsed> computeAverage() {
return Flow.of(Integer.class)
.grouped(2) // 分组为元素对
.mapAsyncUnordered(8, integers -> // 8线程并行计算
CompletableFuture.supplyAsync(() -> integers.stream()
.mapToDouble(v -> v)
.average()
.orElse(-1.0)));
}
✅ 优化技巧:
使用 mapAsyncUnordered()
实现并行计算(8线程),无需关心结果顺序,提升吞吐量。
4.3 组合多个 Flow
通过 via()
方法链式组合 Flow:
Flow<String, Double, NotUsed> calculateAverage() {
return Flow.of(String.class)
.via(parseContent()) // 字符串 -> 整数流
.via(computeAverage()); // 整数流 -> 平均值
}
💡 设计优势:
- 粒度化设计提升可测试性
- 各步骤独立可复用
- 声明式组合逻辑清晰
5. 为 Flow 添加 Sink
定义数据存储接口:
CompletionStage<Double> save(Double average) {
return CompletableFuture.supplyAsync(() -> {
// 写入数据库逻辑
return average;
});
}
创建存储型 Sink:
private Sink<Double, CompletionStage<Done>> storeAverages() {
return Flow.of(Double.class)
.mapAsyncUnordered(4, averageRepository::save) // 4线程并行存储
.toMat(Sink.ignore(), Keep.right()); // 返回处理状态
}
⚠️ 注意事项:
toMat()
需要指定Sink.ignore()
和Keep.right()
才能返回CompletionStage<Done>
- 并行存储时需确保线程安全
6. 定义 Flow 的 Source
完整流处理实现:
CompletionStage<Done> calculateAverageForContent(String content) {
return Source.single(content) // 创建单元素 Source
.via(calculateAverage()) // 组合处理逻辑
.runWith(storeAverages(), ActorMaterializer.create(actorSystem))
.whenComplete((d, e) -> { // 结果回调
if (d != null) {
System.out.println("导入完成");
} else {
e.printStackTrace();
}
});
}
✅ 执行要点:
runWith()
触发流执行ActorMaterializer
负责实际资源分配- 回调处理成功/失败场景
7. 测试 Akka Streams
使用 akka-stream-testkit 进行测试:
@Test
public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() {
// given
Flow<String, Double, NotUsed> tested = new DataImporter(actorSystem).calculateAverage();
String input = "1;9;11;0";
// when
Source<Double, NotUsed> flow = Source.single(input).via(tested);
// then
flow
.runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
.request(4) // 请求4个元素
.expectNextUnordered(5d, 5.5); // 无序验证结果
}
⚠️ 测试技巧:
TestSink.probe()
创建测试接收器expectNextUnordered()
处理异步无序结果- 验证请求数量与预期结果匹配
8. 总结
本文系统介绍了 Akka Streams 的核心组件与使用方法:
- ✅ 通过组合
Source
、Flow
和Sink
构建响应式流 - ✅ 实现了分组平均值的完整计算流程
- ✅ 展示了并行计算与异步存储的优化方案
- ✅ 提供了基于 TestKit 的测试最佳实践
完整代码示例可在 GitHub 项目 中获取(Maven 工程可直接运行)。