1. 引言
本文将演示如何在同一台机器上运行的多个JVM之间共享内存。这种技术能实现极快的进程间通信,因为我们可以直接移动数据块而无需任何I/O操作。
2. 共享内存的工作原理
现代操作系统中运行的进程都会获得一个所谓的虚拟内存空间。我们称之为"虚拟"是因为,虽然它看起来像是一个庞大、连续且私有的可寻址内存空间,但实际上它是由分散在物理RAM各处的内存页组成的。这里的"页"只是操作系统对连续内存块的称呼,其大小范围取决于特定的CPU架构。对于x86-64架构,内存页可以小至4KB,大至1GB。
在任何给定时间,只有部分虚拟空间实际映射到物理内存页。随着时间推移,当进程需要为任务消耗更多内存时,操作系统会分配更多物理内存页并映射到虚拟空间。当内存需求超过物理可用空间时,操作系统会开始将当前未使用的内存页交换到辅助存储设备,为新请求腾出空间。
共享内存块的行为与常规内存类似,但与常规内存不同的是,它不专属于单个进程。当一个进程修改此块中任何字节的内容时,任何有权访问同一共享内存的其他进程都会立即"看到"此变化。
共享内存的常见用途包括:
- 调试器(想知道调试器如何检查其他进程中的变量吗?)
- 进程间通信
- 进程间只读内容共享(例如:动态库代码)
- 各种黑科技 ;)
3. 共享内存与内存映射文件
内存映射文件,顾名思义,是一种常规文件,其内容直接映射到进程虚拟内存中的连续区域。这意味着我们可以读取和/或修改其内容而无需显式使用I/O操作。操作系统会检测到映射区域的任何写入操作,并安排后台I/O操作来持久化修改后的数据。
由于无法保证后台操作何时发生,操作系统还提供了系统调用来刷新所有待处理的更改。这对于数据库重做日志等用例很重要,但在我们的进程间通信(IPC)场景中则不需要。
内存映射文件通常被数据库服务器用于实现高吞吐量I/O操作,但我们也可以利用它们来构建基于共享内存的IPC机制。基本思路是:所有需要共享数据的进程都映射同一个文件,瞧,它们现在就有了一个共享内存区域。
4. 在Java中创建内存映射文件
在Java中,我们使用FileChannel
的map()
方法将文件区域映射到内存,该方法返回一个MappedByteBuffer
,允许我们访问其内容:
MappedByteBuffer createSharedMemory(String path, long size) {
try (FileChannel fc = (FileChannel)Files.newByteChannel(new File(path).toPath(),
EnumSet.of(
StandardOpenOption.CREATE,
StandardOpenOption.SPARSE,
StandardOpenOption.WRITE,
StandardOpenOption.READ))) {
return fc.map(FileChannel.MapMode.READ_WRITE, 0, size);
}
catch( IOException ioe) {
throw new RuntimeException(ioe);
}
}
这里使用SPARSE
选项相当关键。只要底层操作系统和文件系统支持,我们就可以映射大块内存区域而无需实际消耗磁盘空间。
现在,让我们创建一个简单的演示应用程序。Producer
应用程序将分配一个足够容纳64KB数据加SHA1哈希(20字节)的共享内存。接下来,它将启动一个循环,用随机数据填充缓冲区,后跟其SHA1哈希值。我们将连续重复此操作30秒然后退出:
// ... SHA1摘要初始化代码省略
MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
Random rnd = new Random();
long start = System.currentTimeMillis();
long iterations = 0;
int capacity = shm.capacity();
System.out.println("Starting producer iterations...");
while(System.currentTimeMillis() - start < 30000) {
for (int i = 0; i < capacity - hashLen; i++) {
byte value = (byte) (rnd.nextInt(256) & 0x00ff);
digest.update(value);
shm.put(i, value);
}
// 在末尾写入哈希值
byte[] hash = digest.digest();
shm.put(capacity - hashLen, hash);
iterations++;
}
System.out.printf("%d iterations run\n", iterations);
为了验证我们确实能共享内存,我们还将创建一个Consumer
应用程序,它将读取缓冲区内容,计算其哈希值,并与Producer
生成的哈希值进行比较。我们将重复此过程30秒。在每次迭代中,我们还会计算缓冲区内容的哈希值,并将其与缓冲区末尾的哈希值进行比较:
// ... 摘要初始化代码省略
MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
long start = System.currentTimeMillis();
long iterations = 0;
int capacity = shm.capacity();
System.out.println("Starting consumer iterations...");
long matchCount = 0;
long mismatchCount = 0;
byte[] expectedHash = new byte[hashLen];
while (System.currentTimeMillis() - start < 30000) {
for (int i = 0; i < capacity - 20; i++) {
byte value = shm.get(i);
digest.update(value);
}
byte[] hash = digest.digest();
shm.get(capacity - hashLen, expectedHash);
if (Arrays.equals(hash, expectedHash)) {
matchCount++;
} else {
mismatchCount++;
}
iterations++;
}
System.out.printf("%d iterations run. matches=%d, mismatches=%d\n", iterations, matchCount, mismatchCount);
要测试我们的内存共享方案,让我们同时启动这两个程序。这是它们在3Ghz四核Intel I7机器上运行时的输出:
# Producer输出
Starting producer iterations...
11722 iterations run
# Consumer输出
Starting consumer iterations...
18893 iterations run. matches=11714, mismatches=7179
我们可以看到,在许多情况下,消费者检测到预期计算值与实际值不同。欢迎来到并发问题的奇妙世界!
5. 同步共享内存访问
我们遇到问题的根本原因是需要同步访问共享内存缓冲区。Consumer
必须等待Producer
完成哈希值写入后才能开始读取数据。另一方面,Producer
也必须等待Consumer
完成数据消费后才能再次写入。
对于常规的多线程应用程序,解决这个问题不是什么大事。标准库提供了多种同步原语,允许我们控制谁可以在给定时间写入共享内存。
然而,我们面对的是多JVM场景,因此这些标准方法都不适用。那么,我们该怎么办呢?简单来说,我们得耍点小聪明。我们可以使用操作系统特定的机制如信号量,但这会影响应用程序的可移植性。此外,这意味着需要使用JNI或JNA,这也会使事情复杂化。
这时Unsafe
就派上用场了。尽管名字有点吓人,但这个标准库类恰好提供了我们实现简单锁机制所需的功能:compareAndSwapInt()
方法。
此方法实现了一个原子测试并设置原语,接受四个参数。虽然文档中没有明确说明,但它不仅可以针对Java对象,还可以针对原始内存地址。对于后者,我们在第一个参数中传递null
,这样它就会将offset
参数视为虚拟内存地址。
当我们调用此方法时,它会首先检查目标地址的值,并将其与expected
值进行比较。如果相等,则修改位置内容为新值并返回true
表示成功。如果位置的值与expected
不同,则不执行任何操作,方法返回false
。
更重要的是,即使在多核架构中,这个原子操作也能保证正常工作,这是同步多个执行线程的关键特性。
让我们创建一个利用此方法的SpinLock
类,实现一个(非常!)简单的锁机制:
//... 包和导入语句省略
public class SpinLock {
private static final Unsafe unsafe;
// ... unsafe初始化代码省略
private final long addr;
public SpinLock(long addr) {
this.addr = addr;
}
public boolean tryLock(long maxWait) {
long deadline = System.currentTimeMillis() + maxWait;
while (System.currentTimeMillis() < deadline ) {
if (unsafe.compareAndSwapInt(null, addr, 0, 1)) {
return true;
}
}
return false;
}
public void unlock() {
unsafe.putInt(addr, 0);
}
}
这个实现缺乏关键功能,比如在释放锁前检查是否拥有锁,但对于我们的目的已经足够。
好了,那么我们如何获取用于存储锁状态的内存地址呢?这个地址必须在共享内存缓冲区内,以便两个进程都能使用它,但MappedByteBuffer
类不公开实际的内存地址。
检查map()
返回的对象,我们可以看到它是一个DirectByteBuffer
。这个类有一个名为address()
的公共方法,它恰好返回我们想要的内容。不幸的是,这个类是包私有的,所以我们不能使用简单的强制转换来访问这个方法。
为了绕过这个限制,我们再耍点小聪明,使用反射来调用这个方法:
private static long getBufferAddress(MappedByteBuffer shm) {
try {
Class<?> cls = shm.getClass();
Method maddr = cls.getMethod("address");
maddr.setAccessible(true);
Long addr = (Long) maddr.invoke(shm);
if (addr == null) {
throw new RuntimeException("Unable to retrieve buffer's address");
}
return addr;
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
throw new RuntimeException(ex);
}
}
这里,我们使用setAccessible()
使address()
方法可以通过Method
句柄调用。但请注意,从Java 17开始,除非我们明确使用运行时–add-opens
标志,否则这种技术将无法工作。
6. 为Producer和Consumer添加同步
现在我们有了锁机制,让我们先将其应用到Producer
。为了演示目的,我们假设Producer
将始终在Consumer
之前启动。我们需要这样做来初始化缓冲区,清除其内容,包括我们将用于SpinLock
的区域:
public static void main(String[] args) throws Exception {
// ... 摘要初始化代码省略
MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
// 清理锁区域
shm.putInt(0, 0);
long addr = getBufferAddress(shm);
System.out.println("Starting producer iterations...");
long start = System.currentTimeMillis();
long iterations = 0;
Random rnd = new Random();
int capacity = shm.capacity();
SpinLock lock = new SpinLock(addr);
while(System.currentTimeMillis() - start < 30000) {
if (!lock.tryLock(5000)) {
throw new RuntimeException("Unable to acquire lock");
}
try {
// 跳过前4个字节,因为它们被锁使用
for (int i = 4; i < capacity - hashLen; i++) {
byte value = (byte) (rnd.nextInt(256) & 0x00ff);
digest.update(value);
shm.put(i, value);
}
// 在末尾写入哈希值
byte[] hash = digest.digest();
shm.put(capacity - hashLen, hash);
iterations++;
}
finally {
lock.unlock();
}
}
System.out.printf("%d iterations run\n", iterations);
}
与未同步版本相比,只有一些微小改动:
- 获取与
MappedByteBufer
关联的内存地址 - 使用此地址创建
SpinLock
实例。锁使用一个int
,因此它将占用缓冲区的前四个字节 - 使用
SpinLock
实例保护用随机数据及其哈希值填充缓冲区的代码
现在,让我们对Consumer
端应用类似的更改:
private static void main(String args[]) throws Exception {
// ... 摘要初始化代码省略
MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
long addr = getBufferAddress(shm);
System.out.println("Starting consumer iterations...");
Random rnd = new Random();
long start = System.currentTimeMillis();
long iterations = 0;
int capacity = shm.capacity();
long matchCount = 0;
long mismatchCount = 0;
byte[] expectedHash = new byte[hashLen];
SpinLock lock = new SpinLock(addr);
while (System.currentTimeMillis() - start < 30000) {
if (!lock.tryLock(5000)) {
throw new RuntimeException("Unable to acquire lock");
}
try {
for (int i = 4; i < capacity - hashLen; i++) {
byte value = shm.get(i);
digest.update(value);
}
byte[] hash = digest.digest();
shm.get(capacity - hashLen, expectedHash);
if (Arrays.equals(hash, expectedHash)) {
matchCount++;
} else {
mismatchCount++;
}
iterations++;
} finally {
lock.unlock();
}
}
System.out.printf("%d iterations run. matches=%d, mismatches=%d\n", iterations, matchCount, mismatchCount);
}
通过这些更改,我们现在可以运行两端并与之前的结果进行比较:
# Producer输出
Starting producer iterations...
8543 iterations run
# Consumer输出
Starting consumer iterations...
8607 iterations run. matches=8607, mismatches=0
正如预期的那样,报告的迭代次数将低于未同步版本。主要原因是我们在代码的关键部分花费了大部分时间持有锁。持有锁的程序会阻止另一方执行任何操作。
如果我们比较第一种情况报告的平均迭代次数,它将大约等于我们这次获得的迭代次数总和。这表明锁机制本身增加的开销很小。
7. 结论
在本教程中,我们探讨了如何在同一台机器上运行的多个JVM之间共享内存区域。我们可以将这里介绍的技术作为构建高吞吐量、低延迟进程间通信库的基础。
一如既往,所有代码都可以在GitHub上获取。