带你快速搞定java并发库
目录
- 一、总览
- 二、Executor总览
- 三、继承结构
- 四、怎么保证只有一个线程
- 五、怎么保证时间可以定时执行
- 六、使用
- 总结
一、总览 计算机程序 = 数据 + 算法。
并发编程的一切根本原因是为了保证数据的正确性,线程的效率性。
Java并发库共分为四个大的部分,如下图
文章图片
Executor 和 future 是为了保证线程的效率性
Lock 和数据结构 是为了维持数据的一致性。
Java并发编程的时候,思考顺序为,
对自己的数据要么加锁。要么使用提供的数据结构,保证数据的安全性
调度线程的时候使用Executor更好的调度。
二、Executor总览
文章图片
Executor 提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。
相当于manager,老板让manager去执行一件任务,具体的是谁执行,什么时候执行,就不管了。
看上图的继承关系,介绍几个
文章图片
内置的线程池基本上都在这里
newScheduledThreadPool
定时执行的线程池newCachedThreadPool
缓存使用过的线程newFixedThreadPool
固定数量的线程池newWorkStealingPool
将大任务分解为小任务的线程池文章图片
三、继承结构 构造函数
包含一个定时的service
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); }static class DelegatedScheduledExecutorServiceextends DelegatedExecutorServiceimplements ScheduledExecutorService {private final ScheduledExecutorService e; DelegatedScheduledExecutorService(ScheduledExecutorService executor) {super(executor); e = executor; }
四、怎么保证只有一个线程 定时执行的时候调用这个方法,调用过程如下,注意看其中的注释,由上往下的调用顺序
public ScheduledFuture> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException(); if (delay <= 0)throw new IllegalArgumentException(); ScheduledFutureTasksft =new ScheduledFutureTask (command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; //延迟执行delayedExecute(t); return t; }private void delayedExecute(RunnableScheduledFuture> task) {if (isShutdown())reject(task); else {// 加入任务队列super.getQueue().add(task); if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false); else// 确保执行ensurePrestart(); }}// 如果worker数量小于corePoolSize,创建新的线程,其他情况不处理void ensurePrestart() {int wc = workerCountOf(ctl.get()); if (wc < corePoolSize)addWorker(null, true); else if (wc == 0)addWorker(null, false); }
五、怎么保证时间可以定时执行
public ScheduledFuture> schedule(Runnable command,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException(); RunnableScheduledFuture> t = decorateTask(command,new ScheduledFutureTask(command, null,triggerTime(delay, unit))); delayedExecute(t); return t; }
在每次执行的时候会把下一次执行的时间放进任务中
private long triggerTime(long delay, TimeUnit unit) {return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); }/** * Returns the trigger time of a delayed action. */long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
FutureTask 定时是通过LockSupport.parkNanos(this, nanos); LockSupport.park(this);
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (; ; ) {if (Thread.interrupted()) {removeWaiter(q); throw new InterruptedException(); }int s = state; if (s > COMPLETING) {if (q != null)q.thread = null; return s; }else if (s == COMPLETING) // cannot time out yetThread.yield(); else if (q == null)q = new WaitNode(); else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q); else if (timed) {nanos = deadline - System.nanoTime(); if (nanos <= 0L) {removeWaiter(q); return state; }//注意这里LockSupport.parkNanos(this, nanos); }else //注意这里LockSupport.park(this); }}
总结:Executor是通过将任务放在队列中,生成的futureTask。然后将生成的任务在队列中排序,将时间最近的需要出发的任务做检查。如果时间不到,就阻塞线程到下次出发时间。
注意:newSingleThreadScheduledExecutor只会有一个线程,不管你提交多少任务,这些任务会顺序执行,如果发生异常会取消下面的任务,线程池也不会关闭,注意捕捉异常
六、使用
ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor(); Runnable runnable1 = () -> {try {Thread.sleep(4000); System.out.println("11111111111111"); } catch (InterruptedException e) {e.printStackTrace(); }}; Runnable runnable2 = () -> {try {Thread.sleep(4000); System.out.println("222"); } catch (InterruptedException e) {e.printStackTrace(); }}; single.scheduleWithFixedDelay(runnable1,0,1, TimeUnit.SECONDS); single.scheduleWithFixedDelay(runnable2,0,2, TimeUnit.SECONDS);
11111111111111 222 11111111111111 222 11111111111111在项目中要注意关闭线程池
actionService = Executors.newSingleThreadScheduledExecutor(); actionService.scheduleWithFixedDelay(() -> {try {Thread.currentThread().setName("robotActionService"); Integer robotId = robotQueue.poll(); if (robotId == null) {//关闭线程池actionService.shutdown(); } else {int aiLv = robots.get(robotId); if (actionQueueMap.containsKey(aiLv)) {ActionQueue actionQueue = actionQueueMap.get(aiLv); actionQueue.doAction(robotId); }}} catch (Exception e) {//捕捉异常LOG.error("",e); }}, 1, 1, TimeUnit.SECONDS);
总结 本篇文章就到这里了,希望能给你带来帮助,也希望您能够多多关注脚本之家的更多内容!
推荐阅读
- 不废话,代码实践带你掌握|不废话,代码实践带你掌握 强缓存、协商缓存!
- 生发知识,带你深入了解
- 带你了解类型系统以及flow和typescript的基本使用
- neo4j|neo4j cql语句 快速查询手册
- 带你来看花
- 通过复盘快速成长(附模板)
- 5|5 个 PPT 常用快捷键带你从此走向高效
- jar|springboot项目打成jar包和war包,并部署(快速打包部署)
- 快速阅读作业【2/21】《阅读(革命性新定义》)
- Spring注解05|Spring注解05 @Import 给容器快速导入一个组件