1. 简介

RxJava 入门 之后,我们来深入探讨过滤操作符。本文将重点介绍过滤、跳过、时间过滤和一些更高级的过滤操作。

2. 基础过滤

当处理 Observable 时,有时需要从发射的数据项中筛选出特定子集。RxJava 提供了多种过滤能力来实现这个需求。我们从最基础的 filter 操作符开始。

2.1 filter 操作符

简单粗暴地说,filter 操作符通过 Predicate 条件过滤 Observable,只保留满足条件的数据项。

下面演示如何筛选奇数:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .filter(i -> i % 2 != 0);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 3, 5, 7, 9);

2.2 take 操作符

take 操作符只保留前 n 个数据项,后续数据直接丢弃。

示例:只取前 3 个元素:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.take(3);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3);

2.3 takeWhile 操作符

takeWhile 持续发射数据直到遇到第一个不满足 Predicate 的元素,之后立即终止。

示例:只取小于 4 的连续元素:

Observable<Integer> sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .takeWhile(i -> i < 4);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3);

2.4 takeFirst 操作符

当需要获取第一个满足条件的元素时,takeFirst() 是最佳选择。

示例:获取第一个大于 5 的数:

Observable<Integer> sourceObservable = Observable
  .just(1, 2, 3, 4, 5, 7, 6);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .takeFirst(x -> x > 5);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(7);

2.5 firstfirstOrDefault 操作符

类似行为可通过 first 实现:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.first();

filteredObservable.subscribe(subscriber);

subscriber.assertValue(1);

但若需指定空数据时的默认值,应使用 firstOrDefault

Observable<Integer> sourceObservable = Observable.empty();

Observable<Integer> filteredObservable = sourceObservable.firstOrDefault(-1);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(-1);

2.6 takeLast 操作符

takeLast 只保留最后 n 个元素,其他全部丢弃。

示例:只取最后 3 个元素:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.takeLast(3);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(8, 9, 10);

⚠️ 注意:此操作符会延迟所有元素发射,直到源 Observable 完成。

2.7 lastlastOrDefault

除了 takeLast(1)last 可直接获取最后一个元素,并支持条件过滤:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .last(i -> i % 2 != 0);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(9);

空数据或无匹配元素时,使用 lastOrDefault 返回默认值:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = 
  sourceObservable.lastOrDefault(-1, i -> i > 10);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(-1);

2.8 elementAtelementAtOrDefault 操作符

elementAt 通过索引获取指定位置的元素:

Observable<Integer> sourceObservable = Observable
  .just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.elementAt(4);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(7);

❌ 超出索引会抛 IndexOutOfBoundException。改用 elementAtOrDefault 可避免:

Observable<Integer> sourceObservable = Observable
  .just(1, 2, 3, 5, 7, 11);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable
 = sourceObservable.elementAtOrDefault(7, -1);

filteredObservable.subscribe(subscriber);

subscriber.assertValue(-1);

2.9 ofType 操作符

Observable 发射混合类型数据时,ofType 可按类型过滤:

示例:只保留字符串类型:

Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11);
TestSubscriber subscriber = new TestSubscriber();

Observable filteredObservable = sourceObservable.ofType(String.class);

filteredObservable.subscribe(subscriber);

subscriber.assertValues("two", "five");

3. 跳过操作

与过滤相反,RxJava 提供了跳过操作符作为过滤的补充。我们从 skip 开始。

3.1 skip 操作符

skip 直接丢弃前 n 个元素,保留后续所有元素。

示例:跳过前 4 个元素:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.skip(4);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(5, 6, 7, 8, 9, 10);

3.2 skipWhile 操作符

skipWhile 丢弃所有满足条件的初始元素,直到遇到第一个不满足条件的元素,之后保留所有后续元素。

示例:跳过所有小于 4 的初始元素:

Observable<Integer> sourceObservable = Observable
  .just(1, 2, 3, 4, 5, 4, 3, 2, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable
  .skipWhile(i -> i < 4);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(4, 5, 4, 3, 2, 1);

3.3 skipLast 操作符

skipLast 丢弃最后 n 个元素,只保留前面的元素。

示例:跳过最后 5 个元素:

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = sourceObservable.skipLast(5);

filteredObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3, 4, 5);

3.4 distinctdistinctUntilChanged 操作符

distinct 过滤所有重复元素,只保留首次出现的元素:

Observable<Integer> sourceObservable = Observable
  .just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> distinctObservable = sourceObservable.distinct();

distinctObservable.subscribe(subscriber);

subscriber.assertValues(1, 2, 3);

distinctUntilChanged 只过滤连续重复的元素:

Observable<Integer> sourceObservable = Observable
  .just(1, 1, 2, 2, 1, 3, 3, 1);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> distinctObservable = sourceObservable.distinctUntilChanged();

distinctObservable.subscribe(subscriber);
subscriber.assertValues(1, 2, 1, 3, 1);

3.5 ignoreElements 操作符

ignoreElements 直接丢弃所有元素,只保留终止通知(onComplete/onError):

Observable<Integer> sourceObservable = Observable.range(1, 10);
TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> ignoredObservable = sourceObservable.ignoreElements();

ignoredObservable.subscribe(subscriber);

subscriber.assertNoValues();

4. 时间过滤操作符

在处理 Observable 序列时,时间维度常被忽略,但RxJava 提供了基于时间的过滤能力。先定义一个每秒发射一个元素的测试 Observable

TestScheduler testScheduler = new TestScheduler();

Observable<Integer> timedObservable = Observable
  .just(1, 2, 3, 4, 5, 6)
  .zipWith(Observable.interval(
    0, 1, TimeUnit.SECONDS, testScheduler), (item, time) -> item);

TestScheduler 是特殊调度器,可手动推进时间,便于测试。

4.1 samplethrottleLast 操作符

sample 按固定时间间隔采样,发射每个周期内的最后一个元素:

示例:每 2.5 秒采样一次:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> sampledObservable = timedObservable
  .sample(2500L, TimeUnit.MILLISECONDS, testScheduler);

sampledObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(3, 5, 6);

throttleLastsample 行为完全一致。

4.2 throttleFirst 操作符

throttleFirst 发射每个采样周期内的第一个元素(而非最后一个):

示例:每 4.1 秒取第一个元素:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .throttleFirst(4100L, TimeUnit.MILLISECONDS, testScheduler); // 原文此处单位错误,应为 MILLISECONDS

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(1, 6);

4.3 debouncethrottleWithTimeout 操作符

debounce 只发射指定时间窗口内最后一个元素,丢弃中间所有元素:

示例:设置 2 秒防抖:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .debounce(2000L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValue(6);

throttleWithTimeoutdebounce 行为一致。

4.4 timeout 操作符

timeout 在指定时间内未收到元素时,立即终止并抛 TimeoutException

示例:设置 500 毫秒超时:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .timeout(500L, TimeUnit.MILLISECONDS, testScheduler);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertError(TimeoutException.class); 
subscriber.assertValues(1);

5. 多 Observable 过滤

**RxJava 支持基于第二个 Observable 动态过滤源 Observable**。先定义一个延迟 3 秒发射的 Observable

Observable<Integer> delayedObservable = Observable.just(1)
  .delay(3, TimeUnit.SECONDS, testScheduler);

5.1 takeUntil 操作符

takeUntil 在第二个 Observable 发射元素或终止时,立即停止源 Observable 的发射:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .takeUntil(delayedObservable); // 原文此处代码误写为 skipUntil

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(1, 2, 3);

5.2 skipUntil 操作符

skipUntil 丢弃源 Observable 的所有元素,直到第二个 Observable 发射元素,之后开始正常发射:

TestSubscriber<Integer> subscriber = new TestSubscriber();

Observable<Integer> filteredObservable = timedObservable
  .skipUntil(delayedObservable);

filteredObservable.subscribe(subscriber);

testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);

subscriber.assertValues(4, 5, 6);

6. 总结

本文全面介绍了 RxJava 的过滤操作符,每个操作符都提供了简洁的示例。实际开发中,根据场景选择合适的操作符能显著简化代码逻辑。

所有示例代码可在 GitHub 获取。


原始标题:Filtering Observables in RxJava