1. 引言

本文将演示如何在同一台机器上运行的多个JVM之间共享内存。这种技术能实现极快的进程间通信,因为我们可以直接移动数据块而无需任何I/O操作。

2. 共享内存的工作原理

现代操作系统中运行的进程都会获得一个所谓的虚拟内存空间。我们称之为"虚拟"是因为,虽然它看起来像是一个庞大、连续且私有的可寻址内存空间,但实际上它是由分散在物理RAM各处的内存页组成的。这里的"页"只是操作系统对连续内存块的称呼,其大小范围取决于特定的CPU架构。对于x86-64架构,内存页可以小至4KB,大至1GB。

在任何给定时间,只有部分虚拟空间实际映射到物理内存页。随着时间推移,当进程需要为任务消耗更多内存时,操作系统会分配更多物理内存页并映射到虚拟空间。当内存需求超过物理可用空间时,操作系统会开始将当前未使用的内存页交换到辅助存储设备,为新请求腾出空间。

共享内存块的行为与常规内存类似,但与常规内存不同的是,它不专属于单个进程。当一个进程修改此块中任何字节的内容时,任何有权访问同一共享内存的其他进程都会立即"看到"此变化。

共享内存的常见用途包括:

  • 调试器(想知道调试器如何检查其他进程中的变量吗?)
  • 进程间通信
  • 进程间只读内容共享(例如:动态库代码)
  • 各种黑科技 ;)

3. 共享内存与内存映射文件

内存映射文件,顾名思义,是一种常规文件,其内容直接映射到进程虚拟内存中的连续区域。这意味着我们可以读取和/或修改其内容而无需显式使用I/O操作。操作系统会检测到映射区域的任何写入操作,并安排后台I/O操作来持久化修改后的数据。

由于无法保证后台操作何时发生,操作系统还提供了系统调用来刷新所有待处理的更改。这对于数据库重做日志等用例很重要,但在我们的进程间通信(IPC)场景中则不需要。

内存映射文件通常被数据库服务器用于实现高吞吐量I/O操作,但我们也可以利用它们来构建基于共享内存的IPC机制。基本思路是:所有需要共享数据的进程都映射同一个文件,瞧,它们现在就有了一个共享内存区域

4. 在Java中创建内存映射文件

在Java中,我们使用FileChannelmap()方法将文件区域映射到内存,该方法返回一个MappedByteBuffer,允许我们访问其内容:

MappedByteBuffer createSharedMemory(String path, long size) {

    try (FileChannel fc = (FileChannel)Files.newByteChannel(new File(path).toPath(),
      EnumSet.of(
        StandardOpenOption.CREATE,
        StandardOpenOption.SPARSE,
        StandardOpenOption.WRITE,
        StandardOpenOption.READ))) {

        return fc.map(FileChannel.MapMode.READ_WRITE, 0, size);
    }
    catch( IOException ioe) {
        throw new RuntimeException(ioe);
    }
}

这里使用SPARSE选项相当关键。只要底层操作系统和文件系统支持,我们就可以映射大块内存区域而无需实际消耗磁盘空间

现在,让我们创建一个简单的演示应用程序。Producer应用程序将分配一个足够容纳64KB数据加SHA1哈希(20字节)的共享内存。接下来,它将启动一个循环,用随机数据填充缓冲区,后跟其SHA1哈希值。我们将连续重复此操作30秒然后退出:

// ... SHA1摘要初始化代码省略

MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
Random rnd = new Random();

long start = System.currentTimeMillis();
long iterations = 0;
int capacity = shm.capacity();
System.out.println("Starting producer iterations...");
while(System.currentTimeMillis() - start < 30000) {

    for (int i = 0; i < capacity - hashLen; i++) {
        byte value = (byte) (rnd.nextInt(256) & 0x00ff);
        digest.update(value);
        shm.put(i, value);
    }

    // 在末尾写入哈希值
    byte[] hash = digest.digest();
    shm.put(capacity - hashLen, hash);
    iterations++;
}

System.out.printf("%d iterations run\n", iterations);

为了验证我们确实能共享内存,我们还将创建一个Consumer应用程序,它将读取缓冲区内容,计算其哈希值,并与Producer生成的哈希值进行比较。我们将重复此过程30秒。在每次迭代中,我们还会计算缓冲区内容的哈希值,并将其与缓冲区末尾的哈希值进行比较:

// ... 摘要初始化代码省略

MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
long start = System.currentTimeMillis();
long iterations = 0;
int capacity = shm.capacity();
System.out.println("Starting consumer iterations...");

long matchCount = 0;
long mismatchCount = 0;
byte[] expectedHash = new byte[hashLen];

while (System.currentTimeMillis() - start < 30000) {

    for (int i = 0; i < capacity - 20; i++) {
        byte value = shm.get(i);
        digest.update(value);
    }

    byte[] hash = digest.digest();
    shm.get(capacity - hashLen, expectedHash);

    if (Arrays.equals(hash, expectedHash)) {
        matchCount++;
    } else {
        mismatchCount++;
    }
    iterations++;
}

System.out.printf("%d iterations run. matches=%d, mismatches=%d\n", iterations, matchCount, mismatchCount);

要测试我们的内存共享方案,让我们同时启动这两个程序。这是它们在3Ghz四核Intel I7机器上运行时的输出:

# Producer输出
Starting producer iterations...
11722 iterations run


# Consumer输出
Starting consumer iterations...
18893 iterations run. matches=11714, mismatches=7179

我们可以看到,在许多情况下,消费者检测到预期计算值与实际值不同。欢迎来到并发问题的奇妙世界!

5. 同步共享内存访问

我们遇到问题的根本原因是需要同步访问共享内存缓冲区Consumer必须等待Producer完成哈希值写入后才能开始读取数据。另一方面,Producer也必须等待Consumer完成数据消费后才能再次写入。

对于常规的多线程应用程序,解决这个问题不是什么大事。标准库提供了多种同步原语,允许我们控制谁可以在给定时间写入共享内存。

然而,我们面对的是多JVM场景,因此这些标准方法都不适用。那么,我们该怎么办呢?简单来说,我们得耍点小聪明。我们可以使用操作系统特定的机制如信号量,但这会影响应用程序的可移植性。此外,这意味着需要使用JNI或JNA,这也会使事情复杂化。

这时Unsafe就派上用场了。尽管名字有点吓人,但这个标准库类恰好提供了我们实现简单锁机制所需的功能:compareAndSwapInt()方法。

此方法实现了一个原子测试并设置原语,接受四个参数。虽然文档中没有明确说明,但它不仅可以针对Java对象,还可以针对原始内存地址。对于后者,我们在第一个参数中传递null,这样它就会将offset参数视为虚拟内存地址。

当我们调用此方法时,它会首先检查目标地址的值,并将其与expected值进行比较。如果相等,则修改位置内容为新值并返回true表示成功。如果位置的值与expected不同,则不执行任何操作,方法返回false

更重要的是,即使在多核架构中,这个原子操作也能保证正常工作,这是同步多个执行线程的关键特性

让我们创建一个利用此方法的SpinLock类,实现一个(非常!)简单的锁机制:

//... 包和导入语句省略

public class SpinLock {
    private static final Unsafe unsafe;

    // ... unsafe初始化代码省略
    private final long addr;

    public SpinLock(long addr) {
        this.addr = addr;
    }

    public boolean tryLock(long maxWait) {
        long deadline = System.currentTimeMillis() + maxWait;
        while (System.currentTimeMillis() < deadline ) {
            if (unsafe.compareAndSwapInt(null, addr, 0, 1)) {
                return true;
            }
        }
        return false;
    }

    public void unlock() {
        unsafe.putInt(addr, 0);
    }
}

这个实现缺乏关键功能,比如在释放锁前检查是否拥有锁,但对于我们的目的已经足够。

好了,那么我们如何获取用于存储锁状态的内存地址呢?这个地址必须在共享内存缓冲区内,以便两个进程都能使用它,但MappedByteBuffer类不公开实际的内存地址。

检查map()返回的对象,我们可以看到它是一个DirectByteBuffer这个类有一个名为address()的公共方法,它恰好返回我们想要的内容。不幸的是,这个类是包私有的,所以我们不能使用简单的强制转换来访问这个方法

为了绕过这个限制,我们再耍点小聪明,使用反射来调用这个方法:

private static long getBufferAddress(MappedByteBuffer shm) {
    try {
        Class<?> cls = shm.getClass();
        Method maddr = cls.getMethod("address");
        maddr.setAccessible(true);
        Long addr = (Long) maddr.invoke(shm);
        if (addr == null) {
            throw new RuntimeException("Unable to retrieve buffer's address");
        }
        return addr;
    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
        throw new RuntimeException(ex);
    }
}

这里,我们使用setAccessible()使address()方法可以通过Method句柄调用。但请注意,从Java 17开始,除非我们明确使用运行时–add-opens标志,否则这种技术将无法工作

6. 为Producer和Consumer添加同步

现在我们有了锁机制,让我们先将其应用到Producer。为了演示目的,我们假设Producer将始终在Consumer之前启动。我们需要这样做来初始化缓冲区,清除其内容,包括我们将用于SpinLock的区域:

public static void main(String[] args) throws Exception {

    // ... 摘要初始化代码省略
    MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);

    // 清理锁区域 
    shm.putInt(0, 0);

    long addr = getBufferAddress(shm);
    System.out.println("Starting producer iterations...");

    long start = System.currentTimeMillis();
    long iterations = 0;
    Random rnd = new Random();
    int capacity = shm.capacity();
    SpinLock lock = new SpinLock(addr);
    while(System.currentTimeMillis() - start < 30000) {

        if (!lock.tryLock(5000)) {
            throw new RuntimeException("Unable to acquire lock");
        }

        try {
            // 跳过前4个字节,因为它们被锁使用
            for (int i = 4; i < capacity - hashLen; i++) {
                byte value = (byte) (rnd.nextInt(256) & 0x00ff);
                digest.update(value);
                shm.put(i, value);
            }

            // 在末尾写入哈希值
            byte[] hash = digest.digest();
            shm.put(capacity - hashLen, hash);
            iterations++;
        }
        finally {
            lock.unlock();
        }
    }
    System.out.printf("%d iterations run\n", iterations);
}

与未同步版本相比,只有一些微小改动:

  • 获取与MappedByteBufer关联的内存地址
  • 使用此地址创建SpinLock实例。锁使用一个int,因此它将占用缓冲区的前四个字节
  • 使用SpinLock实例保护用随机数据及其哈希值填充缓冲区的代码

现在,让我们对Consumer端应用类似的更改:

private static void main(String args[]) throws Exception {

    // ... 摘要初始化代码省略
    MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
    long addr = getBufferAddress(shm);

    System.out.println("Starting consumer iterations...");

    Random rnd = new Random();
    long start = System.currentTimeMillis();
    long iterations = 0;
    int capacity = shm.capacity();

    long matchCount = 0;
    long mismatchCount = 0;
    byte[] expectedHash = new byte[hashLen];
    SpinLock lock = new SpinLock(addr);
    while (System.currentTimeMillis() - start < 30000) {

        if (!lock.tryLock(5000)) {
            throw new RuntimeException("Unable to acquire lock");
        }

        try {
            for (int i = 4; i < capacity - hashLen; i++) {
                byte value = shm.get(i);
                digest.update(value);
            }

            byte[] hash = digest.digest();
            shm.get(capacity - hashLen, expectedHash);

            if (Arrays.equals(hash, expectedHash)) {
                matchCount++;
            } else {
                mismatchCount++;
            }

            iterations++;
        } finally {
            lock.unlock();
        }
    }

    System.out.printf("%d iterations run. matches=%d, mismatches=%d\n", iterations, matchCount, mismatchCount);
}

通过这些更改,我们现在可以运行两端并与之前的结果进行比较:

# Producer输出
Starting producer iterations...
8543 iterations run

# Consumer输出
Starting consumer iterations...
8607 iterations run. matches=8607, mismatches=0

正如预期的那样,报告的迭代次数将低于未同步版本。主要原因是我们在代码的关键部分花费了大部分时间持有锁。持有锁的程序会阻止另一方执行任何操作。

如果我们比较第一种情况报告的平均迭代次数,它将大约等于我们这次获得的迭代次数总和。这表明锁机制本身增加的开销很小。

7. 结论

在本教程中,我们探讨了如何在同一台机器上运行的多个JVM之间共享内存区域。我们可以将这里介绍的技术作为构建高吞吐量、低延迟进程间通信库的基础。

一如既往,所有代码都可以在GitHub上获取


原始标题:Sharing Memory Between JVMs