1. 概述

本教程将深入探讨 如何在 RxJava 中实现带延迟的重试机制。当处理 Observable 时,我们经常会遇到错误而非成功的情况。此时,通过带延迟的重试机制,我们可以在等待一段时间后重新尝试操作,从而提高系统的健壮性。

2. 项目配置

首先,我们需要创建一个 Maven 或 Gradle 项目。这里以 Maven 项目为例,在 pom.xml 中添加 rxjava 依赖:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.21</version>
</dependency>

该依赖可在 Maven Central 获取。

3. Observable 生命周期中的重试机制

Observable 源发出错误时,Retry 操作符就会派上用场。它本质上是重新订阅原始数据源,期望在下一次尝试中能成功完成。因此,retry() 及其变体仅在 Observable 触发 onError 时才会生效。

3.1. Retry 的重载变体

除了基础的 retry() 方法,RxJava 还提供两个重载变体:

retry(long)
返回一个镜像 Observable,仅当原始流触发 onError 的次数未超过指定值时才会重新订阅。

retry(Func2)
接受一个函数作为参数,该函数接收 Throwable 和重试次数,返回 boolean。只有当函数针对特定异常返回 true 时才会触发重试。

3.2. RetryWhen 与 Retry 的区别

retryWhen(Func1) 提供了更灵活的自定义重试逻辑。它接收一个函数,该函数处理 onError 抛出的异常:

  • 当函数发出任何 item 时 → 触发重新订阅
  • 当函数触发 onError 时 → 终止重试流程

⚠️ 踩坑提示:retryWhen 的实现需要特别注意,错误处理逻辑不当可能导致无限重试。

4. 代码示例

通过实际代码片段来理解这些概念:

4.1. 成功的 Observable

Observable<String> successfulObservable = Observable.create(emitter -> {
    emitter.onNext("Data 1");
    emitter.onNext("Data 2");
    emitter.onComplete();
});

successfulObservable
    .subscribe(
        data -> System.out.println("Received: " + data),
        error -> System.err.println("Error: " + error),
        () -> System.out.println("Completed!")
    );

输出结果:

Received: Data 1
Received: Data 2
Completed!

4.2. 基础重试机制

Observable<String> failingObservable = Observable.create(emitter -> {
    emitter.onNext("Initial Data");
    emitter.onError(new RuntimeException("Simulated Error"));
});

failingObservable
    .retry(2) // 最多重试2次
    .subscribe(
        data -> System.out.println("Received: " + data),
        error -> System.err.println("Final Error: " + error)
    );

输出结果:

Received: Initial Data
Received: Initial Data
Received: Initial Data
Final Error: java.lang.RuntimeException: Simulated Error

4.3. 带延迟的重试实现

Observable<String> delayedRetryObservable = Observable.create(emitter -> {
    emitter.onNext("Critical Data");
    emitter.onError(new IOException("Network Failure"));
});

delayedRetryObservable
    .retryWhen(errors -> errors
        .zipWith(Observable.range(1, 3), (error, attempt) -> attempt)
        .flatMap(attempt -> {
            long delay = (long) Math.pow(2, attempt) * 1000; // 指数退避
            System.out.println("Retry attempt #" + attempt + " after " + delay + "ms");
            return Observable.timer(delay, TimeUnit.MILLISECONDS);
        })
    )
    .blockingSubscribe(
        data -> System.out.println("Success: " + data),
        error -> System.err.println("Failed after retries: " + error)
    );

输出示例:

Retry attempt #1 after 2000ms
Success: Critical Data
Retry attempt #2 after 4000ms
Success: Critical Data
Retry attempt #3 after 8000ms
Success: Critical Data
Failed after retries: java.io.IOException: Network Failure

关键点说明:

  • 使用 zipWith 组合错误流和重试计数器
  • 通过 flatMap 实现指数退避延迟
  • Observable.timer 创建延迟流
  • 最终重试耗尽后抛出原始错误

原始标题:Retry with Delay in RxJava