1. 概述
本文将演示如何使用 Java 7 的 NIO.2 通道 API 构建一个简单的服务器及其客户端。我们将重点讲解 AsynchronousServerSocketChannel
和 AsynchronousSocketChannel
这两个核心类,它们分别是实现服务器和客户端的关键。
如果你是 NIO.2 通道 API 的新手,建议先阅读本站的入门文章(点击查看)。使用 NIO.2 通道 API 所需的所有类都打包在 java.nio.channels
包中:
import java.nio.channels.*;
2. 基于 Future 的服务器实现
通过调用 AsynchronousServerSocketChannel
的静态 open()
方法创建实例:
AsynchronousServerSocketChannel server
= AsynchronousServerSocketChannel.open();
新创建的异步服务器套接字通道处于开放状态但尚未绑定,因此必须将其绑定到本地地址并选择端口:
server.bind(new InetSocketAddress("127.0.0.1", 4555));
也可以传入 null
使用本地地址并绑定到任意端口:
server.bind(null);
绑定完成后,调用 accept()
方法开始接受通道套接字的连接:
Future<AsynchronousSocketChannel> acceptFuture = server.accept();
由于是异步通道操作,上述调用会立即返回并继续执行。接下来使用 get()
API 查询 Future
对象的响应:
AsynchronousSocketChannel worker = future.get();
此调用会阻塞等待客户端连接请求。若不想无限等待,可指定超时时间:
AsynchronousSocketChannel worker = acceptFuture.get(10, TimeUnit.SECONDS);
当调用返回且操作成功后,创建循环监听传入消息并回显给客户端。在 runServer
方法中实现等待和处理逻辑:
public void runServer() {
clientChannel = acceptResult.get();
if ((clientChannel != null) && (clientChannel.isOpen())) {
while (true) {
ByteBuffer buffer = ByteBuffer.allocate(32);
Future<Integer> readResult = clientChannel.read(buffer);
// 执行其他计算任务
readResult.get();
buffer.flip();
Future<Integer> writeResult = clientChannel.write(buffer);
// 执行其他计算任务
writeResult.get();
buffer.clear();
}
clientChannel.close();
serverChannel.close();
}
}
循环内部主要操作:
- 创建读写缓冲区
- 每次读写操作后可继续执行其他代码
- 准备处理结果时调用
Future
的get()
方法
在 main
方法中启动服务器:
public static void main(String[] args) {
AsyncEchoServer server = new AsyncEchoServer();
server.runServer();
}
⚠️ 踩坑提示:buffer.flip()
切换读写模式是关键步骤,新手常忘记导致数据错乱!
3. 基于 CompletionHandler 的服务器实现
本节使用 CompletionHandler
替代 Future
实现相同功能。在构造函数中创建并绑定 AsynchronousServerSocketChannel
:
serverChannel = AsynchronousServerSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
serverChannel.bind(hostAddress);
在构造函数中创建循环接受客户端连接。此循环严格用于防止服务器在建立连接前退出。为避免无限循环,在循环末尾调用 System.in.read()
阻塞执行:
while (true) {
serverChannel.accept(
null, new CompletionHandler<AsynchronousSocketChannel,Object>() {
@Override
public void completed(
AsynchronousSocketChannel result, Object attachment) {
if (serverChannel.isOpen()){
serverChannel.accept(null, this);
}
clientChannel = result;
if ((clientChannel != null) && (clientChannel.isOpen())) {
ReadWriteHandler handler = new ReadWriteHandler();
ByteBuffer buffer = ByteBuffer.allocate(32);
Map<String, Object> readInfo = new HashMap<>();
readInfo.put("action", "read");
readInfo.put("buffer", buffer);
clientChannel.read(buffer, readInfo, handler);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
// 处理错误
}
});
System.in.read();
}
连接建立后,accept 操作的 CompletionHandler
中 completed
回调方法被触发:
- 若服务器通道仍开放,调用
accept()
准备新连接 - 将返回的套接字通道分配给全局实例
- 在
completed
回调中启动读写操作(替代Future
的轮询方式)
📌 关键区别:服务器建立连接后不会自动退出,需显式关闭。
下面实现读写处理器 ReadWriteHandler
:
class ReadWriteHandler implements
CompletionHandler<Integer, Map<String, Object>> {
@Override
public void completed(
Integer result, Map<String, Object> attachment) {
Map<String, Object> actionInfo = attachment;
String action = (String) actionInfo.get("action");
if ("read".equals(action)) {
ByteBuffer buffer = (ByteBuffer) actionInfo.get("buffer");
buffer.flip();
actionInfo.put("action", "write");
clientChannel.write(buffer, actionInfo, this);
buffer.clear();
} else if ("write".equals(action)) {
ByteBuffer buffer = ByteBuffer.allocate(32);
actionInfo.put("action", "read");
actionInfo.put("buffer", buffer);
clientChannel.read(buffer, actionInfo, this);
}
}
@Override
public void failed(Throwable exc, Map<String, Object> attachment) {
//
}
}
attachment
对象(Map 类型)传递两个关键参数:
- 操作类型(action)
- 缓冲区(buffer)
处理流程:
- 初始执行读操作(echo 服务器需先接收消息)
- 读完成后切换缓冲区模式,更新 action 为 "write" 并回写数据
- 写完成后重新分配缓冲区,更新 action 为 "read" 准备下次接收
✅ 优势:通过 attachment 参数实现状态传递,避免全局变量污染。
4. 客户端实现
服务器搭建完成后,通过 AsynchronousSocketChannel.open()
创建客户端:
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999)
Future<Void> future = client.connect(hostAddress);
connect
操作成功时返回 null
,但可用 Future
监控异步操作状态。调用 get()
等待连接建立:
future.get()
连接建立后,通过 sendMessage
方法发送消息并接收回显:
public String sendMessage(String message) {
byte[] byteMsg = new String(message).getBytes();
ByteBuffer buffer = ByteBuffer.wrap(byteMsg);
Future<Integer> writeResult = client.write(buffer);
// 执行其他计算任务
writeResult.get();
buffer.flip();
Future<Integer> readResult = client.read(buffer);
// 执行其他计算任务
readResult.get();
String echo = new String(buffer.array()).trim();
buffer.clear();
return echo;
}
⚡ 性能技巧:在 get()
调用间执行其他计算任务,充分利用异步优势。
5. 测试验证
通过单元测试验证服务器和客户端的预期行为:
@Test
public void givenServerClient_whenServerEchosMessage_thenCorrect() {
String resp1 = client.sendMessage("hello");
String resp2 = client.sendMessage("world");
assertEquals("hello", resp1);
assertEquals("world", resp2);
}
🔍 测试要点:
- 验证消息完整回显
- 检查多消息顺序处理
- 确认连接稳定性
6. 总结
本文深入探索了 Java NIO.2 异步套接字通道 API,完整演示了使用这些新 API 构建服务器和客户端的过程。核心要点包括:
两种异步模式对比:
Future
模式:简单粗暴,适合简单场景CompletionHandler
模式:事件驱动,适合复杂交互
关键实践建议:
- 缓冲区状态管理(flip/clear)是高频踩坑点
- 合理使用 attachment 传递上下文
- 异步操作间穿插计算任务提升吞吐量
完整源代码可在 GitHub 项目 获取。建议结合实战调试,深入理解异步编程精髓!