1. 概述
本文探讨如何在 Jersey 框架中使用 JAX-RS 的响应式(Reactive)编程能力。我们假设你已经熟悉标准的 Jersey REST 客户端用法。
如果你了解过 响应式编程 的基本概念会更有帮助,但不是必须的——我们会从实战角度切入,带你避开常见坑点。
2. 依赖配置
首先,需要引入标准的 Jersey 客户端依赖:
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
<version>2.27</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>2.27</version>
</dependency>
这两个是使用 Jersey 的基础依赖,提供了 JAX-RS 的核心能力。
✅ 若想集成响应式框架,还需额外引入对应的扩展包:
使用 RxJava1 的
Observable
:<dependency> <groupId>org.glassfish.jersey.ext.rx</groupId> <artifactId>jersey-rx-client-rxjava</artifactId> <version>2.27</version> </dependency>
使用 RxJava2 的
Flowable
(推荐):<dependency> <groupId>org.glassfish.jersey.ext.rx</groupId> <artifactId>jersey-rx-client-rxjava2</artifactId> <version>2.27</version> </dependency>
⚠️ 注意:虽然官方文档有时写成 ObservableRxInvokerProvider
,但正确类名是 RxObservableInvokerProvider
,踩过坑的人都知道这点。
所有依赖均可在 Maven Central 找到最新版本。
3. 为什么需要响应式 JAX-RS 客户端
假设有三个微服务需要串联调用:
id-service
:返回一批用户 ID(List<Long>
)name-service
:根据用户 ID 返回用户名hash-service
:对 “用户名 + ID” 进行哈希处理
我们先初始化客户端:
Client client = ClientBuilder.newClient();
WebTarget userIdService = client.target("http://localhost:8080/id-service/ids");
WebTarget nameService = client.target("http://localhost:8080/name-service/users/{userId}/name");
WebTarget hashService = client.target("http://localhost:8080/hash-service/{rawValue}");
接下来的问题是:如何高效地完成这一串调用?
JAX-RS 提供了三种模式:
- ✅ 同步阻塞(Synchronous)
- ✅ 异步回调(Asynchronous)
- ✅ 响应式流(Reactive)
我们逐个分析。
3.1 同步调用的痛点
最简单的写法就是层层嵌套同步请求:
List<Long> ids = userIdService.request().get(new GenericType<List<Long>>(){});
for (Long id : ids) {
String name = nameService.resolveTemplate("userId", id).request().get(String.class);
String hash = hashService.resolveTemplate("rawValue", name + id).request().get(String.class);
// 处理结果
}
❌ 问题很明显:每个请求都阻塞线程,总耗时 = 所有请求耗时之和,I/O 密集场景下资源浪费严重。
3.2 异步回调的“回调地狱”
JAX-RS 提供了 async().get(InvocationCallback)
支持异步非阻塞调用:
CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size());
userIdService.request()
.accept(MediaType.APPLICATION_JSON)
.async()
.get(new InvocationCallback<List<Long>>() {
@Override
public void completed(List<Long> employeeIds) {
employeeIds.forEach(id -> {
nameService.resolveTemplate("userId", id).request()
.async()
.get(new InvocationCallback<String>() {
@Override
public void completed(String name) {
hashService.resolveTemplate("rawValue", name + id).request()
.async()
.get(new InvocationCallback<String>() {
@Override
public void completed(String hash) {
receivedHashValues.add(hash);
completionTracker.countDown();
}
});
}
});
});
}
});
✅ 优点:非阻塞,节省线程资源
❌ 缺点:嵌套太深,代码可读性差,典型的“回调地狱”(Pyramid of Doom),维护成本高。
3.3 响应式:简洁又高效的解法
响应式编程的核心优势:
- ✅ 代码流畅、易读
- ✅ 链式调用,逻辑清晰
- ✅ 线程资源利用率高
- ✅ 错误处理统一
JAX-RS 提供了以下响应式组件支持:
组件 | 说明 |
---|---|
CompletionStageRxInvoker |
默认支持 Java 8 的 CompletionStage |
RxObservableInvokerProvider |
支持 RxJava1 的 Observable |
RxFlowableInvokerProvider |
支持 RxJava2 的 Flowable |
还支持 SPI 扩展,可接入其他响应式库。
4. JAX-RS 响应式组件详解
4.1 使用 CompletionStage
(推荐)
CompletionStage
+ CompletableFuture
是 Java 原生方案,无需额外依赖,简单粗暴又高效。
发起第一个请求获取用户 ID:
CompletionStage<List<Long>> userIdStage = userIdService.request()
.accept(MediaType.APPLICATION_JSON)
.rx() // ⚠️ 关键:开启响应式调用
.get(new GenericType<List<Long>>() {})
.exceptionally((throwable) -> {
logger.warn("获取用户ID失败", throwable);
return null;
});
然后链式处理后续调用:
List<String> receivedHashValues = new ArrayList<>();
CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size());
userIdStage.thenAcceptAsync(employeeIds -> {
logger.info("收到用户ID: {}", employeeIds);
employeeIds.forEach(id -> {
CompletableFuture<String> nameFuture = nameService
.resolveTemplate("userId", id)
.request()
.rx()
.get(String.class)
.toCompletableFuture();
nameFuture.thenAccept(userName -> {
logger.info("获取用户名: {}", userName);
hashService
.resolveTemplate("rawValue", userName + id)
.request()
.rx()
.get(String.class)
.toCompletableFuture()
.thenAcceptAsync(hash -> {
logger.info("哈希结果: {}", hash);
receivedHashValues.add(hash);
completionTracker.countDown();
})
.exceptionally(throwable -> {
logger.warn("哈希计算失败: {}", id);
return null;
});
});
});
});
// 单元测试中等待结果
if (!completionTracker.await(10, TimeUnit.SECONDS)) {
logger.warn("部分请求超时");
}
assertThat(receivedHashValues).containsAll(expectedHashValues);
✅ thenAcceptAsync
确保每个步骤异步执行,不阻塞主线程
✅ 整体流程清晰,易于调试和扩展
✅ 可自定义 Executor
控制线程池行为
4.2 使用 RxJava 的 Observable
若项目已使用 RxJava1,可切换为 Observable
模式。
⚠️ 注意:必须注册正确的 Provider:
Client client = ClientBuilder.newClient()
.register(RxObservableInvokerProvider.class); // 不是 ObservableRxInvokerProvider!
发起请求:
Observable<List<Long>> userIdObservable = userIdService
.request()
.rx(RxObservableInvoker.class) // 指定使用 Observable 调用器
.get(new GenericType<List<Long>>(){});
后续可使用标准 RxJava 操作符:
userIdObservable
.flatMap(Observable::from)
.flatMap(id ->
nameService.resolveTemplate("userId", id).request()
.rx(RxObservableInvoker.class)
.get(String.class)
.map(name -> new User(id, name))
)
.flatMap(user ->
hashService.resolveTemplate("rawValue", user.name + user.id).request()
.rx(RxObservableInvoker.class)
.get(String.class)
.map(hash -> new HashedUser(user.id, hash))
)
.subscribe(
result -> logger.info("处理结果: {}", result),
error -> logger.error("出错了", error)
);
4.3 使用 RxJava2 的 Flowable
Flowable
是 RxJava2 中支持背压(Backpressure)的版本,更适合高并发场景。
注册 Provider:
Client client = ClientBuilder.newClient()
.register(RxFlowableInvokerProvider.class);
创建 Flowable
请求:
Flowable<List<Long>> userIdFlowable = userIdService
.request()
.rx(RxFlowableInvoker.class)
.get(new GenericType<List<Long>>(){});
后续可使用 flatMap
、merge
等操作符进行流式处理,逻辑与 Observable
类似。
5. 总结
JAX-RS 的响应式客户端 API 提供了多种非阻塞调用方式,显著提升 I/O 密集型场景的吞吐能力。
方案 | 推荐度 | 适用场景 |
---|---|---|
CompletionStage |
✅✅✅ | Java 8+ 项目,轻量级,无额外依赖 |
Observable |
✅✅ | 已使用 RxJava1 的老项目 |
Flowable |
✅✅✅ | 高并发、需背压控制的系统 |
✅ 首选建议:使用 CompletionStage
,原生支持、性能好、学习成本低。
✅ 复杂流处理:搭配 RxJava 的 Flowable
更灵活。
完整示例代码见:GitHub - spring-jersey