1. 概述
本教程将深入探讨 如何在 RxJava 中实现带延迟的重试机制。当处理 Observable 时,我们经常会遇到错误而非成功的情况。此时,通过带延迟的重试机制,我们可以在等待一段时间后重新尝试操作,从而提高系统的健壮性。
2. 项目配置
首先,我们需要创建一个 Maven 或 Gradle 项目。这里以 Maven 项目为例,在 pom.xml 中添加 rxjava 依赖:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.21</version>
</dependency>
该依赖可在 Maven Central 获取。
3. Observable 生命周期中的重试机制
当 Observable 源发出错误时,Retry 操作符就会派上用场。它本质上是重新订阅原始数据源,期望在下一次尝试中能成功完成。因此,retry() 及其变体仅在 Observable 触发 onError 时才会生效。
3.1. Retry 的重载变体
除了基础的 retry() 方法,RxJava 还提供两个重载变体:
✅ retry(long)
返回一个镜像 Observable,仅当原始流触发 onError 的次数未超过指定值时才会重新订阅。
✅ retry(Func2)
接受一个函数作为参数,该函数接收 Throwable 和重试次数,返回 boolean。只有当函数针对特定异常返回 true 时才会触发重试。
3.2. RetryWhen 与 Retry 的区别
retryWhen(Func1) 提供了更灵活的自定义重试逻辑。它接收一个函数,该函数处理 onError 抛出的异常:
- 当函数发出任何 item 时 → 触发重新订阅
- 当函数触发 onError 时 → 终止重试流程
⚠️ 踩坑提示:retryWhen 的实现需要特别注意,错误处理逻辑不当可能导致无限重试。
4. 代码示例
通过实际代码片段来理解这些概念:
4.1. 成功的 Observable
Observable<String> successfulObservable = Observable.create(emitter -> {
emitter.onNext("Data 1");
emitter.onNext("Data 2");
emitter.onComplete();
});
successfulObservable
.subscribe(
data -> System.out.println("Received: " + data),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed!")
);
输出结果:
Received: Data 1
Received: Data 2
Completed!
4.2. 基础重试机制
Observable<String> failingObservable = Observable.create(emitter -> {
emitter.onNext("Initial Data");
emitter.onError(new RuntimeException("Simulated Error"));
});
failingObservable
.retry(2) // 最多重试2次
.subscribe(
data -> System.out.println("Received: " + data),
error -> System.err.println("Final Error: " + error)
);
输出结果:
Received: Initial Data
Received: Initial Data
Received: Initial Data
Final Error: java.lang.RuntimeException: Simulated Error
4.3. 带延迟的重试实现
Observable<String> delayedRetryObservable = Observable.create(emitter -> {
emitter.onNext("Critical Data");
emitter.onError(new IOException("Network Failure"));
});
delayedRetryObservable
.retryWhen(errors -> errors
.zipWith(Observable.range(1, 3), (error, attempt) -> attempt)
.flatMap(attempt -> {
long delay = (long) Math.pow(2, attempt) * 1000; // 指数退避
System.out.println("Retry attempt #" + attempt + " after " + delay + "ms");
return Observable.timer(delay, TimeUnit.MILLISECONDS);
})
)
.blockingSubscribe(
data -> System.out.println("Success: " + data),
error -> System.err.println("Failed after retries: " + error)
);
输出示例:
Retry attempt #1 after 2000ms
Success: Critical Data
Retry attempt #2 after 4000ms
Success: Critical Data
Retry attempt #3 after 8000ms
Success: Critical Data
Failed after retries: java.io.IOException: Network Failure
关键点说明:
- 使用
zipWith
组合错误流和重试计数器- 通过
flatMap
实现指数退避延迟Observable.timer
创建延迟流- 最终重试耗尽后抛出原始错误