1. 引言
在多线程环境下,我们有时需要根据自定义规则(而非创建时间)调度任务。本文将介绍如何使用Java中的PriorityBlockingQueue
实现基于优先级的任务调度机制。
2. 概述
假设我们需要根据优先级执行任务,首先定义任务类:
public class Job implements Runnable {
private String jobName;
private JobPriority jobPriority;
@Override
public void run() {
System.out.println("Job:" + jobName +
" Priority:" + jobPriority);
Thread.sleep(1000); // 模拟实际执行耗时
}
// 标准的setter和getter方法
}
在run()
方法中打印任务名称和优先级,并通过sleep()
模拟耗时操作。当任务执行时,更多任务将在优先级队列中累积。
JobPriority
是一个简单的枚举类型:
public enum JobPriority {
HIGH,
MEDIUM,
LOW
}
3. 自定义比较器
我们需要定义比较器实现优先级规则。在Java 8中可以简单粗暴地这样实现:
Comparator.comparing(Job::getJobPriority);
⚠️ 注意:这里直接使用方法引用创建比较器,代码简洁但需要确保getJobPriority()
返回的类型已实现自然排序。
4. 优先级任务调度器
现在实现一个简单的任务调度器,使用单线程执行器从PriorityBlockingQueue
获取并执行任务:
public class PriorityJobScheduler {
private ExecutorService priorityJobPoolExecutor;
private ExecutorService priorityJobScheduler
= Executors.newSingleThreadExecutor();
private PriorityBlockingQueue<Job> priorityQueue;
public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
priorityQueue = new PriorityBlockingQueue<Job>(
queueSize,
Comparator.comparing(Job::getJobPriority));
priorityJobScheduler.execute(() -> {
while (true) {
try {
priorityJobPoolExecutor.execute(priorityQueue.take());
} catch (InterruptedException e) {
// 需要特殊处理中断异常
break;
}
}
});
}
public void scheduleJob(Job job) {
priorityQueue.add(job);
}
}
核心在于创建带自定义比较器的PriorityBlockingQueue
实例。通过take()
方法获取队列头部的任务(会自动阻塞等待任务)。客户端只需调用scheduleJob()
添加任务,队列会根据比较器自动排序。
✅ 关键点:实际任务执行由独立的线程池处理,避免阻塞调度线程。
5. 演示
下面演示调度器的使用效果:
private static int POOL_SIZE = 1;
private static int QUEUE_SIZE = 10;
@Test
public void whenMultiplePriorityJobsQueued_thenHighestPriorityJobIsPicked() {
Job job1 = new Job("Job1", JobPriority.LOW);
Job job2 = new Job("Job2", JobPriority.MEDIUM);
Job job3 = new Job("Job3", JobPriority.HIGH);
Job job4 = new Job("Job4", JobPriority.MEDIUM);
Job job5 = new Job("Job5", JobPriority.LOW);
Job job6 = new Job("Job6", JobPriority.HIGH);
PriorityJobScheduler pjs = new PriorityJobScheduler(
POOL_SIZE, QUEUE_SIZE);
pjs.scheduleJob(job1);
pjs.scheduleJob(job2);
pjs.scheduleJob(job3);
pjs.scheduleJob(job4);
pjs.scheduleJob(job5);
pjs.scheduleJob(job6);
// 清理资源
}
为验证优先级顺序,我们故意设置POOL_SIZE=1
(单线程执行),但队列容量QUEUE_SIZE=10
。提交不同优先级的任务后,典型输出如下:
Job:Job3 Priority:HIGH
Job:Job6 Priority:HIGH
Job:Job4 Priority:MEDIUM
Job:Job2 Priority:MEDIUM
Job:Job1 Priority:LOW
Job:Job5 Priority:LOW
❌ 注意:每次运行输出顺序可能不同,但绝不会出现低优先级任务在高优先级任务存在时被执行的情况。
6. 总结
本文展示了如何利用PriorityBlockingQueue
实现自定义优先级的任务调度。核心要点:
- 通过
Comparator
定义优先级规则 - 使用阻塞队列自动管理任务排序
- 分离调度线程和执行线程池
完整源码可在GitHub仓库获取。