2. 信号量(Semaphore)基础
信号量是并发编程中控制资源访问的核心工具,在Java中通过java.util.concurrent.Semaphore
实现。我们可以用它限制同时访问特定资源的线程数量。
举个实际场景:实现一个登录队列控制系统,限制同时在线用户数:
class LoginQueueUsingSemaphore {
private Semaphore semaphore;
public LoginQueueUsingSemaphore(int slotLimit) {
semaphore = new Semaphore(slotLimit);
}
boolean tryLogin() {
return semaphore.tryAcquire();
}
void logout() {
semaphore.release();
}
int availableSlots() {
return semaphore.availablePermits();
}
}
关键方法解析:
- ✅
tryAcquire()
- 立即尝试获取许可,成功返回true,失败返回false(不阻塞) - ⚠️
acquire()
- 阻塞直到获取许可(注意与tryAcquire区别) - 🔓
release()
- 释放许可 - 📊
availablePermits()
- 查询当前可用许可数
测试场景1:达到上限后拒绝新请求
@Test
public void givenLoginQueue_whenReachLimit_thenBlocked() {
int slots = 10;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(loginQueue::tryLogin));
executorService.shutdown();
assertEquals(0, loginQueue.availableSlots());
assertFalse(loginQueue.tryLogin());
}
测试场景2:用户退出后释放许可
@Test
public void givenLoginQueue_whenLogout_thenSlotsAvailable() {
int slots = 10;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(loginQueue::tryLogin));
executorService.shutdown();
assertEquals(0, loginQueue.availableSlots());
loginQueue.logout();
assertTrue(loginQueue.availableSlots() > 0);
assertTrue(loginQueue.tryLogin());
}
3. 定时信号量(TimedSemaphore)
Apache Commons提供的TimedSemaphore
在普通信号量基础上增加了时间维度:许可会在指定周期后自动重置。适合实现速率控制场景。
构建延迟队列示例:
class DelayQueueUsingTimedSemaphore {
private TimedSemaphore semaphore;
DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
}
boolean tryAdd() {
return semaphore.tryAcquire();
}
int availableSlots() {
return semaphore.getAvailablePermits();
}
}
测试场景1:周期内耗尽许可后阻塞
public void givenDelayQueue_whenReachLimit_thenBlocked() {
int slots = 50;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
DelayQueueUsingTimedSemaphore delayQueue
= new DelayQueueUsingTimedSemaphore(1, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(delayQueue::tryAdd));
executorService.shutdown();
assertEquals(0, delayQueue.availableSlots());
assertFalse(delayQueue.tryAdd());
}
测试场景2:周期结束后自动重置许可
@Test
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
int slots = 50;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(delayQueue::tryAdd));
executorService.shutdown();
assertEquals(0, delayQueue.availableSlots());
Thread.sleep(1000);
assertTrue(delayQueue.availableSlots() > 0);
assertTrue(delayQueue.tryAdd());
}
4. 信号量 vs 互斥锁(Mutex)
互斥锁本质上是二进制信号量,用于实现资源独占访问。下面用信号量实现线程安全的计数器:
class CounterUsingMutex {
private Semaphore mutex;
private int count;
CounterUsingMutex() {
mutex = new Semaphore(1);
count = 0;
}
void increase() throws InterruptedException {
mutex.acquire();
this.count = this.count + 1;
Thread.sleep(1000);
mutex.release();
}
int getCount() {
return this.count;
}
boolean hasQueuedThreads() {
return mutex.hasQueuedThreads();
}
}
测试场景1:多线程竞争时阻塞排队
@Test
public void whenMutexAndMultipleThreads_thenBlocked()
throws InterruptedException {
int count = 5;
ExecutorService executorService
= Executors.newFixedThreadPool(count);
CounterUsingMutex counter = new CounterUsingMutex();
IntStream.range(0, count)
.forEach(user -> executorService.execute(() -> {
try {
counter.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
executorService.shutdown();
assertTrue(counter.hasQueuedThreads());
}
测试场景2:延迟后所有线程完成计数
@Test
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount()
throws InterruptedException {
int count = 5;
ExecutorService executorService
= Executors.newFixedThreadPool(count);
CounterUsingMutex counter = new CounterUsingMutex();
IntStream.range(0, count)
.forEach(user -> executorService.execute(() -> {
try {
counter.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
executorService.shutdown();
assertTrue(counter.hasQueuedThreads());
Thread.sleep(5000);
assertFalse(counter.hasQueuedThreads());
assertEquals(count, counter.getCount());
}
5. 总结
本文通过实际案例展示了Java中信号量的核心用法:
- 普通信号量用于资源访问控制
- 定时信号量实现速率限制
- 二进制信号量构建互斥锁
关键点回顾:
- ✅
tryAcquire()
非阻塞获取许可 - ⏱️
TimedSemaphore
自动周期重置 - 🔒 互斥锁本质是许可数为1的信号量
- ⚠️ 记得在finally块中释放许可避免死锁
完整代码示例见GitHub仓库