大道之行,天下为公。这篇文章主要讲述Java技术指南「并发编程专题」CompletionService框架基本使用和原理探究(基础篇相关的知识,希望能为你提供帮助。
前提概要
CompletionService的介绍
- CompletionService 接口是一个独立的接口,并没有扩展ExecutorService 。 其默认实现类是ExecutorCompletionService。
- 接口CompletionService 的功能是:以异步的方式一边执行未完成的任务,一边记录、处理已完成任务的结果。从而可以将任务的执行与处理任务的执行结果分离开来。
- CompletionService就是监视着 Executor线程池执行的任务,用BlockingQueue将完成的任务的结果存储下来。
- 要不断遍历与每个任务关联的Future,然后不断去轮询,判断任务是否已经完成,功能比较繁琐。
public interface CompletionService<
V>
{
Future<
V>
submit(Callable<
V>
task);
Future<
V>
submit(Runnable task, V result);
Future<
V>
take() throws InterruptedException;
Future<
V>
poll();
Future<
V>
poll(long timeout, TimeUnit unit) throws InterruptedException;
}
方法摘要
Future submit(Callable task):
Future submit(Runnable task, V result):
Future take() throws InterruptedException
Future poll()
Future poll(long timeout, TimeUnit unit) throws InterruptedException
void eample(Executor e, Collection<
Callable<
Result>
>
solvers) throws InterruptedException {
CompletionService<
Result>
completionService = new ExecutorCompletionService<
Result>
(e);
int n = solvers.size();
List<
Future<
Result>
>
futures = new ArrayList<
Future<
Result>
>
(n);
Result result = null;
try {
//提交多个任务
for (Callable<
Result>
s : solvers)
futures.add(completionService.submit(s));
//
for (int i = 0;
i <
n;
++i) {
try {
//等待获取一个已经完成的任务
Result r = completionService.take().get();
//判断返回结果是否为空
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {}
}
}
finally {
//取消所有任务
for (Future<
Result>
f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}
ExecutorCompletionService的介绍
- ExecutorCompletionService内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。
- ExecutorCompletionService实现了CompletionService,内部通过Executor以及BlockingQueue来实现接口提出的规范,ExecutorCompletionService,提交任务后,可以按任务返回结果的先后顺序来获取各任务执行后的结果,该类实现了接口CompletionService
- 指定一个Executor来执行任务,存储完成的任务的完成队列是LinkedBlockingQueue ;
- Executor由调用者传递进来,而Blocking可以使用默认的LinkedBlockingQueue,也可以由调用者传递。
ExecutorCompletionService(Executor executor):
指定了任务执行器Executor和已完成的任务队列completionQueue
ExecutorCompletionService(Executor executor, BlockingQueue<
Future>
completionQueue)
实现构造器
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<
Future<
V>
>
();
}
- 该接口定义了一系列方法:提交实现了Callable或Runnable接口的任务,并获取这些任务的结果。
- 包装后提交任务的submit()方法,该类还会将提交的任务封装成QueueingFuture,这样就可以实现FutureTask.done()方法,以便于在任务执行完毕后,将结果放入阻塞队列中。
public Future<
V>
submit(Callable<
V>
task) {
if (task == null) throw new NullPointerException();
RunnableFuture<
V>
f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
QueueingFuture为内部类:
private class QueueingFuture extends FutureTask<
Void>
{
QueueingFuture(RunnableFuture<
V>
task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task);
}
private final Future<
V>
task;
}
- 在调用take()、poll()方法时,会从阻塞队列中获取Future对象,以取得任务执行的结果。
- 【Java技术指南「并发编程专题」CompletionService框架基本使用和原理探究(基础篇】它继承自 FutureTask,并且重写了 done 方法,其方法把任务放到我们包装线程池创建的堵塞队列里面;就是当任务执行完成后,就会被放到队列里面去了。
- 调用其take() 方法,就是阻塞等待,等到的一定是能够获取的结果的future,然后再调用get()方法获取执行结果;
推荐阅读
- Java架构师-十项全能 百度网盘
- 循环神经网络LSTM RNN回归(sin曲线预测)
- Kibana系列--安装与配置
- flink sql 知其所以然(flink sql tumble window 的奇妙解析之路)
- 搭建PG数据库一些归档的配置
- CentOS7.3在终端输入字母时出现了大小写乱跳
- 智慧物流可视化,能否解决购物节后的爆仓危机()
- VS Code 翻译插件一览表
- 2021年项目实战-升级项目实战