1. 概述
本文将深入介绍如何使用 Jetty 提供的 Reactive HTTP 客户端。我们将通过多个小示例,演示它如何与主流响应式编程库(如 Project Reactor、RxJava 等)协同工作。
✅ 目标读者:已有响应式编程经验的 Java 开发者
⚠️ 注意:本文不讲解响应式基础概念,如 Publisher、Subscriber、背压等。如果你对这些还不熟悉,建议先阅读 Reactor 或 RxJava 入门文章。
2. 什么是 Reactive HttpClient?
Jetty 原生的 HttpClient
支持阻塞式 HTTP 请求。但在响应式系统中,我们需要非阻塞、流式处理的能力。为此,Jetty 提供了一个基于 ReactiveStreams
规范的封装层 —— Reactive HttpClient。
简单来说:
✅ Reactive HttpClient 的核心用途是:通过 HTTP 协议消费或生产数据流
本文示例将构建一个响应式客户端,与 Jetty 服务端通信,并展示不同响应式库下的使用方式。同时也会介绍请求/响应过程中的生命周期事件,便于调试和监控。
📌 建议提前了解:
3. Maven 依赖
首先在 pom.xml
中引入关键依赖。注意版本兼容性,避免踩坑。
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-reactive-httpclient</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.19.v20190610</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.11</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.1.9.RELEASE</version>
</dependency>
📌 说明:
jetty-reactive-httpclient
是核心包jetty-server
用于启动本地测试服务- 其余为响应式生态常用库
4. 构建服务端与客户端
先搭建一个简单的 Echo 服务端,用于接收请求并原样返回 body。
服务端代码
public class RequestHandler extends AbstractHandler {
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
jettyRequest.setHandled(true);
response.setContentType(request.getContentType());
IO.copy(request.getInputStream(), response.getOutputStream());
}
}
// 启动服务
Server server = new Server(8080);
server.setHandler(new RequestHandler());
server.start();
客户端初始化
HttpClient httpClient = new HttpClient();
httpClient.start();
转换为响应式请求
关键一步:使用 ReactiveRequest.newBuilder()
包装原始请求,使其支持响应式流。
Request request = httpClient.newRequest("http://localhost:8080/");
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request).build();
Publisher<ReactiveResponse> publisher = reactiveRequest.response();
✅ 到此为止,原本阻塞的 HttpClient
已被升级为响应式客户端。接下来可以对接各种响应式库。
5. 使用 Reactive Streams 原生接口
Jetty 的 Reactive Client 原生支持 Reactive Streams 规范。我们先用最基础的方式测试。
自定义阻塞 Subscriber
public class BlockingSubscriber implements Subscriber<ReactiveResponse> {
BlockingQueue<ReactiveResponse> sink = new LinkedBlockingQueue<>(1);
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(1); // ⚠️ 必须调用 request() 才能触发数据流
}
@Override
public void onNext(ReactiveResponse response) {
sink.offer(response);
}
@Override
public void onError(Throwable failure) { }
@Override
public void onComplete() { }
public ReactiveResponse block() throws InterruptedException {
return sink.poll(5, TimeUnit.SECONDS); // 超时保护
}
}
📌 踩坑提醒:
- 不调用
subscription.request()
→ 永远不会收到onNext
- 队列大小设为 1 是因为预期只接收一个响应
执行测试
BlockingSubscriber subscriber = new BlockingSubscriber();
publisher.subscribe(subscriber);
ReactiveResponse response = subscriber.block();
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);
6. 与 Project Reactor 集成
Project Reactor 是 Spring WebFlux 的底层引擎,集成非常自然。
使用 Mono 接收响应
ReactiveResponse response = Mono.from(publisher).block();
断言结果
Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);
6.1 结合 Spring WebFlux 的 WebClient
更优雅的方式是使用 WebClient
,它支持插拔式 HTTP 客户端实现。
步骤一:绑定 Jetty 客户端
ClientHttpConnector clientConnector = new JettyClientHttpConnector(httpClient);
步骤二:构建 WebClient
WebClient client = WebClient.builder()
.clientConnector(clientConnector)
.build();
步骤三:发起响应式请求
String responseContent = client.post()
.uri("http://localhost:8080/")
.contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromPublisher(Mono.just("Hello World!"), String.class))
.retrieve()
.bodyToMono(String.class)
.block();
Assert.assertNotNull(responseContent);
Assert.assertEquals("Hello World!", responseContent);
✅ 优势:
- API 更简洁
- 与 Spring 生态无缝集成
- 支持函数式编程风格
7. 与 RxJava2 集成
RxJava2 同样基于 Reactive Streams,集成方式几乎一致。
构建带 body 的请求
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
.content(ReactiveRequest.Content
.fromString("Hello World!", "text/plain", StandardCharsets.UTF_8))
.build();
// 直接获取字符串流
Publisher<String> publisher = reactiveRequest
.response(ReactiveResponse.Content.asString());
📌 提示:
ReactiveResponse.Content.asString()
:自动解析 body 为字符串ReactiveResponse.Content.discard()
:丢弃 body,仅关注状态码
使用 Single 接收结果
String responseContent = Single.fromPublisher(publisher)
.blockingGet();
Assert.assertEquals("Hello World!", responseContent);
✅ 小结:Reactor 的 Mono
和 RxJava 的 Single
在此处完全对等。
8. 请求与响应事件监听
Reactive HttpClient 提供了丰富的生命周期事件,可用于监控、日志、性能分析等场景。
监听请求事件
ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localhost:8080/")
.content(ReactiveRequest.Content.fromString("Hello World!", "text/plain", UTF_8))
.build();
Publisher<ReactiveRequest.Event> requestEvents = request.requestEvents();
使用 RxJava 收集事件类型
List<Type> requestEventTypes = new ArrayList<>();
Flowable.fromPublisher(requestEvents)
.map(ReactiveRequest.Event::getType)
.subscribe(requestEventTypes::add);
Single<ReactiveResponse> response = Single.fromPublisher(request.response());
验证事件流程
int actualStatus = response.blockingGet().getStatus();
Assert.assertEquals(6, requestEventTypes.size()); // 通常包含 CONNECTED, REQUEST_BEGIN 等
Assert.assertEquals(HttpStatus.OK_200, actualStatus);
📌 常见事件类型(Request):
CONNECTED
REQUEST_BEGIN
REQUEST_HEADERS
REQUEST_CONTENT
REQUEST_END
RESPONSE_BEGIN
(虽属响应阶段,但归在 requestEvents 中)
⚠️ 注意:响应事件(response events)也可类似订阅,本文略过,完整代码见 GitHub。
9. 总结
本文系统介绍了 Jetty ReactiveStreams HTTP Client 的核心能力:
✅ 核心价值:
- 将传统 Jetty HttpClient 升级为响应式
- 原生支持 Reactive Streams 规范
- 无缝对接 Reactor、RxJava、WebClient 等主流框架
✅ 适用场景:
- 高并发流式数据传输
- 微服务间响应式通信
- 需要精细控制 HTTP 生命周期的系统
✅ 推荐用法:
- 日常开发优先使用
WebClient + JettyClientHttpConnector
- 底层框架开发可直接操作
ReactiveRequest
所有示例代码已托管至 GitHub:
👉 https://github.com/eugenp/tutorials/tree/master/libraries-http-2