1. 概述

本文将介绍基于 Akka actor 框架构建的 akka-streams 库,该库遵循 响应式流宣言Akka Streams API 允许我们通过独立步骤轻松组合数据转换流

所有处理均以响应式、非阻塞和异步方式完成,确保高吞吐量和低延迟。

2. Maven 依赖

pom.xml 中添加以下依赖:

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.11</artifactId>
    <version>2.5.2</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-testkit_2.11</artifactId>
    <version>2.5.2</version>
</dependency>

3. Akka Streams API 核心概念

核心组件需重点掌握

  • Source:流处理的入口点
    可通过多种方式创建,例如:
    • single() 方法从单个 String 创建
    • Iterable 元素集合创建
  • Flow:核心处理单元
    每个 Flow 实例有且仅有一个输入和一个输出值
  • Materializer:用于执行副作用操作
    常见用法是传递 NotUsed 别名表示无副作用
  • Sink:终端操作
    触发整个 Flow 的计算执行(如数据存储、HTTP 请求等)

4. 创建 Akka Streams 的 Flow

4.1 使用 Flow 解析输入

创建 DataImporter 类:

public class DataImporter {
    private ActorSystem actorSystem;

    // 标准构造器和 getter...
}

添加 parseLine 方法(注意:这里使用 Java Stream API 解析字符串):

private List<Integer> parseLine(String line) {
    String[] fields = line.split(";");
    return Arrays.stream(fields)
      .map(Integer::parseInt)
      .collect(Collectors.toList());
}

创建输入解析 Flow:

private Flow<String, Integer, NotUsed> parseContent() {
    return Flow.of(String.class)
      .mapConcat(this::parseLine);
}

⚠️ 关键点
使用 mapConcat()(等同于 Java 8 的 flatMap())将 List<Integer> 展平为 Integer 流,避免后续处理步骤处理集合类型。

4.2 使用 Flow 执行计算

实现分组求平均逻辑:

private Flow<Integer, Double, NotUsed> computeAverage() {
    return Flow.of(Integer.class)
      .grouped(2) // 分组为元素对
      .mapAsyncUnordered(8, integers -> // 8线程并行计算
        CompletableFuture.supplyAsync(() -> integers.stream()
          .mapToDouble(v -> v)
          .average()
          .orElse(-1.0)));
}

优化技巧
使用 mapAsyncUnordered() 实现并行计算(8线程),无需关心结果顺序,提升吞吐量。

4.3 组合多个 Flow

通过 via() 方法链式组合 Flow:

Flow<String, Double, NotUsed> calculateAverage() {
    return Flow.of(String.class)
      .via(parseContent())    // 字符串 -> 整数流
      .via(computeAverage()); // 整数流 -> 平均值
}

💡 设计优势

  • 粒度化设计提升可测试性
  • 各步骤独立可复用
  • 声明式组合逻辑清晰

5. 为 Flow 添加 Sink

定义数据存储接口:

CompletionStage<Double> save(Double average) {
    return CompletableFuture.supplyAsync(() -> {
        // 写入数据库逻辑
        return average;
    });
}

创建存储型 Sink:

private Sink<Double, CompletionStage<Done>> storeAverages() {
    return Flow.of(Double.class)
      .mapAsyncUnordered(4, averageRepository::save) // 4线程并行存储
      .toMat(Sink.ignore(), Keep.right()); // 返回处理状态
}

⚠️ 注意事项

  • toMat() 需要指定 Sink.ignore()Keep.right() 才能返回 CompletionStage<Done>
  • 并行存储时需确保线程安全

6. 定义 Flow 的 Source

完整流处理实现:

CompletionStage<Done> calculateAverageForContent(String content) {
    return Source.single(content)              // 创建单元素 Source
      .via(calculateAverage())                 // 组合处理逻辑
      .runWith(storeAverages(), ActorMaterializer.create(actorSystem))
      .whenComplete((d, e) -> {                 // 结果回调
          if (d != null) {
              System.out.println("导入完成");
          } else {
              e.printStackTrace();
          }
      });
}

执行要点

  • runWith() 触发流执行
  • ActorMaterializer 负责实际资源分配
  • 回调处理成功/失败场景

7. 测试 Akka Streams

使用 akka-stream-testkit 进行测试:

@Test
public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() {
    // given
    Flow<String, Double, NotUsed> tested = new DataImporter(actorSystem).calculateAverage();
    String input = "1;9;11;0";

    // when
    Source<Double, NotUsed> flow = Source.single(input).via(tested);

    // then
    flow
      .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
      .request(4)           // 请求4个元素
      .expectNextUnordered(5d, 5.5); // 无序验证结果
}

⚠️ 测试技巧

  • TestSink.probe() 创建测试接收器
  • expectNextUnordered() 处理异步无序结果
  • 验证请求数量与预期结果匹配

8. 总结

本文系统介绍了 Akka Streams 的核心组件与使用方法:

  1. ✅ 通过组合 SourceFlowSink 构建响应式流
  2. ✅ 实现了分组平均值的完整计算流程
  3. ✅ 展示了并行计算与异步存储的优化方案
  4. ✅ 提供了基于 TestKit 的测试最佳实践

完整代码示例可在 GitHub 项目 中获取(Maven 工程可直接运行)。


原始标题:Guide to Akka Streams

« 上一篇: Java周报,180