1. 概述

本文将带你快速了解 Finagle —— Twitter 开源的高性能 RPC 框架。

我们将使用它构建一个简单的 HTTP 客户端与服务端通信示例。整个过程简洁明了,适合有一定分布式系统经验的开发者参考。✅

2. 核心概念

在动手编码前,先熟悉 Finagle 的几个关键抽象。这些概念虽然在其他框架中也常见,但在 Finagle 中有其特定语义。

2.1. Service(服务)

Service 是一个函数式组件,接收请求对象并返回一个 Future<Response>。你可以把它理解为一个“处理单元”:

  • 输入:Request
  • 输出:Future<Response>(异步结果)
  • 状态:Pending(进行中)、Succeeded(成功)、Failed(失败)

⚠️ 注意:它不是 Spring 中的 Service 层,而是更底层的请求处理器。

2.2. Filter(过滤器)

Filter 类似于 AOP 中的切面,用于在请求前后执行通用逻辑,比如日志、鉴权、监控等。

它的调用链结构如下:

Request → Filter → Service → Filter → Response

Filter 接收请求和服务实例,可对请求预处理,调用服务后还能对返回的 Future 做后处理。

✅ 实际开发中常用于:

  • 打印访问日志
  • 添加 traceId
  • 统一异常处理

2.3. Future(未来)

Future 是 Scala 中的异步编程核心,代表一个尚未完成的操作结果。Finagle 所有 I/O 操作都是非阻塞的,返回 Future

常见状态:

  • succeeded:操作成功
  • failed:操作失败
  • pending:仍在执行

与 Java 的 CompletableFuture 类似,但更轻量、更适合高并发场景。

3. 实现 Service

我们来实现一个简单的 HTTP 问候服务:接收 name 参数,返回 "Hello {name}"

需要继承 Service<Request, Response> 并重写 apply 方法:

public class GreetingService extends Service<Request, Response> {
    @Override
    public Future<Response> apply(Request request) {
        String greeting = "Hello " + request.getParam("name");
        Reader<Buf> reader = Reader.fromBuf(new Buf.ByteArray(greeting.getBytes(), 0, greeting.length()));
        return Future.value(Response.apply(request.version(), Status.Ok(), reader));
    }
}

📌 踩坑提示:

  • getParam("name") 会自动解析查询参数(如 ?name=John
  • 响应体需包装为 Reader<Buf>,不能直接传字符串
  • Future.value() 用于立即成功完成的 Future

虽然看起来像 Java 8 函数式接口,但由于 Finagle 是 Scala 编写的,我们只能通过继承方式实现。

4. 实现 Filter

接下来写一个日志过滤器,打印请求的基本信息。

使用 SimpleFilter<Request, Response> 可简化泛型声明(否则要写四个类型参数):

public class LogFilter extends SimpleFilter<Request, Response> {
    @Override
    public Future apply(Request request, Service<Request, Response> service) {
        logger.info("Request host:" + request.host().getOrElse(() -> ""));
        logger.info("Request params:");
        request.getParams().forEach(entry -> logger.info("\t" + entry.getKey() + " : " + entry.getValue()));
        return service.apply(request);
    }
}

📌 关键点:

  • request.host().getOrElse(() -> ""):Scala 风格的空值处理
  • service.apply(request):将处理交给下一个组件(通常是 Service)
  • 日志逻辑在请求前执行,响应后也可加处理(如耗时统计)

这个 Filter 可复用在客户端和服务端。

5. 启动 Server

现在把 Filter 和 Service 组装起来,启动 HTTP 服务。

使用 andThen 将过滤器和服务串联成处理链:

Service serverService = new LogFilter().andThen(new GreetingService()); 
Http.serve(":8080", serverService);

✅ 简单粗暴三步走:

  1. new LogFilter():创建日志拦截
  2. .andThen(new GreetingService()):链式拼接业务逻辑
  3. Http.serve(port, service):绑定端口启动

服务启动后监听 localhost:8080,收到请求时会先走日志 Filter,再进入 GreetingService。

6. 编写 Client

客户端同样使用 Service<Request, Response> 抽象,通过 Http.newService 创建连接池:

Service<Request, Response> clientService = new LogFilter().andThen(Http.newService("localhost:8080"));
Request request = Request.apply(Method.Get(), "/?name=John");
request.host("localhost");
Future<Response> response = clientService.apply(request);

Await.result(response
        .onSuccess(r -> {
            assertEquals("Hello John", r.getContentString());
            return BoxedUnit.UNIT;
        })
        .onFailure(r -> {
            throw new RuntimeException(r);
        })
);

📌 关键说明:

  • Http.newService("localhost:8080"):创建指向本地服务的客户端
  • request.host("localhost"):必须设置 Host 头,否则 HTTP 1.1 请求会失败
  • onSuccess / onFailure:注册回调,避免阻塞等待
  • Await.result():仅用于测试!生产环境应避免同步等待

⚠️ 注意返回值 BoxedUnit.UNIT

  • 这是 Scala 中 void 的等价物
  • 回调函数签名要求返回 Unit,Java 侧需用 BoxedUnit.UNIT 兼容

7. 总结

本文通过一个简单示例展示了 Finagle 的核心用法:

  • ✅ 使用 Service 实现业务逻辑
  • ✅ 利用 Filter 实现横切关注点(如日志)
  • ✅ 通过 andThen 构建处理链
  • ✅ 客户端与服务端统一抽象为 Service
  • ✅ 全异步非阻塞,基于 Future 编程

Finagle 的设计非常函数式且灵活,适合构建高并发微服务系统。虽然学习曲线略陡,但一旦掌握,能写出高效、可组合的网络程序。

示例代码已托管至 GitHub:https://github.com/baeldung/tutorials/tree/master/libraries-rpc


原始标题:Introduction to Finagle | Baeldung