1. 引言

在多线程环境下,我们有时需要根据自定义规则(而非创建时间)调度任务。本文将介绍如何使用Java中的PriorityBlockingQueue实现基于优先级的任务调度机制。

2. 概述

假设我们需要根据优先级执行任务,首先定义任务类:

public class Job implements Runnable {
    private String jobName;
    private JobPriority jobPriority;
    
    @Override
    public void run() {
        System.out.println("Job:" + jobName +
          " Priority:" + jobPriority);
        Thread.sleep(1000); // 模拟实际执行耗时
    }

    // 标准的setter和getter方法
}

run()方法中打印任务名称和优先级,并通过sleep()模拟耗时操作。当任务执行时,更多任务将在优先级队列中累积。

JobPriority是一个简单的枚举类型:

public enum JobPriority {
    HIGH,
    MEDIUM,
    LOW
}

3. 自定义比较器

我们需要定义比较器实现优先级规则。在Java 8中可以简单粗暴地这样实现:

Comparator.comparing(Job::getJobPriority);

⚠️ 注意:这里直接使用方法引用创建比较器,代码简洁但需要确保getJobPriority()返回的类型已实现自然排序。

4. 优先级任务调度器

现在实现一个简单的任务调度器,使用单线程执行器从PriorityBlockingQueue获取并执行任务:

public class PriorityJobScheduler {

    private ExecutorService priorityJobPoolExecutor;
    private ExecutorService priorityJobScheduler 
      = Executors.newSingleThreadExecutor();
    private PriorityBlockingQueue<Job> priorityQueue;

    public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
        priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
        priorityQueue = new PriorityBlockingQueue<Job>(
          queueSize, 
          Comparator.comparing(Job::getJobPriority));
        priorityJobScheduler.execute(() -> {
            while (true) {
                try {
                    priorityJobPoolExecutor.execute(priorityQueue.take());
                } catch (InterruptedException e) {
                    // 需要特殊处理中断异常
                    break;
                }
            }
        });
    }

    public void scheduleJob(Job job) {
        priorityQueue.add(job);
    }
}

核心在于创建带自定义比较器的PriorityBlockingQueue实例。通过take()方法获取队列头部的任务(会自动阻塞等待任务)。客户端只需调用scheduleJob()添加任务,队列会根据比较器自动排序。

✅ 关键点:实际任务执行由独立的线程池处理,避免阻塞调度线程。

5. 演示

下面演示调度器的使用效果:

private static int POOL_SIZE = 1;
private static int QUEUE_SIZE = 10;

@Test
public void whenMultiplePriorityJobsQueued_thenHighestPriorityJobIsPicked() {
    Job job1 = new Job("Job1", JobPriority.LOW);
    Job job2 = new Job("Job2", JobPriority.MEDIUM);
    Job job3 = new Job("Job3", JobPriority.HIGH);
    Job job4 = new Job("Job4", JobPriority.MEDIUM);
    Job job5 = new Job("Job5", JobPriority.LOW);
    Job job6 = new Job("Job6", JobPriority.HIGH);
    
    PriorityJobScheduler pjs = new PriorityJobScheduler(
      POOL_SIZE, QUEUE_SIZE);
    
    pjs.scheduleJob(job1);
    pjs.scheduleJob(job2);
    pjs.scheduleJob(job3);
    pjs.scheduleJob(job4);
    pjs.scheduleJob(job5);
    pjs.scheduleJob(job6);

    // 清理资源
}

为验证优先级顺序,我们故意设置POOL_SIZE=1(单线程执行),但队列容量QUEUE_SIZE=10。提交不同优先级的任务后,典型输出如下:

Job:Job3 Priority:HIGH
Job:Job6 Priority:HIGH
Job:Job4 Priority:MEDIUM
Job:Job2 Priority:MEDIUM
Job:Job1 Priority:LOW
Job:Job5 Priority:LOW

❌ 注意:每次运行输出顺序可能不同,但绝不会出现低优先级任务在高优先级任务存在时被执行的情况。

6. 总结

本文展示了如何利用PriorityBlockingQueue实现自定义优先级的任务调度。核心要点:

  • 通过Comparator定义优先级规则
  • 使用阻塞队列自动管理任务排序
  • 分离调度线程和执行线程池

完整源码可在GitHub仓库获取。


原始标题:Priority-based Job Scheduling in Java | Baeldung