1. 概述

本教程将探讨在 gRPC 服务器应用中如何使用拦截器处理全局异常。

拦截器可以在请求到达 RPC 方法前对其进行验证或操作。因此,它们非常适合处理应用中的通用关注点,如日志记录、安全、缓存、审计、身份验证和授权等。

应用也可以将拦截器用作全局异常处理器。

2. 拦截器作为全局异常处理器

拦截器主要能处理两类异常:

  • 处理从方法中逃逸且未被捕获的未知运行时异常
  • 处理从其他下游拦截器中逃逸的异常

拦截器有助于建立集中式异常处理框架,使应用能采用一致且健壮的标准方式处理异常。

它们可以通过多种方式处理异常:

  • 记录或持久化异常用于审计或报告
  • 创建支持工单
  • 在返回给客户端前修改或丰富错误响应

3. 全局异常处理器的高层设计

拦截器可以将传入请求转发给目标 RPC 服务。但当目标 RPC 方法抛出异常时,它能捕获并妥善处理

假设有一个订单处理微服务。我们将通过拦截器开发一个全局异常处理器,用于捕获微服务中 RPC 方法逃逸的异常。此外,该拦截器还能捕获从其他下游拦截器逃逸的异常,然后调用工单服务在工单系统中创建工单,最后将响应返回给客户端。

让我们看看请求在 RPC 接口失败时的执行路径:

interceptor sequence

类似地,当请求在日志拦截器中失败时的执行路径:

interceptor sequence failed at log

首先,我们在 protobuf 文件 order_processing.proto 中定义订单处理服务的基础类:

syntax = "proto3";

package orderprocessing;

option java_multiple_files = true;
option java_package = "com.baeldung.grpc.orderprocessing";

message OrderRequest {
  string product = 1;
  int32 quantity = 2;
  float price = 3;
}
message OrderResponse {
  string response = 1;
  string orderID = 2;
  string error = 3;
}
service OrderProcessor {
  rpc createOrder(OrderRequest) returns (OrderResponse){}
}

*order_processing.proto* 文件定义了包含远程方法 *createOrder()* 的 *OrderProcessor* 服务,以及两个 DTO *OrderRequest* 和 *OrderResponse*。

让我们看看后续将要实现的主要类:

order processing cld

之后,我们可以使用 order_processing.proto 文件生成实现 OrderProcessorImplGlobalExeptionInterceptor 所需的 Java 源代码。Maven 插件会生成 OrderRequestOrderResponseOrderProcessorGrpc 类。

我们将在实现部分详细讨论这些类。

4. 实现

我们将实现一个能处理各类异常的拦截器。这些异常可能是由于业务逻辑失败显式抛出的,也可能是不可预见的错误导致的。

4.1 实现全局异常处理器

gRPC 应用中的拦截器必须实现 ServerInterceptor 接口的 interceptCall() 方法

public class GlobalExceptionInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata headers,
        ServerCallHandler<ReqT, RespT> next) {
        ServerCall.Listener<ReqT> delegate = null;
        try {
            delegate = next.startCall(serverCall, headers);
        } catch(Exception ex) {
            return handleInterceptorException(ex, serverCall);
        }
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate) {
            @Override
            public void onHalfClose() {
                try {
                    super.onHalfClose();
                } catch (Exception ex) {
                    handleEndpointException(ex, serverCall);
                }
            }
        };
    }

    private static <ReqT, RespT> void handleEndpointException(Exception ex, ServerCall<ReqT, RespT> serverCall) {
        String ticket = new TicketService().createTicket(ex.getMessage());
        serverCall.close(Status.INTERNAL
            .withCause(ex)
            .withDescription(ex.getMessage() + ", Ticket raised:" + ticket), new Metadata());
    }

    private <ReqT, RespT> ServerCall.Listener<ReqT> handleInterceptorException(Throwable t, ServerCall<ReqT, RespT> serverCall) {
        String ticket = new TicketService().createTicket(t.getMessage());
        serverCall.close(Status.INTERNAL
            .withCause(t)
            .withDescription("An exception occurred in a **subsequent** interceptor:" + ", Ticket raised:" + ticket), new Metadata());

        return new ServerCall.Listener<ReqT>() {
            // no-op
        };
    }
}

interceptCall() 方法接收三个输入参数:

该方法包含两个 trycatch 块。第一个块处理从任何下游拦截器抛出的未捕获异常。 在 catch 块中,我们调用 handleInterceptorException() 方法为异常创建工单。最后返回一个 ServerCall.Listener 对象作为回调方法。

类似地,第二个 trycatch 块处理从 RPC 接口抛出的未捕获异常。 interceptCall() 方法返回的 ServerCall.Listener 作为传入 RPC 消息的回调。具体返回的是 ForwardingServerCallListener.SimpleForwardingServerCallListener 的实例,它是 ServerCall.Listener 的子类。

为处理下游方法抛出的异常,我们重写了 ForwardingServerCallListener.SimpleForwardingServerCallListener 类中的 onHalfClose() 方法。该方法在客户端完成消息发送后触发。

在此方法中,super.onHalfClose() 将请求转发给 OrderProcessorImpl 类中的 RPC 接口 *createOrder()*。如果接口中存在未捕获异常,我们会捕获异常并调用 handleEndpointException() 创建工单。最后调用 serverCall 对象的 close() 方法关闭服务器调用,并将响应返回给客户端。

4.2 注册全局异常处理器

我们在启动时创建 io.grpc.Server 对象时注册拦截器:

public class OrderProcessingServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = ServerBuilder.forPort(8080)
          .addService(new OrderProcessorImpl())
          .intercept(new LogInterceptor())
          .intercept(new GlobalExceptionInterceptor())
          .build();
        server.start();
        server.awaitTermination();
    }
}

我们将 GlobalExceptionInterceptor 对象传递给 io.grpc.ServerBuilder 类的 intercept() 方法。 这确保了对 OrderProcessorImpl 服务的任何 RPC 调用都会经过 GlobalExceptionInterceptor。类似地,我们调用 addService() 方法注册 OrderProcessorImpl 服务。最后调用 Server 对象的 start() 方法启动服务器应用。

4.3 处理接口中的未捕获异常

为演示异常处理器的功能,我们先看看 OrderProcessorImpl 类:

public class OrderProcessorImpl extends OrderProcessorGrpc.OrderProcessorImplBase {
    @Override
    public void createOrder(OrderRequest request, StreamObserver<OrderResponse> responseObserver) {
        if (!validateOrder(request)) {
             throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription("Order Validation failed"));
        } else {
            OrderResponse orderResponse = processOrder(request);

            responseObserver.onNext(orderResponse);
            responseObserver.onCompleted();
        }
    }

    private Boolean validateOrder(OrderRequest request) {
        int tax = 100/0;
        return false;
    }

    private OrderResponse processOrder(OrderRequest request) {
        return OrderResponse.newBuilder()
          .setOrderID("ORD-5566")
          .setResponse("Order placed successfully")
          .build();
    }
}

RPC 方法 createOrder() 首先验证订单,然后调用 processOrder() 方法处理订单。在 validateOrder() 方法中,我们故意通过除以零触发运行时异常。

现在运行服务,观察其如何处理异常:

@Test
void whenRuntimeExceptionInRPCEndpoint_thenHandleException() {
    OrderRequest orderRequest = OrderRequest.newBuilder()
      .setProduct("PRD-7788")
      .setQuantity(1)
      .setPrice(5000)
      .build();

    try {
        OrderResponse response = orderProcessorBlockingStub.createOrder(orderRequest);
    } catch (StatusRuntimeException ex) {
        assertTrue(ex.getStatus()
          .getDescription()
          .contains("Ticket raised:TKT"));
    }
}

我们创建 OrderRequest 对象并传递给客户端存根的 createOrder() 方法。如预期,服务抛出异常。检查异常描述时,发现其中嵌入了工单信息。这证明 GlobalExceptionInterceptor 成功完成了任务。

对于流式场景同样有效。

4.4 处理拦截器中的未捕获异常

假设在 GlobalExceptionInterceptor 之后还有另一个拦截器 LogInterceptor,它记录所有传入请求用于审计。让我们看看它的实现:

public class LogInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
        ServerCallHandler<ReqT, RespT> next) {
        logMessage(serverCall);
        ServerCall.Listener<ReqT> delegate = next.startCall(serverCall, metadata);
        return delegate;
    }

    private <ReqT, RespT> void logMessage(ServerCall<ReqT, RespT> call) {
        int result = 100/0;
    }
}

LogInterceptor 中,interceptCall() 方法在将请求转发给 RPC 接口前调用 logMessage() 记录消息。*logMessage()* 方法故意执行除零操作以触发运行时异常,用于演示 *GlobalExceptionInterceptor* 的能力。

运行服务,观察其如何处理 LogInterceptor 抛出的异常:

@Test
void whenRuntimeExceptionInLogInterceptor_thenHandleException() {
    OrderRequest orderRequest = OrderRequest.newBuilder()
        .setProduct("PRD-7788")
        .setQuantity(1)
        .setPrice(5000)
        .build();

    try {
        OrderResponse response = orderProcessorBlockingStub.createOrder(orderRequest);
    } catch (StatusRuntimeException ex) {
        assertTrue(ex.getStatus()
            .getDescription()
            .contains("An exception occurred in a **subsequent** interceptor:, Ticket raised:TKT"));
    }
    logger.info("order processing over");
}

首先调用客户端存根的 createOrder() 方法。这次 GlobalExceptionInterceptor 在第一个 trycatch 块中捕获了从 LogInterceptor 逃逸的异常。随后客户端收到异常,描述中嵌入了工单信息。

5. 结论

本文探讨了 gRPC 框架中拦截器作为全局异常处理器的作用。它们是处理异常相关通用关注点的绝佳工具,如日志记录、创建工单、丰富错误响应等。

本文使用的代码可在 GitHub 上找到。


原始标题:Add Global Exception Interceptor in gRPC Server | Baeldung