1. 概述

本文将聚焦于在 Java 中使用 Reactive Extensions (Rx) 来组合和消费数据序列。

乍看之下,RxJava 的 API 可能与 Java 8 Stream 相似,但它实际上更灵活且流畅,是一种强大的编程范式。

若想深入了解 RxJava,推荐阅读这篇关于背压的文章

2. 环境配置

在 Maven 项目中使用 RxJava,需在 pom.xml 中添加以下依赖:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>${rx.java.version}</version>
</dependency>

Gradle 项目则添加:

compile 'io.reactivex.rxjava:rxjava:x.y.z'

3. 函数式响应式概念

函数式编程:通过组合纯函数构建软件,避免共享状态、可变数据和副作用。
⚠️ 响应式编程:一种异步编程范式,关注数据流和变化的传播。

两者结合形成 函数式响应式编程,优雅地处理事件驱动场景——数据随时间变化,消费者实时响应。该技术有多个实现,社区通过文档定义了通用术语。

3.1. 响应式宣言

响应式宣言 定义了软件系统的行业标准。简单来说,响应式系统需满足:

  • 响应性:系统需及时响应
  • 消息驱动:组件间通过异步消息传递实现松耦合
  • 弹性:高负载下保持响应
  • 容错性:部分组件故障时仍能响应

4. Observable

Rx 中有两个核心类型:

  • Observable:表示可从数据源获取数据的对象,其状态变化可被其他对象订阅
  • Observer:订阅 Observable 的对象,在状态变化时接收通知

Observer 订阅 Observable 序列后,序列会逐个发送数据项。Observer 处理完当前项才会处理下一项。若事件异步涌入,需排队或丢弃。

Rx 保证:Observer 不会收到乱序数据,也不会在前一个回调未完成时被调用。

4.1. Observable 类型

分为两类:

  • 非阻塞型:支持异步执行,可在事件流任意位置取消订阅(本文重点)
  • 阻塞型:所有 onNext 调用同步执行,无法中途取消订阅。可通过 toBlocking 转换:
BlockingObservable<String> blockingObservable = observable.toBlocking();

4.2. 操作符

操作符是接收源 Observable 并返回目标 Observable 的函数。对源 Observable 发射的每个数据项应用函数后,将结果发射到目标 Observable。

操作符可链式组合,构建复杂的数据流过滤逻辑。但需注意:若 Observable 发射速度超过操作符/Observer 的处理速度,可能引发背压问题(详见这里)。

4.3. 创建 Observable

基础操作符 just 创建发射单个数据的 Observable(如字符串 "Hello")。通过实现 Observer 接口并调用 subscribe 获取数据:

Observable<String> observable = Observable.just("Hello");
observable.subscribe(s -> result = s);
 
assertTrue(result.equals("Hello"));

4.4. OnNext、OnError 和 OnCompleted

Observer 接口有三个关键方法:

  1. OnNext:每次新事件发布时调用,在此处理事件
  2. OnCompleted:事件序列完成时调用,表示后续无 onNext
  3. OnError:框架或事件处理代码抛出未捕获异常时调用

subscribe 方法返回 Subscription 对象:

String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters);
observable.subscribe(
  i -> result += i,  //OnNext
  Throwable::printStackTrace, //OnError
  () -> result += "_Completed" //OnCompleted
);
assertTrue(result.equals("abcdefg_Completed"));

5. Observable 转换与条件操作符

5.1. Map

map 操作符通过函数转换 Observable 发射的数据项。

假设有字母数组,需转为大写输出:

Observable.from(letters)
  .map(String::toUpperCase)
  .subscribe(letter -> result += letter);
assertTrue(result.equals("ABCDEFG"));

flatMap 用于扁平化嵌套的 Observablemap 与 flatMap 的区别)。

假设方法返回 Observable<String>,打印每个字符串对应的标题列表:

Observable<String> getTitle() {
    return Observable.from(titleList);
}
Observable.just("book1", "book2")
  .flatMap(s -> getTitle())
  .subscribe(l -> result += l);

assertTrue(result.equals("titletitle"));

5.2. Scan

scan 操作符对 Observable 发射的每个数据项顺序应用函数,并发射每次计算结果。可用于在事件间传递状态:

String[] letters = {"a", "b", "c"};
Observable.from(letters)
  .scan(new StringBuilder(), StringBuilder::append)
  .subscribe(total -> result += total.toString());
assertTrue(result.equals("aababc"));

5.3. GroupBy

groupBy 将输入 Observable 的事件分类到输出组

假设有 0-10 的整数数组,按奇偶分组:

Observable.from(numbers)
  .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
  .subscribe(group ->
    group.subscribe((number) -> {
        if (group.getKey().toString().equals("EVEN")) {
            EVEN[0] += number;
        } else {
            ODD[0] += number;
        }
    })
  );
assertTrue(EVEN[0].equals("0246810"));
assertTrue(ODD[0].equals("13579"));

5.4. Filter

filter 操作符仅发射通过谓词测试的数据项。过滤整数数组中的奇数:

Observable.from(numbers)
  .filter(i -> (i % 2 == 1))
  .subscribe(i -> result += i);
 
assertTrue(result.equals("13579"));

5.5. 条件操作符

defaultIfEmpty:源 Observable 为空时发射默认值:

Observable.empty()
  .defaultIfEmpty("Observable is empty")
  .subscribe(s -> result += s);
 
assertTrue(result.equals("Observable is empty"));

若数组非空,则发射首字母(如 "a"):

Observable.from(letters)
  .defaultIfEmpty("Observable is empty")
  .first()
  .subscribe(s -> result += s);
 
assertTrue(result.equals("a"));

takeWhile:在条件变为 false丢弃后续数据项

Observable.from(numbers)
  .takeWhile(i -> i < 5)
  .subscribe(s -> sum[0] += s);
 
assertTrue(sum[0] == 10);

其他操作符如 ContainsSkipWhileSkipUntilTakeUntil 等可按需选用。

6. ConnectableObservable

ConnectableObservable 类似普通 Observable,但订阅后不会立即发射数据,需调用 connect() 才开始。

这允许等待所有 Observer 订阅后再发射数据:

String[] result = {""};
ConnectableObservable<Long> connectable
  = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
connectable.subscribe(i -> result[0] += i);
assertFalse(result[0].equals("01"));

connectable.connect();
Thread.sleep(500);
 
assertTrue(result[0].equals("01"));

7. Single

Single 是特殊 Observable,仅发射单个数据或错误通知。订阅方式:

  • onSuccess:成功时调用指定方法
  • onError:错误时立即通知订阅者
String[] result = {""};
Single<String> single = Observable.just("Hello")
  .toSingle()
  .doOnSuccess(i -> result[0] += i)
  .doOnError(error -> {
      throw new RuntimeException(error.getMessage());
  });
single.subscribe();
 
assertTrue(result[0].equals("Hello"));

8. Subject

Subject 同时是 Subscriber 和 Observable。作为 Subscriber 可发布多个 Observable 的事件;作为 Observable 可将事件转发给订阅者。

以下示例展示 Observer 仅能收到订阅后的事件:

Integer subscriber1 = 0;
Integer subscriber2 = 0;
Observer<Integer> getFirstObserver() {
    return new Observer<Integer>() {
        @Override
        public void onNext(Integer value) {
           subscriber1 += value;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("error");
        }

        @Override
        public void onCompleted() {
            System.out.println("Subscriber1 completed");
        }
    };
}

Observer<Integer> getSecondObserver() {
    return new Observer<Integer>() {
        @Override
        public void onNext(Integer value) {
            subscriber2 += value;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("error");
        }

        @Override
        public void onCompleted() {
            System.out.println("Subscriber2 completed");
        }
    };
}

PublishSubject<Integer> subject = PublishSubject.create(); 
subject.subscribe(getFirstObserver()); 
subject.onNext(1); 
subject.onNext(2); 
subject.onNext(3); 
subject.subscribe(getSecondObserver()); 
subject.onNext(4); 
subject.onCompleted();
 
assertTrue(subscriber1 + subscriber2 == 14)

9. 资源管理

using 操作符可将资源(如 JDBC 连接、网络连接、文件)关联到 Observable。

注释中说明了实现步骤,代码示例:

String[] result = {""};
Observable<Character> values = Observable.using(
  () -> "MyResource",
  r -> {
      return Observable.create(o -> {
          for (Character c : r.toCharArray()) {
              o.onNext(c);
          }
          o.onCompleted();
      });
  },
  r -> System.out.println("Disposed: " + r)
);
values.subscribe(
  v -> result[0] += v,
  e -> result[0] += e
);
assertTrue(result[0].equals("MyResource"));

10. 总结

本文介绍了 RxJava 的核心功能及关键特性。完整代码示例可在 GitHub 获取。


原始标题:Introduction to RxJava | Baeldung

« 上一篇: Retrofit 介绍