1. 概述
本文将聚焦于在 Java 中使用 Reactive Extensions (Rx) 来组合和消费数据序列。
乍看之下,RxJava 的 API 可能与 Java 8 Stream 相似,但它实际上更灵活且流畅,是一种强大的编程范式。
若想深入了解 RxJava,推荐阅读这篇关于背压的文章。
2. 环境配置
在 Maven 项目中使用 RxJava,需在 pom.xml
中添加以下依赖:
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java.version}</version>
</dependency>
Gradle 项目则添加:
compile 'io.reactivex.rxjava:rxjava:x.y.z'
3. 函数式响应式概念
✅ 函数式编程:通过组合纯函数构建软件,避免共享状态、可变数据和副作用。
⚠️ 响应式编程:一种异步编程范式,关注数据流和变化的传播。
两者结合形成 函数式响应式编程,优雅地处理事件驱动场景——数据随时间变化,消费者实时响应。该技术有多个实现,社区通过文档定义了通用术语。
3.1. 响应式宣言
响应式宣言 定义了软件系统的行业标准。简单来说,响应式系统需满足:
- 响应性:系统需及时响应
- 消息驱动:组件间通过异步消息传递实现松耦合
- 弹性:高负载下保持响应
- 容错性:部分组件故障时仍能响应
4. Observable
Rx 中有两个核心类型:
- Observable:表示可从数据源获取数据的对象,其状态变化可被其他对象订阅
- Observer:订阅 Observable 的对象,在状态变化时接收通知
Observer 订阅 Observable 序列后,序列会逐个发送数据项。Observer 处理完当前项才会处理下一项。若事件异步涌入,需排队或丢弃。
Rx 保证:Observer 不会收到乱序数据,也不会在前一个回调未完成时被调用。
4.1. Observable 类型
分为两类:
- 非阻塞型:支持异步执行,可在事件流任意位置取消订阅(本文重点)
- 阻塞型:所有
onNext
调用同步执行,无法中途取消订阅。可通过toBlocking
转换:
BlockingObservable<String> blockingObservable = observable.toBlocking();
4.2. 操作符
操作符是接收源 Observable 并返回目标 Observable 的函数。对源 Observable 发射的每个数据项应用函数后,将结果发射到目标 Observable。
操作符可链式组合,构建复杂的数据流过滤逻辑。但需注意:若 Observable 发射速度超过操作符/Observer 的处理速度,可能引发背压问题(详见这里)。
4.3. 创建 Observable
基础操作符 just
创建发射单个数据的 Observable(如字符串 "Hello"
)。通过实现 Observer 接口并调用 subscribe
获取数据:
Observable<String> observable = Observable.just("Hello");
observable.subscribe(s -> result = s);
assertTrue(result.equals("Hello"));
4.4. OnNext、OnError 和 OnCompleted
Observer 接口有三个关键方法:
- OnNext:每次新事件发布时调用,在此处理事件
- OnCompleted:事件序列完成时调用,表示后续无
onNext
- OnError:框架或事件处理代码抛出未捕获异常时调用
subscribe
方法返回 Subscription
对象:
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.from(letters);
observable.subscribe(
i -> result += i, //OnNext
Throwable::printStackTrace, //OnError
() -> result += "_Completed" //OnCompleted
);
assertTrue(result.equals("abcdefg_Completed"));
5. Observable 转换与条件操作符
5.1. Map
map
操作符通过函数转换 Observable 发射的数据项。
假设有字母数组,需转为大写输出:
Observable.from(letters)
.map(String::toUpperCase)
.subscribe(letter -> result += letter);
assertTrue(result.equals("ABCDEFG"));
flatMap
用于扁平化嵌套的 Observable(map 与 flatMap 的区别)。
假设方法返回 Observable<String>
,打印每个字符串对应的标题列表:
Observable<String> getTitle() {
return Observable.from(titleList);
}
Observable.just("book1", "book2")
.flatMap(s -> getTitle())
.subscribe(l -> result += l);
assertTrue(result.equals("titletitle"));
5.2. Scan
scan
操作符对 Observable 发射的每个数据项顺序应用函数,并发射每次计算结果。可用于在事件间传递状态:
String[] letters = {"a", "b", "c"};
Observable.from(letters)
.scan(new StringBuilder(), StringBuilder::append)
.subscribe(total -> result += total.toString());
assertTrue(result.equals("aababc"));
5.3. GroupBy
groupBy
将输入 Observable 的事件分类到输出组。
假设有 0-10 的整数数组,按奇偶分组:
Observable.from(numbers)
.groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
.subscribe(group ->
group.subscribe((number) -> {
if (group.getKey().toString().equals("EVEN")) {
EVEN[0] += number;
} else {
ODD[0] += number;
}
})
);
assertTrue(EVEN[0].equals("0246810"));
assertTrue(ODD[0].equals("13579"));
5.4. Filter
filter
操作符仅发射通过谓词测试的数据项。过滤整数数组中的奇数:
Observable.from(numbers)
.filter(i -> (i % 2 == 1))
.subscribe(i -> result += i);
assertTrue(result.equals("13579"));
5.5. 条件操作符
defaultIfEmpty
:源 Observable 为空时发射默认值:
Observable.empty()
.defaultIfEmpty("Observable is empty")
.subscribe(s -> result += s);
assertTrue(result.equals("Observable is empty"));
若数组非空,则发射首字母(如 "a"
):
Observable.from(letters)
.defaultIfEmpty("Observable is empty")
.first()
.subscribe(s -> result += s);
assertTrue(result.equals("a"));
takeWhile
:在条件变为 false
后丢弃后续数据项:
Observable.from(numbers)
.takeWhile(i -> i < 5)
.subscribe(s -> sum[0] += s);
assertTrue(sum[0] == 10);
其他操作符如 Contains
、SkipWhile
、SkipUntil
、TakeUntil
等可按需选用。
6. ConnectableObservable
ConnectableObservable 类似普通 Observable,但订阅后不会立即发射数据,需调用 connect()
才开始。
这允许等待所有 Observer 订阅后再发射数据:
String[] result = {""};
ConnectableObservable<Long> connectable
= Observable.interval(200, TimeUnit.MILLISECONDS).publish();
connectable.subscribe(i -> result[0] += i);
assertFalse(result[0].equals("01"));
connectable.connect();
Thread.sleep(500);
assertTrue(result[0].equals("01"));
7. Single
Single 是特殊 Observable,仅发射单个数据或错误通知。订阅方式:
-
onSuccess
:成功时调用指定方法 -
onError
:错误时立即通知订阅者
String[] result = {""};
Single<String> single = Observable.just("Hello")
.toSingle()
.doOnSuccess(i -> result[0] += i)
.doOnError(error -> {
throw new RuntimeException(error.getMessage());
});
single.subscribe();
assertTrue(result[0].equals("Hello"));
8. Subject
Subject 同时是 Subscriber 和 Observable。作为 Subscriber 可发布多个 Observable 的事件;作为 Observable 可将事件转发给订阅者。
以下示例展示 Observer 仅能收到订阅后的事件:
Integer subscriber1 = 0;
Integer subscriber2 = 0;
Observer<Integer> getFirstObserver() {
return new Observer<Integer>() {
@Override
public void onNext(Integer value) {
subscriber1 += value;
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onCompleted() {
System.out.println("Subscriber1 completed");
}
};
}
Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {
@Override
public void onNext(Integer value) {
subscriber2 += value;
}
@Override
public void onError(Throwable e) {
System.out.println("error");
}
@Override
public void onCompleted() {
System.out.println("Subscriber2 completed");
}
};
}
PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(getFirstObserver());
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.subscribe(getSecondObserver());
subject.onNext(4);
subject.onCompleted();
assertTrue(subscriber1 + subscriber2 == 14)
9. 资源管理
using
操作符可将资源(如 JDBC 连接、网络连接、文件)关联到 Observable。
注释中说明了实现步骤,代码示例:
String[] result = {""};
Observable<Character> values = Observable.using(
() -> "MyResource",
r -> {
return Observable.create(o -> {
for (Character c : r.toCharArray()) {
o.onNext(c);
}
o.onCompleted();
});
},
r -> System.out.println("Disposed: " + r)
);
values.subscribe(
v -> result[0] += v,
e -> result[0] += e
);
assertTrue(result[0].equals("MyResource"));
10. 总结
本文介绍了 RxJava 的核心功能及关键特性。完整代码示例可在 GitHub 获取。