1. 概述

本文将演示如何使用 Java 7 的 NIO.2 通道 API 构建一个简单的服务器及其客户端。我们将重点讲解 AsynchronousServerSocketChannelAsynchronousSocketChannel 这两个核心类,它们分别是实现服务器和客户端的关键。

如果你是 NIO.2 通道 API 的新手,建议先阅读本站的入门文章(点击查看)。使用 NIO.2 通道 API 所需的所有类都打包在 java.nio.channels 包中:

import java.nio.channels.*;

2. 基于 Future 的服务器实现

通过调用 AsynchronousServerSocketChannel 的静态 open() 方法创建实例:

AsynchronousServerSocketChannel server
  = AsynchronousServerSocketChannel.open();

新创建的异步服务器套接字通道处于开放状态但尚未绑定,因此必须将其绑定到本地地址并选择端口:

server.bind(new InetSocketAddress("127.0.0.1", 4555));

也可以传入 null 使用本地地址并绑定到任意端口:

server.bind(null);

绑定完成后,调用 accept() 方法开始接受通道套接字的连接:

Future<AsynchronousSocketChannel> acceptFuture = server.accept();

由于是异步通道操作,上述调用会立即返回并继续执行。接下来使用 get() API 查询 Future 对象的响应:

AsynchronousSocketChannel worker = future.get();

此调用会阻塞等待客户端连接请求。若不想无限等待,可指定超时时间:

AsynchronousSocketChannel worker = acceptFuture.get(10, TimeUnit.SECONDS);

当调用返回且操作成功后,创建循环监听传入消息并回显给客户端。在 runServer 方法中实现等待和处理逻辑:

public void runServer() {
    clientChannel = acceptResult.get();
    if ((clientChannel != null) && (clientChannel.isOpen())) {
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(32);
            Future<Integer> readResult  = clientChannel.read(buffer);
            
            // 执行其他计算任务
            
            readResult.get();
            
            buffer.flip();
            Future<Integer> writeResult = clientChannel.write(buffer);
 
            // 执行其他计算任务
 
            writeResult.get();
            buffer.clear();
        } 
        clientChannel.close();
        serverChannel.close();
    }
}

循环内部主要操作:

  1. 创建读写缓冲区
  2. 每次读写操作后可继续执行其他代码
  3. 准备处理结果时调用 Futureget() 方法

main 方法中启动服务器:

public static void main(String[] args) {
    AsyncEchoServer server = new AsyncEchoServer();
    server.runServer();
}

⚠️ 踩坑提示buffer.flip() 切换读写模式是关键步骤,新手常忘记导致数据错乱!

3. 基于 CompletionHandler 的服务器实现

本节使用 CompletionHandler 替代 Future 实现相同功能。在构造函数中创建并绑定 AsynchronousServerSocketChannel

serverChannel = AsynchronousServerSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
serverChannel.bind(hostAddress);

在构造函数中创建循环接受客户端连接。此循环严格用于防止服务器在建立连接前退出。为避免无限循环,在循环末尾调用 System.in.read() 阻塞执行:

while (true) {
    serverChannel.accept(
      null, new CompletionHandler<AsynchronousSocketChannel,Object>() {

        @Override
        public void completed(
          AsynchronousSocketChannel result, Object attachment) {
            if (serverChannel.isOpen()){
                serverChannel.accept(null, this);
            }

            clientChannel = result;
            if ((clientChannel != null) && (clientChannel.isOpen())) {
                ReadWriteHandler handler = new ReadWriteHandler();
                ByteBuffer buffer = ByteBuffer.allocate(32);

                Map<String, Object> readInfo = new HashMap<>();
                readInfo.put("action", "read");
                readInfo.put("buffer", buffer);

                clientChannel.read(buffer, readInfo, handler);
             }
         }
         @Override
         public void failed(Throwable exc, Object attachment) {
             // 处理错误
         }
    });
    System.in.read();
}

连接建立后,accept 操作的 CompletionHandlercompleted 回调方法被触发:

  • 若服务器通道仍开放,调用 accept() 准备新连接
  • 将返回的套接字通道分配给全局实例
  • completed 回调中启动读写操作(替代 Future 的轮询方式)

📌 关键区别:服务器建立连接后不会自动退出,需显式关闭。

下面实现读写处理器 ReadWriteHandler

class ReadWriteHandler implements 
  CompletionHandler<Integer, Map<String, Object>> {
    
    @Override
    public void completed(
      Integer result, Map<String, Object> attachment) {
        Map<String, Object> actionInfo = attachment;
        String action = (String) actionInfo.get("action");

        if ("read".equals(action)) {
            ByteBuffer buffer = (ByteBuffer) actionInfo.get("buffer");
            buffer.flip();
            actionInfo.put("action", "write");

            clientChannel.write(buffer, actionInfo, this);
            buffer.clear();

        } else if ("write".equals(action)) {
            ByteBuffer buffer = ByteBuffer.allocate(32);

            actionInfo.put("action", "read");
            actionInfo.put("buffer", buffer);

            clientChannel.read(buffer, actionInfo, this);
        }
    }
    
    @Override
    public void failed(Throwable exc, Map<String, Object> attachment) {
        // 
    }
}

attachment 对象(Map 类型)传递两个关键参数:

  1. 操作类型(action)
  2. 缓冲区(buffer)

处理流程:

  1. 初始执行读操作(echo 服务器需先接收消息)
  2. 读完成后切换缓冲区模式,更新 action 为 "write" 并回写数据
  3. 写完成后重新分配缓冲区,更新 action 为 "read" 准备下次接收

优势:通过 attachment 参数实现状态传递,避免全局变量污染。

4. 客户端实现

服务器搭建完成后,通过 AsynchronousSocketChannel.open() 创建客户端:

AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999)
Future<Void> future = client.connect(hostAddress);

connect 操作成功时返回 null,但可用 Future 监控异步操作状态。调用 get() 等待连接建立:

future.get()

连接建立后,通过 sendMessage 方法发送消息并接收回显:

public String sendMessage(String message) {
    byte[] byteMsg = new String(message).getBytes();
    ByteBuffer buffer = ByteBuffer.wrap(byteMsg);
    Future<Integer> writeResult = client.write(buffer);

    // 执行其他计算任务

    writeResult.get();
    buffer.flip();
    Future<Integer> readResult = client.read(buffer);
    
    // 执行其他计算任务

    readResult.get();
    String echo = new String(buffer.array()).trim();
    buffer.clear();
    return echo;
}

性能技巧:在 get() 调用间执行其他计算任务,充分利用异步优势。

5. 测试验证

通过单元测试验证服务器和客户端的预期行为:

@Test
public void givenServerClient_whenServerEchosMessage_thenCorrect() {
    String resp1 = client.sendMessage("hello");
    String resp2 = client.sendMessage("world");

    assertEquals("hello", resp1);
    assertEquals("world", resp2);
}

🔍 测试要点

  • 验证消息完整回显
  • 检查多消息顺序处理
  • 确认连接稳定性

6. 总结

本文深入探索了 Java NIO.2 异步套接字通道 API,完整演示了使用这些新 API 构建服务器和客户端的过程。核心要点包括:

  • 两种异步模式对比

    • Future 模式:简单粗暴,适合简单场景
    • CompletionHandler 模式:事件驱动,适合复杂交互
  • 关键实践建议

    • 缓冲区状态管理(flip/clear)是高频踩坑点
    • 合理使用 attachment 传递上下文
    • 异步操作间穿插计算任务提升吞吐量

完整源代码可在 GitHub 项目 获取。建议结合实战调试,深入理解异步编程精髓!


原始标题:A Guide to NIO2 Asynchronous Socket Channel