1. 引言

本文将使用 Netty 构建一个聊天室应用。在网络编程领域,Netty 作为强大的框架,极大地简化了异步 I/O 操作的复杂性。我们将探索如何构建一个基础聊天服务器,支持多客户端连接并实现实时通信。

2. 场景设计

服务器将接收的消息广播给所有连接的客户端,同时保留最近几条消息的历史记录,使新加入的客户端能快速了解当前对话上下文。实现这个功能只需要几个关键的事件处理器来维护通道间通信

Netty 聊天室客户端与服务器通信示意图

在 Netty 中,通信通过通道(Channel)完成,它抽象了任何协议上的异步 I/O 操作。这让我们能专注于业务逻辑而非底层网络代码。 我们的应用将通过命令行运行,包含服务器和客户端两个部分。

3. 创建自定义事件处理器

为处理通道间通信,我们将实现 SimpleChannelInboundHandler<String>,这是 ChannelInboundHandlerAdapter泛型实现。该适配器允许我们只关注需要处理的事件,本例中是 channelRead0()——当服务器收到消息时触发。我们选择这个实现是因为只需处理字符串消息,能简化开发流程。

3.1. 客户端事件处理器

先从客户端消息处理器开始,它将服务器接收到的所有消息原样输出到控制台

public class ClientEventHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        System.out.println(msg);
    }
}

稍后我们将通过直接写入通道来处理消息发送。

3.2. 消息对象

在处理服务器事件前,先创建一个 POJO 表示发送到服务器的每条消息。我们将记录发送时间、用户名和消息内容:

public class Message {

    private final Instant time;
    private final String user;
    private final String message;

    public Message(String user, String message) {
        this.time = Instant.now();
        this.user = user;
        this.message = message;
    }

    // 标准getter方法...
}

接着添加辅助方法,首先是服务器发送消息时的控制台显示格式

@Override
public String toString() {
    return time + " - " + user + ": " + message;
}

然后为客户端接收的消息解析实现 CSV 格式解析(稍后创建客户端时会看到这种格式):

public static Message parse(String string) {
    String[] arr = string.split(";", 2);
    return new Message(arr[0], arr[1]);
}

⚠️ 限制分割次数为 2 很重要,因为消息内容可能包含分号。

3.3. 服务器事件处理器

在服务器事件处理器中,先创建辅助方法供其他重写事件使用。同时需要维护连接客户端的映射和最多保存 MAX_HISTORY 条消息的队列

public class ServerEventHandler extends SimpleChannelInboundHandler<String> {

    static final Map<String, Channel> clients = new HashMap<>();
    static final Queue<String> history = new LinkedList<>();
    static final int MAX_HISTORY = 5;

    private void handleBroadcast(Message message, ChannelHandlerContext context) {
        String channelId = context.channel()
          .id()
          .asShortText();
        
        clients.forEach((id, channel) -> {
            if (!id.equals(channelId))
                channel.writeAndFlush(message.toString());
        });

        // 历史记录控制代码...
    }

    // ...
}

首先获取通道 ID 作为映射键。广播时,向除发送者外的所有客户端转发消息。

注意 writeAndFlush() 接收 Object 参数。由于我们的处理器只能处理字符串,必须调用 toString() 确保客户端正确接收。

最后进行历史记录控制:每次添加新消息时,若超过 MAX_HISTORY 条则移除最旧的消息:

history.add(message.toString());
if (history.size() > MAX_HISTORY)
    history.poll();

现在重写 channelRead0() 解析客户端消息:

@Override
public void channelRead0(ChannelHandlerContext context, String msg) {
    handleBroadcast(Message.parse(msg), context);
}

当新客户端上线时,将其加入 clients 列表,发送历史消息提供上下文,并广播系统通知

@Override
public void channelActive(final ChannelHandlerContext context) {
    Channel channel = context.channel();
    clients.put(channel.id().asShortText(), channel);

    history.forEach(channel::writeAndFlush);

    handleBroadcast(new Message("system", "client online"), context);
}

最后重写 channelInactive() 处理客户端下线事件。只需从列表移除客户端并广播系统通知

@Override
public void channelInactive(ChannelHandlerContext context) {
    Channel channel = context.channel();
    clients.remove(channel.id().asShortText());

    handleBroadcast(new Message("system", "client offline"), context);
}

4. 服务器引导应用

处理器本身无法独立运行,需要引导应用来启动它,这是一个通用模板。

4.1. 在 ChannelPipeline 中注册自定义组件

准备引导时,选择通道实现并创建子处理器(child handler)来处理通道请求

bootstrap.group(serverGroup, clientGroup)
  .channel(NioServerSocketChannel.class)
  .childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel channel) {
          channel.pipeline()
            .addFirst(
              new StringDecoder(),
              new ServerEventHandler()
              new StringEncoder());
      }
  });

在子处理器中定义处理管道。由于我们只处理字符串消息,使用内置的字符串编解码器,省去了手动编解码字节缓冲区的麻烦。

最后注意顺序:添加解码器、ServerEventHandler 和编码器。因为事件在管道中从入站(inbound)流向出站(outbound)。

通过绑定主机/端口完成应用,返回 ChannelFuture。我们用它等待异步套接字关闭(通过 sync()):

ChannelFuture future = bootstrap.bind(HOST, PORT).sync();
System.out.println("server started. accepting clients.");
future.channel().closeFuture().sync();

5. 客户端引导应用

客户端应用遵循引导的通用模板。关键是调用 handler() 时使用我们的 ClientEventHandler

channel.pipeline().addFirst(
  new StringDecoder(), 
  new ClientEventHandler(), 
  new StringEncoder());

5.1. 处理消息输入

连接服务器后,使用Scanner循环处理用户输入:先获取用户名,再持续接收消息直到输入 "exit"。必须使用 writeAndFlush() 发送消息,格式需符合 Message.parse() 的要求:

private static void messageLoop(Scanner scanner, Channel channel) {
    while (user.isEmpty()) {
        System.out.print("your name: ");
        user = scanner.nextLine();
    }

    while (scanner.hasNext()) {
        System.out.print("> ");
        String message = scanner.nextLine();
        if (message.equals("exit"))
            break;

        channel.writeAndFlush(user + ";" + message);
    }
}

6. 创建自定义事件监听器

在 Netty 中,事件监听器对处理通道生命周期中的异步事件至关重要。事件监听器本质是回调机制,用于响应返回 ChannelFuture 的操作完成事件。

我们通过实现 ChannelFutureListener 接口定义操作完成时的自定义行为。ChannelFuture 表示异步操作(如连接尝试或 I/O 操作)的结果。

ChannelFutureListener 提供了 CLOSE_ON_FAILUREFIRE_EXCEPTION_ON_FAILURE 等默认实现。但这里我们实现 GenericFutureListener 用于操作确认。

我们保存自定义事件名作为上下文,检查操作是否成功完成。失败时标记状态为 "FAILED" 并记录错误

public class ChannelInfoListener implements GenericFutureListener<ChannelFuture> {

    private final String event;

    public ChannelInfoListener(String event) {
        this.event = event;
    }

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Channel channel = future.channel();
        String status = "OK";

        if (!future.isSuccess()) {
            status = "FAILED";
            future.cause().printStackTrace();
        }

        System.out.printf(
          "%s - channel#%s %s: %s%n", Instant.now(), channel.id().asShortText(), status, event);
    }
}

6.1. 事件回执

回到代码中添加监听器。首先为客户端添加"连接服务器"确认

future.addListener(new ChannelInfoListener("connected to server"));

然后在消息循环中添加"消息发送"确认

ChannelFuture sent = channel.writeAndFlush(user + ";" + message);
sent.addListener(new ChannelInfoListener("message sent"));

这能确保发送消息时仍连接到服务器。最后在服务器处理器的广播过程中添加"消息转发"确认

clients.forEach((id, channel) -> {
    if (!id.equals(channelId)) {
        ChannelFuture relay = channel.writeAndFlush(message.toString());
        relay.addListener(new ChannelInfoListener("message relayed to " + id));
    }
});

7. 实际运行效果

Netty 允许使用EmbeddedChannel测试管道,但这里我们展示终端运行的客户端/服务器交互效果。启动服务器(省略包名):

$ mvn exec:java -Dexec.mainClass=ChatServerMain
chat server started. ready to accept clients.

启动第一个客户端,输入用户名并发送两条消息:

$ mvn exec:java -Dexec.mainClass=ChatClientMain
2024-01-12 3:47:02 - channel#03c40ad4 OK: connected to server
your name: Bob
> Hello
2024-01-12 3:47:02 - channel#03c40ad4 OK: message sent
> Anyone there?!
2024-01-12 3:47:03 - channel#03c40ad4 OK: message sent

当第二个客户端连接时,会在输入用户名前收到历史消息

$ mvn exec:java -Dexec.mainClass=ChatClientMain
2024-01-12 3:49:33 - channel#daa64476 OK: connected to server
2024-01-12 3:46:55 - system: client online: 03c40ad4
2024-01-12 3:47:03 - Bob: Hello
2024-01-12 3:48:40 - Bob: Anyone there?!

输入用户名并发送消息后:

your name: Alice
> Hi, Bob!
2024-01-12 3:51:05 - channel#daa64476 OK: message sent

第一个客户端会收到:

2024-01-12 3:49:33 - system: client online: daa64476
2024-01-12 3:51:05 - Alice: Hi, Bob!

8. 总结

本文成功使用 Netty 构建了功能完整的聊天服务器,展示了该框架在处理异步通信时的强大与简洁。通过实现事件处理器,我们实现了客户端间消息转发和上下文历史维护。

源代码可在 GitHub 获取。


原始标题:Custom Event Handlers and Listeners in Netty