1. 引言

本文将深入探讨 RxJava 中的错误处理机制。首先需要明确一个核心概念:Observable 通常不会直接抛出异常。默认情况下,当发生不可恢复的错误时,Observable 会调用其 Observer 的 onError() 方法通知错误,然后立即终止序列,不再调用 Observer 的其他方法。

我们即将介绍的错误处理操作符,正是通过恢复或重试 Observable 序列来改变这种默认行为的关键。

2. Maven 依赖

首先在 pom.xml 中添加 RxJava 依赖:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.3</version>
</dependency>

最新版本可在 Maven 中央仓库 查询。

3. 错误处理策略

当错误发生时,通常需要采取特定处理方式,例如:

  • 修改外部状态
  • 使用默认值恢复序列
  • 允许错误继续传播

3.1 错误时执行操作

使用 doOnError 可在错误发生时执行指定操作:

@Test
public void whenChangeStateOnError_thenErrorThrown() {
    TestObserver testObserver = new TestObserver();
    AtomicBoolean state = new AtomicBoolean(false);
    Observable
      .error(UNKNOWN_ERROR)
      .doOnError(throwable -> state.set(true))
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
 
    assertTrue("state should be changed", state.get());
}

⚠️ 注意:若操作中抛出异常,RxJava 会将其包装为 CompositeException

@Test
public void whenExceptionOccurOnError_thenCompositeExceptionThrown() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .doOnError(throwable -> {
          throw new RuntimeException("unexcepted");
      })
      .subscribe(testObserver);

    testObserver.assertError(CompositeException.class);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
}

3.2 使用默认值恢复序列

虽然 doOnError 能执行操作,但错误仍会中断序列流程。若需用默认值恢复序列,可使用 onErrorReturnItem

@Test
public void whenHandleOnErrorResumeItem_thenResumed(){
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorReturnItem("singleValue")
      .subscribe(testObserver);
 
    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(1);
    testObserver.assertValue("singleValue");
}

若需动态生成默认值,可使用 onErrorReturn

@Test
public void whenHandleOnErrorReturn_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorReturn(Throwable::getMessage)
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(1);
    testObserver.assertValue("unknown error");
}

3.3 使用备用序列恢复

当需要用备用数据序列(而非单个值)恢复时,可使用 onErrorResumeNext 阻止错误传播:

@Test
public void whenHandleOnErrorResume_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorResumeNext(Observable.just("one", "two"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(2);
    testObserver.assertValues("one", "two");
}

若需根据异常类型动态生成备用序列,可传入函数式接口:

@Test
public void whenHandleOnErrorResumeFunc_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onErrorResumeNext(throwable -> Observable
        .just(throwable.getMessage(), "nextValue"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(2);
    testObserver.assertValues("unknown error", "nextValue");
}

3.4 仅处理异常(非错误)

RxJava 提供了 onExceptionResumeNext仅当抛出 Exception(非 Error)时才恢复序列:

@Test
public void whenHandleOnException_thenResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_EXCEPTION)
      .onExceptionResumeNext(Observable.just("exceptionResumed"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertValueCount(1);
    testObserver.assertValue("exceptionResumed");
}

@Test
public void whenHandleOnException_thenNotResumed() {
    TestObserver testObserver = new TestObserver();
    Observable
      .error(UNKNOWN_ERROR)
      .onExceptionResumeNext(Observable.just("exceptionResumed"))
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
}

如代码所示,当发生 Error(非 Exception)时,onExceptionResumeNext 不会触发恢复逻辑。

4. 错误重试机制

临时系统故障或后端错误可能导致序列中断。此时需要实现重试逻辑,等待问题解决。

4.1 基础重试

使用 retry 可无限重试,但实际开发中通常限制重试次数:

@Test
public void whenRetryOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
          atomicCounter.incrementAndGet();
          return UNKNOWN_ERROR;
      })
      .retry(1)
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
    assertTrue("should try twice", atomicCounter.get() == 2);
}

4.2 条件重试

RxJava 支持条件重试,可通过 谓词重试retryUntil 实现:

@Test
public void whenRetryConditionallyOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
          atomicCounter.incrementAndGet();
          return UNKNOWN_ERROR;
      })
      .retry((integer, throwable) -> integer < 4)
      .subscribe(testObserver);

    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
    assertTrue("should call 4 times", atomicCounter.get() == 4);
}

@Test
public void whenRetryUntilOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(UNKNOWN_ERROR)
      .retryUntil(() -> atomicCounter.incrementAndGet() > 3)
      .subscribe(testObserver);
    testObserver.assertError(UNKNOWN_ERROR);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
    assertTrue("should call 4 times", atomicCounter.get() == 4);
}

4.3 高级重试:retryWhen

retryWhen 是最灵活的重试机制,其核心逻辑如下:

  • 返回一个新的 Observable(NewO)
  • 若 NewO 调用 onCompleteonError,则终止订阅
  • 若 NewO 发射任何数据项,则触发源 Observable 重新订阅

测试用例展示其工作机制:

@Test
public void whenRetryWhenOnError_thenRetryConfirmed() {
    TestObserver testObserver = new TestObserver();
    Exception noretryException = new Exception("don't retry");
    Observable
      .error(UNKNOWN_ERROR)
      .retryWhen(throwableObservable -> Observable.error(noretryException))
      .subscribe(testObserver);

    testObserver.assertError(noretryException);
    testObserver.assertNotComplete();
    testObserver.assertNoValues();
}

@Test
public void whenRetryWhenOnError_thenCompleted() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
        atomicCounter.incrementAndGet();
        return UNKNOWN_ERROR;
      })
      .retryWhen(throwableObservable -> Observable.empty())
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertNoValues();
    assertTrue("should not retry", atomicCounter.get()==0);
}

@Test
public void whenRetryWhenOnError_thenResubscribed() {
    TestObserver testObserver = new TestObserver();
    AtomicInteger atomicCounter = new AtomicInteger(0);
    Observable
      .error(() -> {
        atomicCounter.incrementAndGet();
        return UNKNOWN_ERROR;
      })
      .retryWhen(throwableObservable -> Observable.just("anything"))
      .subscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertNoValues();
    assertTrue("should retry once", atomicCounter.get()==1);
}

典型应用场景:带延迟的有限次重试

@Test
public void whenRetryWhenForMultipleTimesOnError_thenResumed() {
    TestObserver testObserver = new TestObserver();
    long before = System.currentTimeMillis();
    Observable
      .error(UNKNOWN_ERROR)
      .retryWhen(throwableObservable -> throwableObservable
        .zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
        .flatMap(integer -> Observable.timer(integer, TimeUnit.SECONDS)))
      .blockingSubscribe(testObserver);

    testObserver.assertNoErrors();
    testObserver.assertComplete();
    testObserver.assertNoValues();
    long secondsElapsed = (System.currentTimeMillis() - before)/1000;
    assertTrue("6 seconds should elapse",secondsElapsed == 6 );
}

✅ 该实现实现了 3 次重试,且每次重试延迟时间递增(1s → 2s → 3s)。

5. 总结

本文系统介绍了 RxJava 中处理错误和异常的多种策略,包括:

  • 错误时执行操作(doOnError
  • 使用默认值恢复(onErrorReturnItem/onErrorReturn
  • 使用备用序列恢复(onErrorResumeNext
  • 仅处理异常(onExceptionResumeNext
  • 基础重试(retry
  • 条件重试(retry + 谓词/retryUntil
  • 高级重试(retryWhen

关于 RxJava 特有的错误处理异常,可参考 官方 Wiki。完整实现代码见 GitHub 仓库


原始标题:RxJava and Error Handling | Baeldung