1. 概述

本文将深入介绍如何使用 Jetty 提供的 Reactive HTTP 客户端。我们将通过多个小示例,演示它如何与主流响应式编程库(如 Project Reactor、RxJava 等)协同工作。

✅ 目标读者:已有响应式编程经验的 Java 开发者
⚠️ 注意:本文不讲解响应式基础概念,如 Publisher、Subscriber、背压等。如果你对这些还不熟悉,建议先阅读 Reactor 或 RxJava 入门文章。


2. 什么是 Reactive HttpClient?

Jetty 原生的 HttpClient 支持阻塞式 HTTP 请求。但在响应式系统中,我们需要非阻塞、流式处理的能力。为此,Jetty 提供了一个基于 ReactiveStreams 规范的封装层 —— Reactive HttpClient

简单来说:

✅ Reactive HttpClient 的核心用途是:通过 HTTP 协议消费或生产数据流

本文示例将构建一个响应式客户端,与 Jetty 服务端通信,并展示不同响应式库下的使用方式。同时也会介绍请求/响应过程中的生命周期事件,便于调试和监控。

📌 建议提前了解:


3. Maven 依赖

首先在 pom.xml 中引入关键依赖。注意版本兼容性,避免踩坑。

<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-reactive-httpclient</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-server</artifactId>
    <version>9.4.19.v20190610</version>
</dependency>
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.6.0</version>
</dependency>
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.11</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webflux</artifactId>
    <version>5.1.9.RELEASE</version>
</dependency>

📌 说明:

  • jetty-reactive-httpclient 是核心包
  • jetty-server 用于启动本地测试服务
  • 其余为响应式生态常用库

4. 构建服务端与客户端

先搭建一个简单的 Echo 服务端,用于接收请求并原样返回 body。

服务端代码

public class RequestHandler extends AbstractHandler {
    @Override
    public void handle(String target, Request jettyRequest, HttpServletRequest request,
                      HttpServletResponse response) throws IOException, ServletException {
        jettyRequest.setHandled(true);
        response.setContentType(request.getContentType());
        IO.copy(request.getInputStream(), response.getOutputStream());
    }
}

// 启动服务
Server server = new Server(8080);
server.setHandler(new RequestHandler());
server.start();

客户端初始化

HttpClient httpClient = new HttpClient();
httpClient.start();

转换为响应式请求

关键一步:使用 ReactiveRequest.newBuilder() 包装原始请求,使其支持响应式流。

Request request = httpClient.newRequest("http://localhost:8080/");
ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request).build();
Publisher<ReactiveResponse> publisher = reactiveRequest.response();

✅ 到此为止,原本阻塞的 HttpClient 已被升级为响应式客户端。接下来可以对接各种响应式库。


5. 使用 Reactive Streams 原生接口

Jetty 的 Reactive Client 原生支持 Reactive Streams 规范。我们先用最基础的方式测试。

自定义阻塞 Subscriber

public class BlockingSubscriber implements Subscriber<ReactiveResponse> {
    BlockingQueue<ReactiveResponse> sink = new LinkedBlockingQueue<>(1);

    @Override
    public void onSubscribe(Subscription subscription) { 
        subscription.request(1); // ⚠️ 必须调用 request() 才能触发数据流
    }
  
    @Override 
    public void onNext(ReactiveResponse response) { 
        sink.offer(response);
    } 
   
    @Override 
    public void onError(Throwable failure) { } 

    @Override 
    public void onComplete() { }

    public ReactiveResponse block() throws InterruptedException {
        return sink.poll(5, TimeUnit.SECONDS); // 超时保护
    }   
}

📌 踩坑提醒:

  • 不调用 subscription.request() → 永远不会收到 onNext
  • 队列大小设为 1 是因为预期只接收一个响应

执行测试

BlockingSubscriber subscriber = new BlockingSubscriber();
publisher.subscribe(subscriber);
ReactiveResponse response = subscriber.block();

Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6. 与 Project Reactor 集成

Project Reactor 是 Spring WebFlux 的底层引擎,集成非常自然。

使用 Mono 接收响应

ReactiveResponse response = Mono.from(publisher).block();

断言结果

Assert.assertNotNull(response);
Assert.assertEquals(response.getStatus(), HttpStatus.OK_200);

6.1 结合 Spring WebFlux 的 WebClient

更优雅的方式是使用 WebClient,它支持插拔式 HTTP 客户端实现。

步骤一:绑定 Jetty 客户端

ClientHttpConnector clientConnector = new JettyClientHttpConnector(httpClient);

步骤二:构建 WebClient

WebClient client = WebClient.builder()
    .clientConnector(clientConnector)
    .build();

步骤三:发起响应式请求

String responseContent = client.post()
    .uri("http://localhost:8080/")
    .contentType(MediaType.TEXT_PLAIN)
    .body(BodyInserters.fromPublisher(Mono.just("Hello World!"), String.class))
    .retrieve()
    .bodyToMono(String.class)
    .block();

Assert.assertNotNull(responseContent);
Assert.assertEquals("Hello World!", responseContent);

✅ 优势:

  • API 更简洁
  • 与 Spring 生态无缝集成
  • 支持函数式编程风格

7. 与 RxJava2 集成

RxJava2 同样基于 Reactive Streams,集成方式几乎一致。

构建带 body 的请求

ReactiveRequest reactiveRequest = ReactiveRequest.newBuilder(request)
    .content(ReactiveRequest.Content
        .fromString("Hello World!", "text/plain", StandardCharsets.UTF_8))
    .build();

// 直接获取字符串流
Publisher<String> publisher = reactiveRequest
    .response(ReactiveResponse.Content.asString());

📌 提示:

  • ReactiveResponse.Content.asString():自动解析 body 为字符串
  • ReactiveResponse.Content.discard():丢弃 body,仅关注状态码

使用 Single 接收结果

String responseContent = Single.fromPublisher(publisher)
    .blockingGet();

Assert.assertEquals("Hello World!", responseContent);

✅ 小结:Reactor 的 Mono 和 RxJava 的 Single 在此处完全对等。


8. 请求与响应事件监听

Reactive HttpClient 提供了丰富的生命周期事件,可用于监控、日志、性能分析等场景。

监听请求事件

ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, "http://localhost:8080/")
    .content(ReactiveRequest.Content.fromString("Hello World!", "text/plain", UTF_8))
    .build();

Publisher<ReactiveRequest.Event> requestEvents = request.requestEvents();

使用 RxJava 收集事件类型

List<Type> requestEventTypes = new ArrayList<>();

Flowable.fromPublisher(requestEvents)
    .map(ReactiveRequest.Event::getType)
    .subscribe(requestEventTypes::add);

Single<ReactiveResponse> response = Single.fromPublisher(request.response());

验证事件流程

int actualStatus = response.blockingGet().getStatus();

Assert.assertEquals(6, requestEventTypes.size()); // 通常包含 CONNECTED, REQUEST_BEGIN 等
Assert.assertEquals(HttpStatus.OK_200, actualStatus);

📌 常见事件类型(Request):

  • CONNECTED
  • REQUEST_BEGIN
  • REQUEST_HEADERS
  • REQUEST_CONTENT
  • REQUEST_END
  • RESPONSE_BEGIN(虽属响应阶段,但归在 requestEvents 中)

⚠️ 注意:响应事件(response events)也可类似订阅,本文略过,完整代码见 GitHub。


9. 总结

本文系统介绍了 Jetty ReactiveStreams HTTP Client 的核心能力:

✅ 核心价值:

  • 将传统 Jetty HttpClient 升级为响应式
  • 原生支持 Reactive Streams 规范
  • 无缝对接 Reactor、RxJava、WebClient 等主流框架

✅ 适用场景:

  • 高并发流式数据传输
  • 微服务间响应式通信
  • 需要精细控制 HTTP 生命周期的系统

✅ 推荐用法:

  • 日常开发优先使用 WebClient + JettyClientHttpConnector
  • 底层框架开发可直接操作 ReactiveRequest

所有示例代码已托管至 GitHub:

👉 https://github.com/eugenp/tutorials/tree/master/libraries-http-2


原始标题:Jetty ReactiveStreams HTTP Client