1. 引言

Ratpack是基于Netty引擎构建的框架,能快速构建HTTP应用。我们之前的文章已经介绍了其基础用法,本文将重点展示如何利用其流式API实现反应式应用

2. 反应式流快速回顾

进入实战前,先快速回顾反应式应用的核心特征(根据原始定义):

  • 响应式(Responsive)
  • 弹性(Resilient)
  • 可伸缩(Elastic)
  • 消息驱动(Message Driven)

反应式流如何帮助实现这些特性?这里的"消息驱动"不一定指消息中间件,核心是异步请求处理和非阻塞背压支持

Ratpack的反应式支持基于JVM的Reactive Streams API标准实现,因此可与Project Reactor、RxJava等框架互操作

3. 使用Ratpack的Streams

Ratpack的*Streams类提供多种工具方法创建Publisher*实例,用于构建数据处理管道

publish()方法开始,它能将任何Iterable转换为Publisher

Publisher<String> pub = Streams.publish(Arrays.asList("hello", "hello again"));
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();

LoggingSubscriber是测试用的Subscriber实现,会记录每个发布对象。其block()方法会阻塞调用者直到发布完成或出错。运行结果:

onSubscribe: sub=7311908
onNext: sub=7311908, value=hello
onNext: sub=7311908, value=hello again
onComplete: sub=7311908

另一个实用方法是yield(),通过Function参数生成下一个对象:

@Test
public void whenYield_thenSuccess() {
    
    Publisher<String> pub = Streams.yield((t) -> {
        return t.getRequestNum() < 5 ? "hello" : null;
    });
    
    LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
    pub.subscribe(sub);
    sub.block();
    assertEquals(5, sub.getReceived());
}

YieldRequestgetRequestNum()提供已发布对象计数,可用于终止流(返回null)。创建周期性事件发布器:

@Test
public void whenPeriodic_thenSuccess() {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    Publisher<String> pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> {
        return t < 5 ? String.format("hello %d",t): null; 
    });

    LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
    pub.subscribe(sub);
    sub.block();
    assertEquals(5, sub.getReceived());
}

返回的发布器使用ScheduledExecutorService周期调用生产函数,直到返回null。生产函数接收已发布对象计数用于终止流。

4. 使用TransformablePublisher

细看Streams方法会发现它们大多返回TransformablePublisher。此接口扩展了Publisher,提供多种实用方法(类似Project Reactor的Flux/Mono),便于构建复杂处理管道

示例:用map将整数序列转为字符串:

@Test
public void whenMap_thenSuccess() throws Exception {
    TransformablePublisher<String> pub = Streams.yield( t -> {
        return t.getRequestNum() < 5 ? t.getRequestNum() : null;
      })
      .map(v -> String.format("item %d", v));
    
    ExecResult<List<String>> result = ExecHarness.yieldSingle((c) -> pub.toList());
    assertTrue("should succeed", result.isSuccess());
    assertEquals("should have 5 items",5,result.getValue().size());
}

实际执行在测试工具类ExecHarness管理的线程池中进行。因yieldSingle()需要*Promise*,我们用toList()适配——此方法收集所有结果到List

注意:文档警告此方法可能耗尽内存!建议仅用于单元测试

map()外,TransformablePublisher还提供:

  • filter():按Predicate过滤上游对象
  • take():仅取前n个对象
  • wiretap():添加观测点检查流经数据
  • reduce():将上游对象归约为单值
  • transform():注入常规Publisher

5. 使用buffer()处理非规范Publisher

某些场景需处理无视请求数量、过度发送数据的Publisher。Ratpack的buffer()方法可将多余数据暂存内存直到被消费。

创建非规范Publisher(忽略请求数,始终多发送5个):

private class NonCompliantPublisher implements Publisher<Integer> {

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        log.info("subscribe");
        subscriber.onSubscribe(new NonCompliantSubscription(subscriber));
    }
    
    private class NonCompliantSubscription implements Subscription {
        private Subscriber<? super Integer> subscriber;
        private int recurseLevel = 0;

        public NonCompliantSubscription(Subscriber<? super Integer> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            log.info("request: n={}", n);
            if ( recurseLevel > 0 ) {
               return;
            }
            recurseLevel++;
            for (int i = 0 ; i < (n + 5) ; i ++ ) {
                subscriber.onNext(i);
            }
            subscriber.onComplete();
        }

        @Override
        public void cancel() {
        }
    }
}

先用LoggingSubscriber测试,用take()限制只取1项:

@Test
public void whenNonCompliantPublisherWithoutBuffer_thenSuccess() throws Exception {
    TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
      .wiretap(new LoggingAction(""))
      .take(1);
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

运行发现即使收到cancel(),非规范发布器仍持续发送数据

RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=583189145, value=0
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - : event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... more expurious data event
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=583189145

添加buffer()并增加日志观察效果:

@Test
public void whenNonCompliantPublisherWithBuffer_thenSuccess() throws Exception {
    TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
      .wiretap(new LoggingAction("before buffer"))
      .buffer()
      .wiretap(new LoggingAction("after buffer"))
      .take(1);
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

日志显示非规范发布器发送了所有数据,但下游仍按需逐项发送

LoggingSubscriber - onSubscribe: sub=675852144
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - subscribe
RatpackStreamsUnitTest - before buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
NonCompliantPublisher - request: n=1
RatpackStreamsUnitTest - before buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... more data events
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=675852144, value=0
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
RatpackStreamsUnitTest - before buffer: event=StreamEvent[CancelEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=67585214

6. 使用*batch()*优化慢速订阅者

下游订阅者请求数据量过小(如LoggingSubscriber每次只请求1项)会降低吞吐量。实际应用中这会导致大量上下文切换,影响性能batch()方法允许上游使用更高效的请求大小,同时下游保持小请求量。

先看无batch的情况:

@Test
public void whenCompliantPublisherWithoutBatch_thenSuccess() throws Exception {
    TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
      .wiretap(new LoggingAction(""));
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

CompliantPublisher是测试发布器,生成0到n-1的整数。日志显示逐项发送

CompliantPublisher - subscribe
LoggingSubscriber - onSubscribe: sub=-779393331
RatpackStreamsUnitTest - : event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - request: requested=1, available=10
RatpackStreamsUnitTest - : event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-779393331, value=0
... more data events omitted
CompliantPublisher - request: requested=1, available=1
RatpackStreamsUnitTest - : event=StreamEvent[CompletionEvent{subscriptionId=0}]
LoggingSubscriber - onComplete: sub=-779393331

添加batch()使上游每次发送5项:

@Test
public void whenCompliantPublisherWithBatch_thenSuccess() throws Exception {
    
    TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
      .wiretap(new LoggingAction("before batch"))
      .batch(5, Action.noop())
      .wiretap(new LoggingAction("after batch"));
      
    LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
    pub.subscribe(sub);
    sub.block();
}

batch()接收两个参数:每次请求的项数和处理丢弃项的Action(如错误或取消时)。日志显示上游现在每次收到5项请求

LoggingSubscriber - onSubscribe: sub=-1936924690
RatpackStreamsUnitTest - after batch: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
CompliantPublisher - subscribe
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=10
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
... first batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=6
RatpackStreamsUnitTest - before batch: event=StreamEvent[DataEvent{subscriptionId=0, data=5}]
... second batch data events omitted
RatpackStreamsUnitTest - before batch: event=StreamEvent[RequestEvent{requestAmount=5, subscriptionId=0}]
CompliantPublisher - request: requested=5, available=1
RatpackStreamsUnitTest - before batch: event=StreamEvent[CompletionEvent{subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=0}]
LoggingSubscriber - onNext: sub=-1936924690, value=0
RatpackStreamsUnitTest - after buffer: event=StreamEvent[RequestEvent{requestAmount=1, subscriptionId=0}]
RatpackStreamsUnitTest - after buffer: event=StreamEvent[DataEvent{subscriptionId=0, data=1}]
... downstream data events omitted
LoggingSubscriber - onComplete: sub=-1936924690

注意:因单线程执行,batch()会持续缓冲直到收到onComplete()信号。

7. 在Web应用中使用流

Ratpack支持将反应式流与异步Web框架结合。

7.1 接收数据流

通过处理器的Context获取的Request对象提供getBodyStream(),返回ByteBufTransformablePublisher。可基于此构建处理管道:

@Bean
public Action<Chain> uploadFile() {
    
    return chain -> chain.post("upload", ctx -> {
        TransformablePublisher<? extends ByteBuf> pub = ctx.getRequest().getBodyStream();
        pub.subscribe(new Subscriber<ByteBuf>() {
            private Subscription sub;
            @Override
            public void onSubscribe(Subscription sub) {
                this.sub = sub;
                sub.request(1);
            }

            @Override
            public void onNext(ByteBuf t) {
                try {
                    ... do something useful with received data
                    sub.request(1);
                }
                finally {
                    // 切记释放!
                    t.release();
                }
            }

            @Override
            public void onError(Throwable t) {
                ctx.getResponse().status(500);
            }

            @Override
            public void onComplete() {
                ctx.getResponse().status(202);
            }
        }); 
    });
}

实现订阅者时注意两点:

  1. 必须调用ByteBuf.release()否则会内存泄漏
  2. 异步处理只能用Ratpack原语(如PromiseBlocking等)

7.2 发送数据流

最直接的方式是Response.sendStream(),接收ByteBuf发布器并发送数据,自动应用背压防溢出:

@Bean
public Action<Chain> download() {
    return chain -> chain.get("download", ctx -> {
        ctx.getResponse().sendStream(new RandomBytesPublisher(1024,512));
    });
}

缺点:不会自动设置任何头部(包括Content-Length),可能影响客户端:

$ curl -v --output data.bin http://localhost:5050/download
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
... download progress messages omitted

推荐方式:用处理器的Context.render()传递ResponseChunks对象。此时响应使用分块传输编码。创建ResponseChunks的简单方法:

@Bean
public Action<Chain> downloadChunks() {
    return chain -> chain.get("downloadChunks", ctx -> {
        ctx.render(ResponseChunks.bufferChunks("application/octetstream",
          new RandomBytesPublisher(1024,512)));
    });
}

现在响应包含content-type头部:

$ curl -v --output data.bin http://localhost:5050/downloadChunks
... request messages omitted
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< content-type: application/octetstream
<
... progress messages omitted

7.3 使用服务器推送事件(SSE)

SSE支持也通过render()实现,使用ServerSentEventsProducer的项适配为带元数据的Event对象:

@Bean
public Action<Chain> quotes() {
    ServerSentEvents sse = ServerSentEvents.serverSentEvents(quotesService.newTicker(), (evt) -> {
        evt
          .id(Long.toString(idSeq.incrementAndGet()))
          .event("quote")
          .data( q -> q.toString());
    });
    
    return chain -> chain.get("quotes", ctx -> ctx.render(sse));
}

QuotesService是示例服务,定期生成随机报价。第二个参数是事件处理函数,添加id、事件类型和载荷。用curl测试:

$ curl -v http://localhost:5050/quotes
... request messages omitted
< HTTP/1.1 200 OK
< content-type: text/event-stream;charset=UTF-8
< transfer-encoding: chunked
... other response headers omitted
id: 10
event: quote
data: Quote [ts=2021-10-11T01:20:52.081Z, symbol=ORCL, value=53.0]

... more quotes

7.4 广播WebSocket数据

可用Websockets.websocketBroadcast()将任意Publisher数据管道输出到WebSocket

@Bean
public Action<Chain> quotesWS() {
    Publisher<String> pub = Streams.transformable(quotesService.newTicker())
      .map(Quote::toString);
    return chain -> chain.get("quotes-ws", ctx -> WebSockets.websocketBroadcast(ctx, pub));
}

复用之前的QuotesService作为事件源。用curl模拟WebSocket客户端:

$ curl --include -v \
     --no-buffer \
     --header "Connection: Upgrade" \
     --header "Upgrade: websocket" \
     --header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" \
     --header "Sec-WebSocket-Version: 13" \
     http://localhost:5050/quotes-ws
... request messages omitted
< HTTP/1.1 101 Switching Protocols
HTTP/1.1 101 Switching Protocols
< upgrade: websocket
upgrade: websocket
< connection: upgrade
connection: upgrade
< sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=
sec-websocket-accept: qGEgH3En71di5rrssAZTmtRTyFk=

<
<Quote [ts=2021-10-11T01:39:42.915Z, symbol=ORCL, value=63.0]
... more quotes omitted

8. 总结

本文探讨了Ratpack对反应式流的支持及其在不同场景的应用。完整示例代码见GitHub


原始标题:Reactive Streams API with Ratpack