1. 概述

本文探讨如何在 Jersey 框架中使用 JAX-RS 的响应式(Reactive)编程能力。我们假设你已经熟悉标准的 Jersey REST 客户端用法。

如果你了解过 响应式编程 的基本概念会更有帮助,但不是必须的——我们会从实战角度切入,带你避开常见坑点。


2. 依赖配置

首先,需要引入标准的 Jersey 客户端依赖:

<dependency>
    <groupId>org.glassfish.jersey.core</groupId>
    <artifactId>jersey-client</artifactId>
    <version>2.27</version>
</dependency>
<dependency>
    <groupId>org.glassfish.jersey.inject</groupId>
    <artifactId>jersey-hk2</artifactId>
    <version>2.27</version>
</dependency>

这两个是使用 Jersey 的基础依赖,提供了 JAX-RS 的核心能力。

✅ 若想集成响应式框架,还需额外引入对应的扩展包:

  • 使用 RxJava1 的 Observable

    <dependency>
        <groupId>org.glassfish.jersey.ext.rx</groupId>
        <artifactId>jersey-rx-client-rxjava</artifactId>
        <version>2.27</version>
    </dependency>
    
  • 使用 RxJava2 的 Flowable(推荐):

    <dependency>
        <groupId>org.glassfish.jersey.ext.rx</groupId>
        <artifactId>jersey-rx-client-rxjava2</artifactId>
        <version>2.27</version>
    </dependency>
    

⚠️ 注意:虽然官方文档有时写成 ObservableRxInvokerProvider,但正确类名是 RxObservableInvokerProvider,踩过坑的人都知道这点。

所有依赖均可在 Maven Central 找到最新版本。


3. 为什么需要响应式 JAX-RS 客户端

假设有三个微服务需要串联调用:

  • id-service:返回一批用户 ID(List<Long>
  • name-service:根据用户 ID 返回用户名
  • hash-service:对 “用户名 + ID” 进行哈希处理

我们先初始化客户端:

Client client = ClientBuilder.newClient();
WebTarget userIdService = client.target("http://localhost:8080/id-service/ids");
WebTarget nameService = client.target("http://localhost:8080/name-service/users/{userId}/name");
WebTarget hashService = client.target("http://localhost:8080/hash-service/{rawValue}");

接下来的问题是:如何高效地完成这一串调用?

JAX-RS 提供了三种模式:

  1. ✅ 同步阻塞(Synchronous)
  2. ✅ 异步回调(Asynchronous)
  3. ✅ 响应式流(Reactive)

我们逐个分析。

3.1 同步调用的痛点

最简单的写法就是层层嵌套同步请求:

List<Long> ids = userIdService.request().get(new GenericType<List<Long>>(){});
for (Long id : ids) {
    String name = nameService.resolveTemplate("userId", id).request().get(String.class);
    String hash = hashService.resolveTemplate("rawValue", name + id).request().get(String.class);
    // 处理结果
}

❌ 问题很明显:每个请求都阻塞线程,总耗时 = 所有请求耗时之和,I/O 密集场景下资源浪费严重。

3.2 异步回调的“回调地狱”

JAX-RS 提供了 async().get(InvocationCallback) 支持异步非阻塞调用:

CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size());

userIdService.request()
    .accept(MediaType.APPLICATION_JSON)
    .async()
    .get(new InvocationCallback<List<Long>>() {
        @Override
        public void completed(List<Long> employeeIds) {
            employeeIds.forEach(id -> {
                nameService.resolveTemplate("userId", id).request()
                    .async()
                    .get(new InvocationCallback<String>() {
                        @Override
                        public void completed(String name) {
                            hashService.resolveTemplate("rawValue", name + id).request()
                                .async()
                                .get(new InvocationCallback<String>() {
                                    @Override
                                    public void completed(String hash) {
                                        receivedHashValues.add(hash);
                                        completionTracker.countDown();
                                    }
                                });
                        }
                    });
            });
        }
    });

✅ 优点:非阻塞,节省线程资源
❌ 缺点:嵌套太深,代码可读性差,典型的“回调地狱”(Pyramid of Doom),维护成本高。

3.3 响应式:简洁又高效的解法

响应式编程的核心优势:

  • ✅ 代码流畅、易读
  • ✅ 链式调用,逻辑清晰
  • ✅ 线程资源利用率高
  • ✅ 错误处理统一

JAX-RS 提供了以下响应式组件支持:

组件 说明
CompletionStageRxInvoker 默认支持 Java 8 的 CompletionStage
RxObservableInvokerProvider 支持 RxJava1 的 Observable
RxFlowableInvokerProvider 支持 RxJava2 的 Flowable

还支持 SPI 扩展,可接入其他响应式库。


4. JAX-RS 响应式组件详解

4.1 使用 CompletionStage(推荐)

CompletionStage + CompletableFuture 是 Java 原生方案,无需额外依赖,简单粗暴又高效。

发起第一个请求获取用户 ID:

CompletionStage<List<Long>> userIdStage = userIdService.request()
    .accept(MediaType.APPLICATION_JSON)
    .rx()  // ⚠️ 关键:开启响应式调用
    .get(new GenericType<List<Long>>() {})
    .exceptionally((throwable) -> {
        logger.warn("获取用户ID失败", throwable);
        return null;
    });

然后链式处理后续调用:

List<String> receivedHashValues = new ArrayList<>();
CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size());

userIdStage.thenAcceptAsync(employeeIds -> {
    logger.info("收到用户ID: {}", employeeIds);
    
    employeeIds.forEach(id -> {
        CompletableFuture<String> nameFuture = nameService
            .resolveTemplate("userId", id)
            .request()
            .rx()
            .get(String.class)
            .toCompletableFuture();

        nameFuture.thenAccept(userName -> {
            logger.info("获取用户名: {}", userName);
            
            hashService
                .resolveTemplate("rawValue", userName + id)
                .request()
                .rx()
                .get(String.class)
                .toCompletableFuture()
                .thenAcceptAsync(hash -> {
                    logger.info("哈希结果: {}", hash);
                    receivedHashValues.add(hash);
                    completionTracker.countDown();
                })
                .exceptionally(throwable -> {
                    logger.warn("哈希计算失败: {}", id);
                    return null;
                });
        });
    });
});

// 单元测试中等待结果
if (!completionTracker.await(10, TimeUnit.SECONDS)) {
    logger.warn("部分请求超时");
}

assertThat(receivedHashValues).containsAll(expectedHashValues);

thenAcceptAsync 确保每个步骤异步执行,不阻塞主线程
✅ 整体流程清晰,易于调试和扩展
✅ 可自定义 Executor 控制线程池行为

4.2 使用 RxJava 的 Observable

若项目已使用 RxJava1,可切换为 Observable 模式。

⚠️ 注意:必须注册正确的 Provider:

Client client = ClientBuilder.newClient()
    .register(RxObservableInvokerProvider.class);  // 不是 ObservableRxInvokerProvider!

发起请求:

Observable<List<Long>> userIdObservable = userIdService
    .request()
    .rx(RxObservableInvoker.class)  // 指定使用 Observable 调用器
    .get(new GenericType<List<Long>>(){});

后续可使用标准 RxJava 操作符:

userIdObservable
    .flatMap(Observable::from)
    .flatMap(id -> 
        nameService.resolveTemplate("userId", id).request()
            .rx(RxObservableInvoker.class)
            .get(String.class)
            .map(name -> new User(id, name))
    )
    .flatMap(user ->
        hashService.resolveTemplate("rawValue", user.name + user.id).request()
            .rx(RxObservableInvoker.class)
            .get(String.class)
            .map(hash -> new HashedUser(user.id, hash))
    )
    .subscribe(
        result -> logger.info("处理结果: {}", result),
        error -> logger.error("出错了", error)
    );

4.3 使用 RxJava2 的 Flowable

Flowable 是 RxJava2 中支持背压(Backpressure)的版本,更适合高并发场景。

注册 Provider:

Client client = ClientBuilder.newClient()
    .register(RxFlowableInvokerProvider.class);

创建 Flowable 请求:

Flowable<List<Long>> userIdFlowable = userIdService
    .request()
    .rx(RxFlowableInvoker.class)
    .get(new GenericType<List<Long>>(){});

后续可使用 flatMapmerge 等操作符进行流式处理,逻辑与 Observable 类似。


5. 总结

JAX-RS 的响应式客户端 API 提供了多种非阻塞调用方式,显著提升 I/O 密集型场景的吞吐能力。

方案 推荐度 适用场景
CompletionStage ✅✅✅ Java 8+ 项目,轻量级,无额外依赖
Observable ✅✅ 已使用 RxJava1 的老项目
Flowable ✅✅✅ 高并发、需背压控制的系统

首选建议:使用 CompletionStage,原生支持、性能好、学习成本低。
复杂流处理:搭配 RxJava 的 Flowable 更灵活。

完整示例代码见:GitHub - spring-jersey


原始标题:Reactive JAX-RS Client API