1. 介绍
本文将探讨如何使用Java NIO包中的SocketChannel
实现序列化对象的发送与接收。这种方式能在客户端和服务器间建立高效的非阻塞网络通信,特别适合需要传输复杂数据结构的分布式系统。
⚠️ 注意:本文假设读者已具备Java NIO基础,不涉及SocketChannel
的入门概念。
2. 理解序列化
序列化是将对象转换为字节流的过程,使其能通过网络传输或持久化存储。当与SocketChannel结合时,序列化实现了应用程序间复杂数据结构的无缝传输,这是分布式系统中对象交换的核心技术。
2.1. Java序列化的关键类
Java序列化主要依赖两个核心类:
ObjectOutputStream
:将对象序列化为字节流。例如,通过网络发送Message
对象时,它负责将对象字段和元数据写入输出流。ObjectInputStream
:在接收端从字节流重构对象。
✅ 记住:所有需要序列化的对象必须实现Serializable
接口。
3. 理解Socket通道
Socket通道是Java NIO包的组成部分,提供比传统Socket更灵活、可扩展的通信方案。它们同时支持阻塞和非阻塞模式,特别适合需要高效处理大量连接的高性能网络应用。
3.1. Socket通道的核心组件
Socket通道包含三个关键组件:
ServerSocketChannel
:监听TCP连接请求。绑定特定端口后等待客户端连接。SocketChannel
:表示客户端与服务器间的连接。支持阻塞和非阻塞两种模式。Selector
:使用单线程监控多个Socket通道。通过处理连接请求或数据可读等事件,避免了为每个连接分配独立线程的开销。
4. 搭建服务器与客户端
在实现服务器和客户端前,先定义一个待传输的示例对象。Java中对象必须实现Serializable
接口才能转换为字节流进行网络传输。
4.1. 创建可序列化对象
定义MyObject
类作为传输对象示例:
class MyObject implements Serializable {
private String name;
private int age;
public MyObject(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
}
该类实现了Serializable
接口,这是对象能在Socket连接中传输的前提。
4.2. 实现服务器
服务器端使用ServerSocketChannel
监听客户端连接并处理接收的序列化对象:
private static final int PORT = 6000;
try (ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
serverSocket.bind(new InetSocketAddress(PORT));
logger.info("Server is listening on port " + PORT);
while (true) {
try (SocketChannel clientSocket = serverSocket.accept()) {
System.out.println("Client connected...");
// 接收对象的位置
}
}
} catch (IOException e) {
// 异常处理
}
服务器监听6000端口,接受客户端连接后等待接收对象。
4.3. 实现客户端
客户端创建MyObject
实例,序列化后通过SocketChannel
发送到服务器:
private static final String SERVER_ADDRESS = "localhost";
private static final int SERVER_PORT = 6000;
try (SocketChannel socketChannel = SocketChannel.open()) {
socketChannel.connect(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT));
logger.info("Connected to the server...");
// 发送对象的位置
} catch (IOException e) {
// 异常处理
}
代码连接本地6000端口的服务器,准备发送序列化对象。
5. 序列化与发送对象
通过SocketChannel
传输对象时,需将对象序列化为字节数组并包装到ByteBuffer
中。发送前先附加4字节整数指明数据长度,确保接收方知道需要读取多少字节:
void sendObject(SocketChannel channel, MyObject obj) throws IOException {
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
try (ObjectOutputStream objOut = new ObjectOutputStream(byteStream)) {
objOut.writeObject(obj);
}
byte[] bytes = byteStream.toByteArray();
ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
lengthBuffer.putInt(bytes.length);
lengthBuffer.flip();
while (lengthBuffer.hasRemaining()) {
channel.write(lengthBuffer);
}
ByteBuffer dataBuffer = ByteBuffer.wrap(bytes);
while (dataBuffer.hasRemaining()) {
channel.write(dataBuffer);
}
}
步骤分解:
- 序列化对象到字节数组
- 将长度信息写入4字节缓冲区
- 发送长度信息
- 发送实际数据
客户端调用示例:
try (SocketChannel socketChannel = SocketChannel.open()) {
socketChannel.connect(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT));
MyObject objectToSend = new MyObject("Alice", 25);
sendObject(socketChannel, objectToSend); // 序列化并发送
}
客户端连接服务器后,发送包含名称"Alice"和年龄25的MyObject
。
6. 接收与反序列化对象
服务器端先读取4字节长度信息,再根据长度读取完整数据并反序列化:
MyObject receiveObject(SocketChannel channel) throws IOException, ClassNotFoundException {
ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
while (lengthBuffer.hasRemaining()) {
if (channel.read(lengthBuffer) == -1) {
throw new EOFException("Connection closed prematurely");
}
}
lengthBuffer.flip();
int length = lengthBuffer.getInt();
// 精确读取指定长度的字节
ByteBuffer dataBuffer = ByteBuffer.allocate(length);
while (dataBuffer.hasRemaining()) {
if (channel.read(dataBuffer) == -1) {
throw new EOFException("Incomplete data received");
}
}
dataBuffer.flip();
byte[] bytes = new byte[length];
dataBuffer.get(bytes);
try (ObjectInputStream objIn = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
return (MyObject) objIn.readObject();
}
}
关键步骤:
- 读取4字节长度信息
- 分配对应大小的缓冲区
- 读取完整数据
- 反序列化为对象
服务器端接收示例:
try (SocketChannel clientSocket = serverSocket.accept()) {
MyObject receivedObject = receiveObject(clientSocket);
logger.info("Received Object - Name: " + receivedObject.getName());
}
7. 处理多客户端
使用Selector
在非阻塞模式下管理多个Socket通道,实现并发客户端处理:
class NonBlockingServer {
private static final int PORT = 6000;
public static void main(String[] args) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(PORT));
serverChannel.configureBlocking(false);
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel client = serverChannel.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
MyObject obj = receiveObject(client);
System.out.println("Received from client: " + obj.getName());
}
}
}
}
}
核心机制:
configureBlocking(false)
:设置非阻塞模式,避免accept()
和read()
操作阻塞线程Selector
:监听多个通道事件(连接请求OP_ACCEPT/数据可读OP_READ)- 事件驱动:仅在有事件发生时处理连接,实现高并发
⚠️ 踩坑提示:非阻塞模式下需处理read()
返回-1的情况(连接关闭)。
8. 测试用例
验证SocketChannel
对象序列化/反序列化的正确性:
@Test
void givenClientSendsObject_whenServerReceives_thenDataMatches() throws Exception {
try (ServerSocketChannel server = ServerSocketChannel.open().bind(new InetSocketAddress(6000))) {
int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<MyObject> future = executor.submit(() -> {
try (SocketChannel client = server.accept();
ObjectInputStream objIn = new ObjectInputStream(Channels.newInputStream(client))) {
return (MyObject) objIn.readObject();
}
});
try (SocketChannel client = SocketChannel.open()) {
client.configureBlocking(true);
client.connect(new InetSocketAddress("localhost", 6000));
while (!client.finishConnect()) {
Thread.sleep(10);
}
try (ObjectOutputStream objOut = new ObjectOutputStream(Channels.newOutputStream(client))) {
objOut.writeObject(new MyObject("Test User", 25));
}
}
MyObject received = future.get(2, TimeUnit.SECONDS);
assertEquals("Test User", received.getName());
assertEquals(25, received.getAge());
executor.shutdown();
}
}
测试要点:
- 服务端在独立线程中等待接收对象
- 客户端连接后发送测试对象
- 验证接收对象数据一致性
- 设置超时防止死锁
9. 总结
本文展示了如何使用Java NIO的SocketChannel
构建客户端-服务器系统,实现序列化对象的传输。通过结合序列化与非阻塞I/O技术,我们实现了:
- ✅ 复杂数据结构的网络传输
- ✅ 高效的非阻塞通信
- ✅ 多客户端并发处理
- ✅ 可靠的数据完整性保障
这种方案特别适合需要高性能网络通信的分布式系统,简单粗暴地解决了对象传输问题。