1. 概述

本文将探讨如何为特定键获取锁,防止对该键的并发操作,同时不影响其他键的操作。核心目标是实现以下两个方法:

  • void lock(String key)
  • void unlock(String key)

为简化教程,我们假设键都是 String 类型。实际使用时可替换为任何类型,只要确保正确实现了 equals()hashCode() 方法(因为我们会用它们作为 HashMap 的键)。

2. 简单的互斥锁实现

首先,我们实现一个基础版本:当键被占用时直接拒绝新操作。这里我们将使用 boolean tryLock(String key) 替代 lock() 方法。

核心思路是维护一个 Set 集合存储当前正在使用的键。当新操作请求某个键时,若发现该键已被其他线程占用,直接拒绝操作。

由于线程安全的 Set 实现不存在,我们改用 ConcurrentHashMap 支撑的 Set,确保多线程环境下的数据一致性:

public class SimpleExclusiveLockByKey {

    private static Set<String> usedKeys= ConcurrentHashMap.newKeySet();
    
    public boolean tryLock(String key) {
        return usedKeys.add(key);
    }
    
    public void unlock(String key) {
        usedKeys.remove(key);
    }
}

使用方式如下:

String key = "key";
SimpleExclusiveLockByKey lockByKey = new SimpleExclusiveLockByKey();
try {
    lockByKey.tryLock(key);
    // 只有获取到锁时才执行的代码
} finally { // 关键点
    lockByKey.unlock(key);
}

务必注意 finally 块的存在: 即使 try 块内抛出异常,也能确保锁被释放。这是避免死锁的黄金法则。

3. 基于键的锁获取与释放

现在深入探讨更复杂场景:我们不直接拒绝并发操作,而是让新操作等待当前操作完成。流程如下:

  • 线程1请求键锁:成功获取
  • 线程2请求相同键锁:进入等待队列
  • 线程1释放锁
  • 线程2获取锁并执行操作

3.1. 定义带线程计数器的锁

使用 Lock 接口实现此场景最自然。我们选用 ReentrantLock 作为基础实现,并将其封装在内部类中。该类需跟踪等待线程数,提供计数器增减方法:

private static class LockWrapper {
    private final Lock lock = new ReentrantLock();
    private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);

    private LockWrapper addThreadInQueue() {
        numberOfThreadsInQueue.incrementAndGet(); 
        return this;
    }

    private int removeThreadFromQueue() {
        return numberOfThreadsInQueue.decrementAndGet(); 
    }
}

3.2. 让锁处理线程排队

继续使用 ConcurrentHashMap,但值改为 LockWrapper 对象:

private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();

当线程请求锁时:

  • 若键不存在:创建新 LockWrapper,计数器初始化为1
  • 若键已存在:返回现有 LockWrapper 并递增计数器

实现代码非常简洁:

public void lock(String key) {
    LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
    lockWrapper.lock.lock();
}

compute 方法的工作原理:

  1. 获取键对应的当前值
  2. 应用 BiFunction 生成新值
  3. 用新值替换旧值

3.3. 解锁并清理Map条目

释放锁时递减计数器,当计数器归零时移除 ConcurrentHashMap 中的条目:

public void unlock(String key) {
    LockWrapper lockWrapper = locks.get(key);
    lockWrapper.lock.unlock();
    if (lockWrapper.removeThreadFromQueue() == 0) { 
        // 传入特定值避免并发问题
        locks.remove(key, lockWrapper);
    }
}

3.4. 完整实现

最终完整代码如下:

public class LockByKey {
    
    private static class LockWrapper {
        private final Lock lock = new ReentrantLock();
        private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);
        
        private LockWrapper addThreadInQueue() {
            numberOfThreadsInQueue.incrementAndGet(); 
            return this;
        }
        
        private int removeThreadFromQueue() {
            return numberOfThreadsInQueue.decrementAndGet(); 
        }
        
    }
    
    private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();
    
    public void lock(String key) {
        LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
        lockWrapper.lock.lock();
    }
    
    public void unlock(String key) {
        LockWrapper lockWrapper = locks.get(key);
        lockWrapper.lock.unlock();
        if (lockWrapper.removeThreadFromQueue() == 0) { 
            // 传入特定值避免并发问题
            locks.remove(key, lockWrapper);
        }
    }
    
}

使用方式与简单版本类似:

String key = "key"; 
LockByKey lockByKey = new LockByKey(); 
try { 
    lockByKey.lock(key);
    // 业务代码 
} finally { // 关键点 
    lockByKey.unlock(key); 
}

4. 允许有限并发操作

最后考虑另一种场景:不限制单线程操作,而是允许最多 n 个线程同时操作同一键(这里设 n=2)。场景描述:

  • 线程1请求锁:允许
  • 线程2请求相同锁:允许
  • 线程3请求相同锁:需等待前两个线程释放锁

Semaphore 是解决此问题的利器,它能限制同时访问资源的线程数。实现与锁机制高度相似:

public class SimultaneousEntriesLockByKey {

    private static final int ALLOWED_THREADS = 2;
    
    private static ConcurrentHashMap<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
    
    public void lock(String key) {
        Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(ALLOWED_THREADS) : v);
        semaphore.acquireUninterruptibly();
    }
    
    public void unlock(String key) {
        Semaphore semaphore = semaphores.get(key);
        semaphore.release();
        if (semaphore.availablePermits() == ALLOWED_THREADS) { 
            semaphores.remove(key, semaphore);
        }  
    }
    
}

使用方式完全一致:

String key = "key"; 
SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey(); 
try { 
    lockByKey.lock(key); 
    // 业务代码 
} finally { // 关键点 
    lockByKey.unlock(key); 
}

5. 总结

本文展示了三种基于键的并发锁实现方案:

  1. 简单互斥锁:直接拒绝并发操作
  2. 排队锁:让新操作等待当前操作完成
  3. 信号量锁:限制最大并发线程数

这些方案能有效解决键级并发控制问题,避免全局锁带来的性能瓶颈。完整代码可在 GitHub 获取。


原始标题:Acquire a Lock by a Key in Java