1. 引言
本文将深入探讨 RxJava 中的错误处理机制。首先需要明确一个核心概念:Observable 通常不会直接抛出异常。默认情况下,当发生不可恢复的错误时,Observable 会调用其 Observer 的 onError()
方法通知错误,然后立即终止序列,不再调用 Observer 的其他方法。
我们即将介绍的错误处理操作符,正是通过恢复或重试 Observable 序列来改变这种默认行为的关键。
2. Maven 依赖
首先在 pom.xml
中添加 RxJava 依赖:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.3</version>
</dependency>
最新版本可在 Maven 中央仓库 查询。
3. 错误处理策略
当错误发生时,通常需要采取特定处理方式,例如:
- 修改外部状态
- 使用默认值恢复序列
- 允许错误继续传播
3.1 错误时执行操作
使用 doOnError
可在错误发生时执行指定操作:
@Test
public void whenChangeStateOnError_thenErrorThrown() {
TestObserver testObserver = new TestObserver();
AtomicBoolean state = new AtomicBoolean(false);
Observable
.error(UNKNOWN_ERROR)
.doOnError(throwable -> state.set(true))
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("state should be changed", state.get());
}
⚠️ 注意:若操作中抛出异常,RxJava 会将其包装为 CompositeException
:
@Test
public void whenExceptionOccurOnError_thenCompositeExceptionThrown() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.doOnError(throwable -> {
throw new RuntimeException("unexcepted");
})
.subscribe(testObserver);
testObserver.assertError(CompositeException.class);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}
3.2 使用默认值恢复序列
虽然 doOnError
能执行操作,但错误仍会中断序列流程。若需用默认值恢复序列,可使用 onErrorReturnItem
:
@Test
public void whenHandleOnErrorResumeItem_thenResumed(){
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onErrorReturnItem("singleValue")
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("singleValue");
}
若需动态生成默认值,可使用 onErrorReturn
:
@Test
public void whenHandleOnErrorReturn_thenResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onErrorReturn(Throwable::getMessage)
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("unknown error");
}
3.3 使用备用序列恢复
当需要用备用数据序列(而非单个值)恢复时,可使用 onErrorResumeNext
阻止错误传播:
@Test
public void whenHandleOnErrorResume_thenResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onErrorResumeNext(Observable.just("one", "two"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("one", "two");
}
若需根据异常类型动态生成备用序列,可传入函数式接口:
@Test
public void whenHandleOnErrorResumeFunc_thenResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onErrorResumeNext(throwable -> Observable
.just(throwable.getMessage(), "nextValue"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("unknown error", "nextValue");
}
3.4 仅处理异常(非错误)
RxJava 提供了 onExceptionResumeNext
,仅当抛出 Exception(非 Error)时才恢复序列:
@Test
public void whenHandleOnException_thenResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_EXCEPTION)
.onExceptionResumeNext(Observable.just("exceptionResumed"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(1);
testObserver.assertValue("exceptionResumed");
}
@Test
public void whenHandleOnException_thenNotResumed() {
TestObserver testObserver = new TestObserver();
Observable
.error(UNKNOWN_ERROR)
.onExceptionResumeNext(Observable.just("exceptionResumed"))
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
}
如代码所示,当发生 Error(非 Exception)时,onExceptionResumeNext
不会触发恢复逻辑。
4. 错误重试机制
临时系统故障或后端错误可能导致序列中断。此时需要实现重试逻辑,等待问题解决。
4.1 基础重试
使用 retry
可无限重试,但实际开发中通常限制重试次数:
@Test
public void whenRetryOnError_thenRetryConfirmed() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry(1)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should try twice", atomicCounter.get() == 2);
}
4.2 条件重试
RxJava 支持条件重试,可通过 谓词重试 或 retryUntil
实现:
@Test
public void whenRetryConditionallyOnError_thenRetryConfirmed() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry((integer, throwable) -> integer < 4)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}
@Test
public void whenRetryUntilOnError_thenRetryConfirmed() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(UNKNOWN_ERROR)
.retryUntil(() -> atomicCounter.incrementAndGet() > 3)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}
4.3 高级重试:retryWhen
retryWhen
是最灵活的重试机制,其核心逻辑如下:
- 返回一个新的 Observable(NewO)
- 若 NewO 调用
onComplete
或onError
,则终止订阅 - 若 NewO 发射任何数据项,则触发源 Observable 重新订阅
测试用例展示其工作机制:
@Test
public void whenRetryWhenOnError_thenRetryConfirmed() {
TestObserver testObserver = new TestObserver();
Exception noretryException = new Exception("don't retry");
Observable
.error(UNKNOWN_ERROR)
.retryWhen(throwableObservable -> Observable.error(noretryException))
.subscribe(testObserver);
testObserver.assertError(noretryException);
testObserver.assertNotComplete();
testObserver.assertNoValues();
}
@Test
public void whenRetryWhenOnError_thenCompleted() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retryWhen(throwableObservable -> Observable.empty())
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
assertTrue("should not retry", atomicCounter.get()==0);
}
@Test
public void whenRetryWhenOnError_thenResubscribed() {
TestObserver testObserver = new TestObserver();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retryWhen(throwableObservable -> Observable.just("anything"))
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
assertTrue("should retry once", atomicCounter.get()==1);
}
典型应用场景:带延迟的有限次重试:
@Test
public void whenRetryWhenForMultipleTimesOnError_thenResumed() {
TestObserver testObserver = new TestObserver();
long before = System.currentTimeMillis();
Observable
.error(UNKNOWN_ERROR)
.retryWhen(throwableObservable -> throwableObservable
.zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
.flatMap(integer -> Observable.timer(integer, TimeUnit.SECONDS)))
.blockingSubscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertNoValues();
long secondsElapsed = (System.currentTimeMillis() - before)/1000;
assertTrue("6 seconds should elapse",secondsElapsed == 6 );
}
✅ 该实现实现了 3 次重试,且每次重试延迟时间递增(1s → 2s → 3s)。
5. 总结
本文系统介绍了 RxJava 中处理错误和异常的多种策略,包括:
- 错误时执行操作(
doOnError
) - 使用默认值恢复(
onErrorReturnItem
/onErrorReturn
) - 使用备用序列恢复(
onErrorResumeNext
) - 仅处理异常(
onExceptionResumeNext
) - 基础重试(
retry
) - 条件重试(
retry
+ 谓词/retryUntil
) - 高级重试(
retryWhen
)