2. 概述

本文将深入探讨 java.util.concurrent 包中的 TransferQueue 构造。简单来说,这个队列允许我们基于生产者-消费者模式编写程序,并协调生产者向消费者传递消息的过程。

其实现与 BlockingQueue 类似,但新增了实现背压(backpressure)的能力。这意味着当生产者使用 transfer() 方法发送消息时,生产者线程会保持阻塞状态,直到消息被消费者实际消费。

3. 单生产者 - 零消费者场景

让我们测试 TransferQueuetransfer() 方法。预期行为是:生产者会被阻塞,直到消费者通过 take() 方法从队列中获取消息。

为验证这一点,我们创建一个单生产者、零消费者的程序。生产者首次调用 transfer() 时会无限期阻塞,因为没有消费者从队列中获取该元素。

先看 Producer 类实现:

class Producer implements Runnable {
    private TransferQueue<String> transferQueue;
 
    private String name;
 
    private Integer numberOfMessagesToProduce;
 
    public AtomicInteger numberOfProducedMessages
      = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < numberOfMessagesToProduce; i++) {
            try {
                boolean added 
                  = transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
                if(added){
                    numberOfProducedMessages.incrementAndGet();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    // 标准构造方法
}

我们向构造函数传入 TransferQueue 实例、生产者名称和待传输的消息数量。注意这里使用了带超时的 tryTransfer() 方法:等待4秒后,若生产者仍无法传输消息,则返回 false 并继续处理下一条消息。numberOfProducedMessages 变量用于跟踪已生产的消息数量。

再看 Consumer 类:

class Consumer implements Runnable {
 
    private TransferQueue<String> transferQueue;
 
    private String name;
 
    private int numberOfMessagesToConsume;
 
    public AtomicInteger numberOfConsumedMessages
     = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < numberOfMessagesToConsume; i++) {
            try {
                String element = transferQueue.take();
                longProcessing(element);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void longProcessing(String element)
      throws InterruptedException {
        numberOfConsumedMessages.incrementAndGet();
        Thread.sleep(500);
    }
    
    // 标准构造方法
}

与生产者类似,但通过 take() 方法从队列获取元素。在 longProcessing() 方法中模拟耗时操作,并递增 numberOfConsumedMessages 计数器。

现在启动仅含一个生产者的程序:

@Test
public void whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout() 
  throws InterruptedException {
    // given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(2);
    Producer producer = new Producer(transferQueue, "1", 3);

    // when
    exService.execute(producer);

    // then
    exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer.numberOfProducedMessages.intValue(), 0);
}

我们尝试向队列发送3个元素,但生产者在第一个元素处阻塞——因为没有消费者获取该元素。tryTransfer() 方法会阻塞直到消息被消费或超时,超时后返回 false 表示传输失败,然后尝试传输下一条。程序输出如下:

Producer: 1 is waiting to transfer...
can not add an element due to the timeout
Producer: 1 is waiting to transfer...

4. 单生产者 - 单消费者场景

测试单生产者、单消费者的情况:

@Test
public void whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages() 
  throws InterruptedException {
    // given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(2);
    Producer producer = new Producer(transferQueue, "1", 3);
    Consumer consumer = new Consumer(transferQueue, "1", 3);

    // when
    exService.execute(producer);
    exService.execute(consumer);

    // then
    exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer.numberOfProducedMessages.intValue(), 3);
    assertEquals(consumer.numberOfConsumedMessages.intValue(), 3);
}

TransferQueue 作为交换点,消费者未从队列获取元素前,生产者无法添加新元素。程序输出如下:

Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 1 received element: A2

由于 TransferQueue 的特性,队列元素的生成和消费是严格顺序化的。

5. 多生产者 - 多消费者场景

最后测试多生产者、多消费者的情况:

@Test
public void whenMultipleConsumersAndProducers_thenProcessAllMessages() 
  throws InterruptedException {
    // given
    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(3);
    Producer producer1 = new Producer(transferQueue, "1", 3);
    Producer producer2 = new Producer(transferQueue, "2", 3);
    Consumer consumer1 = new Consumer(transferQueue, "1", 3);
    Consumer consumer2 = new Consumer(transferQueue, "2", 3);

    // when
    exService.execute(producer1);
    exService.execute(producer2);
    exService.execute(consumer1);
    exService.execute(consumer2);

    // then
    exService.awaitTermination(10_000, TimeUnit.MILLISECONDS);
    exService.shutdown();

    assertEquals(producer1.numberOfProducedMessages.intValue(), 3);
    assertEquals(producer2.numberOfProducedMessages.intValue(), 3);
}

本例包含两个生产者和两个消费者。程序启动时,两个生产者各能生产一个元素,随后阻塞直到有消费者从队列获取元素:

Producer: 1 is waiting to transfer...
Consumer: 1 is waiting to take element...
Producer: 2 is waiting to transfer...
Producer: 1 transferred element: A0
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 2 transferred element: A0
Producer: 2 is waiting to transfer...
Consumer: 1 received element: A0
Consumer: 1 is waiting to take element...
Producer: 1 transferred element: A1
Producer: 1 is waiting to transfer...
Consumer: 1 received element: A1
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A1
Producer: 2 is waiting to transfer...
Consumer: 2 received element: A1
Consumer: 2 is waiting to take element...
Producer: 1 transferred element: A2
Consumer: 2 received element: A2
Consumer: 2 is waiting to take element...
Producer: 2 transferred element: A2
Consumer: 2 received element: A2

6. 总结

本文深入探讨了 java.util.concurrent 包中的 TransferQueue 构造。我们展示了如何使用它实现生产者-消费者程序,并通过 transfer() 方法实现背压机制——生产者必须等待消费者从队列中取出元素后才能继续发布新消息。

当需要防止生产者过度生产导致队列溢出(引发 OutOfMemoryError)时,TransferQueue 尤其有用。在这种设计中,消费者实际控制着生产者的消息生成速度

所有示例代码和代码片段可在 GitHub 获取——这是一个 Maven 项目,可直接导入运行。


原始标题:Guide to the Java TransferQueue