Java/Android中的优先级任务队列的实践

志不强者智不达,言不信者行不果。这篇文章主要讲述Java/Android中的优先级任务队列的实践相关的知识,希望能为你提供帮助。

版权声明: 转载必须注明本文转自严振杰的博客: http://blog.yanzhenjie.com
刚刚把公司的活干完, 去群里水, 有几个小伙伴问我怎么实现队列, 于是乎我来写一篇吧。本篇文章适用于java和android开发者, 会从实现一个最简单的队列过渡到实现一个带有优先级的队列, 保准你可以掌握基本的队列原理。
队列的基本理解 用生活中的一个情景来举个栗子, 前段时间很火爆的电视剧《人民的名义》中有一个丁义珍式的窗口大家应该都知道了, 我们不说《人民的名义》也不说丁义珍, 我们来说说这个办事窗口。
我们知道在某机构上班期间, 窗口一直是开着的, 有人去办事了窗口就开始做事, 没人办事了窗口就处于等待的状态, 如果去办事的人特别多, 这些办事的人就必须排队( 我特别讨厌在要排队的地方不排队的人, 呼吁大家如果不是性命攸关的急事, 一定要排队, 谁不想早点办完事呢) , 窗口也可能是多个, 而这些窗口中也可能有一些特别窗口, 比如军人优先办理。
在说队列之前说两个名词: Task是任务, TaskExecutor是任务执行器。
而我们今天要说的队列就完全符合某机构这个情况, 队列在有Task进来的时候TaskExecutor就立刻开始执行Task, 当没有Task的时候TaskExecutor就处于一个阻塞状态, 当有很多Task的时候Task也需要排队, TaskExecutor也可以是多个, 并且可以指定某几个Task优先执行或者滞后执行。
综上所说我们得出一个这样的关系: 队列相当于某机构, TaskExecutor相当于窗口, 办事者就是Task
本文源码下载地址:
http://download.csdn.net/detail/yanzhenjie1003/9841188
项目源码使用IDEA写的, 你可以直接import到你的IDEA, 或者把源码直接拷贝到Eclipse或者AndroidStudio。
普通队列 当然很多机构也没有设置什么军人优先的窗口, 所以队列也有不带优先级的队列, 因此我们先来实现一个非优先级的队列。
我们常用的队列接口有: Queue< E> BlockingQueue< E> , 基于上面我们说的特点, 我们用BlockingQueue< E> 来实现任务队列, BlockingQueue< E> 的实现有很多, 这里我们选择LinkedBlockingQueue< E>
和上述某机构不一样, 某机构可以先有机构, 再有窗口, 再有办事者。但是我们写代码的时候, 要想写一个队列, 那么务必要在队列中写TaskExecutor, 那么就得先写好TaskExecutor类, 以此类推就得先有Task类。
因此我们先写一个Task的接口, 也就是办事的人, 我把它设计为接口, 方便办各种不同事的人进来:
// 办事的人。 public interface ITask {// 办事, 我们把办事的方法给办事的人, 也就是你要办什么事, 由你自己决定。 void run(); }

接下来再写一个TaskExecutor的类, 也就是窗口, 用来执行Task, 认真看注释, 非常有助于理解:
// 窗口 public class TaskExecutor extends Thread {// 在窗口拍的队, 这个队里面是办事的人。 private BlockingQueue< ITask> taskQueue; // 这个办事窗口是否在等待着办事。 private boolean isRunning = true; public TaskExecutor(BlockingQueue< ITask> taskQueue) { this.taskQueue = taskQueue; }// 下班。 public void quit() { isRunning = false; interrupt(); }@ Override public void run() { while (isRunning) { // 如果是上班状态就待着。 ITask iTask; try { iTask = taskQueue.take(); // 叫下一个办事的人进来, 没有人就等着。 } catch (InterruptedException e) { if (!isRunning) { // 发生意外了, 是下班状态的话就把窗口关闭。 interrupt(); break; // 如果执行到break, 后面的代码就无效了。 } // 发生意外了, 不是下班状态, 那么窗口继续等待。 continue; }// 为这个办事的人办事。 iTask.run(); } } }

这里要稍微解释下BlockingQueue< T> #take()方法, 这个方法当队列里面的item为空的时候, 它会一直处于阻塞状态, 当队列中进入item的时候它会立刻有一个返回值, 它就和ServerSocket.accept()方法一样, 所以我们把它放入一个Thread中, 以免阻塞调用它的线程( Android中可能是主线程) 。
办事的人和窗口都有了, 下面我们封装一个队列, 也就是某机构, 用来管理这些窗口:
// 某机构。 public class TaskQueue {// 某机构排的队, 队里面是办事的人。 private BlockingQueue< ITask> mTaskQueue; // 好多窗口。 private TaskExecutor[] mTaskExecutors; // 在开发者new队列的时候, 要指定窗口数量。 public TaskQueue(int size) { mTaskQueue = new LinkedBlockingQueue< > (); mTaskExecutors = new TaskExecutor[size]; }// 开始上班。 public void start() { stop(); // 把各个窗口都打开, 让窗口开始上班。 for (int i = 0; i < mTaskExecutors.length; i+ + ) { mTaskExecutors[i] = new TaskExecutor(mTaskQueue); mTaskExecutors[i].start(); } }// 统一各个窗口下班。 public void stop() { if (mTaskExecutors != null) for (TaskExecutor taskExecutor : mTaskExecutors) { if (taskExecutor != null) taskExecutor.quit(); } }// 开一个门, 让办事的人能进来。 public < T extends ITask> int add(T task) { if (!mTaskQueue.contains(task)) { mTaskQueue.add(task); } // 返回排的队的人数, 公开透明, 让外面的人看的有多少人在等着办事。 return mTaskQueue.size(); } }

某机构、窗口、办事的人都有了, 下面我们就派一个人去一件具体的事, 但是上面我的Task是一个接口, 所以我们需要用一个类来实现这个接口, 来做某一件事:
// 做一件打印自己的id的事。 public class PrintTask implements ITask {private int id; public PrintTask(int id) { this.id = id; }@ Override public void run() { // 为了尽量模拟窗口办事的速度, 我们这里停顿两秒。 try { Thread.sleep(2000); } catch (InterruptedException ignored) { }System.out.println(" 我的id是: " + id); } }

下面就让我们模拟的虚拟世界运行一次:
public class Main {public static void main(String... args) { // 这里暂时只开一个窗口。 TaskQueue taskQueue = new TaskQueue(1); taskQueue.start(); for (int i = 0; i < 10; i+ + ) { PrintTask task = new PrintTask(i); taskQueue.add(task); } }}

没错, 队列按照我们理想的状况打印出来了:
我的id是: 0 我的id是: 1 我的id是: 2 我的id是: 3 我的id是: 4 我的id是: 5 我的id是: 6 我的id是: 7 我的id是: 8 我的id是: 9

上面我门只开了一个窗口, 下面我多开几个窗口:
public class Main {public static void main(String... args) { // 开三个窗口。 TaskQueue taskQueue = new TaskQueue(3); taskQueue.start(); // 某机构开始工作。for (int i = 0; i < 10; i+ + ) { // new 10 个需要办事的人, 并且进入某机构办事。 PrintTask task = new PrintTask(i); taskQueue.add(task); } } }

这里要说明一下, 在初始化的时候我们开了3个窗口, 内部的顺序应该是这样的:
当某机构的大门开了以后, 第一个办事的人进去到了第一个窗口, 第二个办事的人进去到了第二个窗口, 第三个办事的人进去到了第三个窗口, 第四个办事的人进去排队在第一位, 当第一、第二、第三个窗口中不论哪一个窗口的事办完了, 第四个人就去哪一个窗口继续办事, 第五个人等待, 一次类推。这样子就达到了队列同事并发三个任务的效果。
这就是一个普通的队列, 其它的一些特性也是基于此再次封装的, 那么下面我就基于此再把人物的优先级加上, 也就是我们上面说的特殊窗口-> 军人优先!
优先级队列 我们排队等待办事的时候, 来了一个办事的人, 那么如何判断这个办事人是否可以优先办理呢? 那就要判断它是否具有优先的权限甚至他可以优先到什么程度。
所以我们需要让这个Task有一标志, 那就是优先级, 所以我用一个枚举类标记优先级:
public enum Priority { LOW, // 最低。 DEFAULT, // 默认级别。 HIGH, // 高于默认级别。 Immediately // 立刻执行。 }

这里我把分了四个等级: 最低默认立刻, 这个等级肯定要给到我们的办事的人, 也就是Task
public interface ITask {void run(); void setPriority(Priority priority); Priority getPriority(); }

可以设置优先级和可以拿到优先级。
下面我们要把上面的LinkedBlockingQueue替换成PriorityBlockingQueue< E> , 因为它可以自动做到优先级的比较, 它要求泛型< E> , 也就是我们的Task必须实现Comparable< E> 接口, 而Comparable< E> 有一个compareTo(E)方法可以对两个< T> 做比较, 因此我们的队列需要改一下实现的方法:
// 某机构。 public class TaskQueue {// 某机构排的队, 队里面是办事的人。 private BlockingQueue< ITask> mTaskQueue; // 好多窗口。 private TaskExecutor[] mTaskExecutors; // 在开发者new队列的时候, 要指定窗口数量。 public TaskQueue(int size) { mTaskQueue = new PriorityBlockingQueue< > (); mTaskExecutors = new TaskExecutor[size]; }...

然后ITask接口继承Comparable< E> 接口:
public interface ITask extends Comparable< ITask> {void run(); void setPriority(Priority priority); Priority getPriority(); }

因为有setPriority(Priority)方法和getPriority()方法和Comparable< E> compareTo(E)方法, 所以我们的每一个Task都需要实现这几个方法, 这样就会很麻烦, 所以我们封装一个BasicTask:
public abstract class BasicTask implements ITask {// 默认优先级。 private Priority priority = Priority.DEFAULT; @ Override public void setPriority(Priority priority) { this.priority = priority; }@ Override public Priority getPriority() { return priority; }// 做优先级比较。 @ Override public int compareTo(ITask another) { final Priority me = this.getPriority(); final Priority it = another.getPriority(); return me = = it ? [...] : it.ordinal() - me.ordinal(); } }

其它都好说, 我们看到compareTo(E)方法就不太理解了, 这里说一下这个方法:
compareTo(E)中传进来的E是另一个Task, 如果当前Task比另一个Task更靠前就返回负数, 如果比另一个Task靠后, 那就返回正数, 如果优先级相等, 那就返回0。
这里要特别注意, 我们看到上面当两个Task优先级不一样的时候调用了Priority.orinal()方法, 并有后面的orinal减去了当前的orinal, 怎么理解呢? 首先要理解Priority.orinal()方法, 在Java中每一个枚举值都有这个方法, 这个枚举的值是它的下标+ 1, 也就是[index + 1], 所以我们写的Priority类其实可以这样理解:
public enum Priority { 1, 2, 3, 4 }

继续, 如果给当前Task比较低, 给compareTo(E)中的Task设置的优先级别比较高, 那么Priority不一样, 那么返回的值就是整数, 因此当前Task就会被PriorityBlockingQueue< E> 排到后面, 如果调换那么返回结果也就调换了。
但是我们注意到me = = it ? [...] : it.ordinal() - me.ordinal(); 中的[...]是什么鬼啊? 因为这里缺一段代码呀哈哈哈( 这个作者怎么傻乎乎的) , 这一段代码的意思是当优先级别一样的时候怎么办, 那就是谁先加入队列谁排到前面呗, 那么怎样返回值呢, 我们怎么知道哪个Task先加入队列呢? 这个时候可爱的我就出现了, 我给它给一个序列标记它什么时候加入队列的不久完事了, 于是我们可以修改下ITask接口, 增加两个方法:
public interface ITask extends Comparable< ITask> {void run(); void setPriority(Priority priority); Priority getPriority(); void setSequence(int sequence); int getSequence(); }

我们用setSequence(int)标记它加入队列的顺序, 然后因为setSequence(int)getSequence()是所有Task都需要实现的, 所以我们在BasicTask中实现这两个方法:
public abstract class BasicTask implements ITask {// 默认优先级。 private Priority priority = Priority.DEFAULT; private int sequence; @ Override public void setPriority(Priority priority) { this.priority = priority; }@ Override public Priority getPriority() { return priority; }@ Override public void setSequence(int sequence) { this.sequence = sequence; }@ Override public int getSequence() { return sequence; }// 做优先级比较。 @ Override public int compareTo(ITask another) { final Priority me = this.getPriority(); final Priority it = another.getPriority(); return me = = it ?this.getSequence() - another.getSequence() : it.ordinal() - me.ordinal(); } }

看到了吧, 刚才的[...]已经变成了this.getSequence() - another.getSequence(), 这里需要和上面的it.ordinal() - me.ordinal(); 的逻辑对应, 上面说到如果给当前Task比较低, 给compareTo(E)中的Task设置的优先级别比较高, 那么Priority不一样, 那么返回的值就是整数, 因此当前Task就会被PriorityBlockingQueue< E> 排到后面, 如果调换那么返回结果也就调换了。
这里的逻辑和上面对应就是和上面的逻辑相反, 因为这里是当两个优先级一样时的返回, 上面是两个优先级不一样时的返回, 所以当优先级别一样时, 返回负数表示当前Task在前, 返回正数表示当前Task在后, 正好上面上的逻辑对应。
接下来就是给Task设置序列了, 于是我们在TaskQueue中的T void add(T)方法做个手脚:
public class TaskQueue {private AtomicInteger mAtomicInteger = new AtomicInteger(); ...public TaskQueue(int size) { ... }public void start() { ... }public void stop() { ... }public < T extends ITask> int add(T task) { if (!mTaskQueue.contains(task)) { task.setSequence(mAtomicInteger.incrementAndGet()); // 注意这行。 mTaskQueue.add(task); } return mTaskQueue.size(); } }

这里我们使用了AtomicInteger类, 它的incrementAndGet()方法会每次递增1, 其实它相当于:
mAtomicInteger.addAndGet(1);

其它具体用法请自行搜索, 这里不再赘述。
到此为止, 我们的优先级别的队列就实现完毕了, 我们来做下测试:
public static void main(String... args) { // 开一个窗口, 这样会让优先级更加明显。 TaskQueue taskQueue = new TaskQueue(1); taskQueue.start(); //// 某机构开始工作。// 为了显示出优先级效果, 我们预添加3个在前面堵着, 让后面的优先级效果更明显。 taskQueue.add(new PrintTask(110)); taskQueue.add(new PrintTask(112)); taskQueue.add(new PrintTask(122)); for (int i = 0; i < 10; i+ + ) { // 从第0个人开始。 PrintTask task = new PrintTask(i); if (1 = = i) { task.setPriority(Priority.LOW); // 让第2个进入的人最后办事。 } else if (8 = = i) { task.setPriority(Priority.HIGH); // 让第9个进入的人第二个办事。 } else if (9 = = i) { task.setPriority(Priority.Immediately); // 让第10个进入的人第一个办事。 } // ... 其它进入的人, 按照进入顺序办事。 taskQueue.add(task); }

没错这就是我们看到的效果:
我的id是: 9 我的id是: 8 我的id是: 110 我的id是: 112 我的id是: 122 我的id是: 0 我的id是: 2 我的id是: 3 我的id是: 4 我的id是: 5 我的id是: 6 我的id是: 7 我的id是: 1

到这里就结束啦, 本文源码下载地址:
http://download.csdn.net/detail/yanzhenjie1003/9841188
项目源码使用IDEA写的, 你可以直接import到你的IDEA, 或者把源码直接拷贝到Eclipse或者AndroidStudio。
【Java/Android中的优先级任务队列的实践】版权声明: 转载必须注明本文转自严振杰的博客: http://blog.yanzhenjie.com

    推荐阅读