2. 信号量(Semaphore)基础

信号量是并发编程中控制资源访问的核心工具,在Java中通过java.util.concurrent.Semaphore实现。我们可以用它限制同时访问特定资源的线程数量。

举个实际场景:实现一个登录队列控制系统,限制同时在线用户数:

class LoginQueueUsingSemaphore {

    private Semaphore semaphore;

    public LoginQueueUsingSemaphore(int slotLimit) {
        semaphore = new Semaphore(slotLimit);
    }

    boolean tryLogin() {
        return semaphore.tryAcquire();
    }

    void logout() {
        semaphore.release();
    }

    int availableSlots() {
        return semaphore.availablePermits();
    }
}

关键方法解析:

  • tryAcquire() - 立即尝试获取许可,成功返回true,失败返回false(不阻塞)
  • ⚠️ acquire() - 阻塞直到获取许可(注意与tryAcquire区别)
  • 🔓 release() - 释放许可
  • 📊 availablePermits() - 查询当前可用许可数

测试场景1:达到上限后拒绝新请求

@Test
public void givenLoginQueue_whenReachLimit_thenBlocked() {
    int slots = 10;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(loginQueue::tryLogin));
    executorService.shutdown();

    assertEquals(0, loginQueue.availableSlots());
    assertFalse(loginQueue.tryLogin());
}

测试场景2:用户退出后释放许可

@Test
public void givenLoginQueue_whenLogout_thenSlotsAvailable() {
    int slots = 10;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(loginQueue::tryLogin));
    executorService.shutdown();
    assertEquals(0, loginQueue.availableSlots());
    loginQueue.logout();

    assertTrue(loginQueue.availableSlots() > 0);
    assertTrue(loginQueue.tryLogin());
}

3. 定时信号量(TimedSemaphore)

Apache Commons提供的TimedSemaphore在普通信号量基础上增加了时间维度:许可会在指定周期后自动重置。适合实现速率控制场景。

构建延迟队列示例:

class DelayQueueUsingTimedSemaphore {

    private TimedSemaphore semaphore;

    DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
        semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
    }

    boolean tryAdd() {
        return semaphore.tryAcquire();
    }

    int availableSlots() {
        return semaphore.getAvailablePermits();
    }
}

测试场景1:周期内耗尽许可后阻塞

public void givenDelayQueue_whenReachLimit_thenBlocked() {
    int slots = 50;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    DelayQueueUsingTimedSemaphore delayQueue 
      = new DelayQueueUsingTimedSemaphore(1, slots);
    
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(delayQueue::tryAdd));
    executorService.shutdown();

    assertEquals(0, delayQueue.availableSlots());
    assertFalse(delayQueue.tryAdd());
}

测试场景2:周期结束后自动重置许可

@Test
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
    int slots = 50;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(delayQueue::tryAdd));
    executorService.shutdown();

    assertEquals(0, delayQueue.availableSlots());
    Thread.sleep(1000);
    assertTrue(delayQueue.availableSlots() > 0);
    assertTrue(delayQueue.tryAdd());
}

4. 信号量 vs 互斥锁(Mutex)

互斥锁本质上是二进制信号量,用于实现资源独占访问。下面用信号量实现线程安全的计数器:

class CounterUsingMutex {

    private Semaphore mutex;
    private int count;

    CounterUsingMutex() {
        mutex = new Semaphore(1);
        count = 0;
    }

    void increase() throws InterruptedException {
        mutex.acquire();
        this.count = this.count + 1;
        Thread.sleep(1000);
        mutex.release();
    }

    int getCount() {
        return this.count;
    }

    boolean hasQueuedThreads() {
        return mutex.hasQueuedThreads();
    }
}

测试场景1:多线程竞争时阻塞排队

@Test
public void whenMutexAndMultipleThreads_thenBlocked()
 throws InterruptedException {
    int count = 5;
    ExecutorService executorService
     = Executors.newFixedThreadPool(count);
    CounterUsingMutex counter = new CounterUsingMutex();
    IntStream.range(0, count)
      .forEach(user -> executorService.execute(() -> {
          try {
              counter.increase();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }));
    executorService.shutdown();

    assertTrue(counter.hasQueuedThreads());
}

测试场景2:延迟后所有线程完成计数

@Test
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount()
 throws InterruptedException {
    int count = 5;
    ExecutorService executorService
     = Executors.newFixedThreadPool(count);
    CounterUsingMutex counter = new CounterUsingMutex();
    IntStream.range(0, count)
      .forEach(user -> executorService.execute(() -> {
          try {
              counter.increase();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }));
    executorService.shutdown();

    assertTrue(counter.hasQueuedThreads());
    Thread.sleep(5000);
    assertFalse(counter.hasQueuedThreads());
    assertEquals(count, counter.getCount());
}

5. 总结

本文通过实际案例展示了Java中信号量的核心用法:

  • 普通信号量用于资源访问控制
  • 定时信号量实现速率限制
  • 二进制信号量构建互斥锁

关键点回顾:

  • tryAcquire()非阻塞获取许可
  • ⏱️ TimedSemaphore自动周期重置
  • 🔒 互斥锁本质是许可数为1的信号量
  • ⚠️ 记得在finally块中释放许可避免死锁

完整代码示例见GitHub仓库


原始标题:Semaphores in Java | Baeldung