后端开发|「后端开发」系列——Java线程池

Java线程池 ?线程池是用来管理线程生命周期的一个对象池,通过使用线程池,可以让开发人员不过于关注线程的创建、销毁等过程。并且通过使用线程池,合理的设置核心线程数,最大线程池等参数,可以提高系统的性能,避免出现一些例如OOM的问题。
后端开发|「后端开发」系列——Java线程池
文章图片

以上就是主线程中提交任务后的执行顺序。
首先看核心线程池是否已满,如果已经满了,就把任务放入到阻塞队列中,否则就创建新的线程,执行任务。
然后看阻塞队列是否已满,如果未满,就将任务放入阻塞队列,如果已满,就放入到最大线程池中。
再看最大线程池是否已满,如果已满就执行拒绝策略。未满就在最大线程池中创建线程执行任务。
在这个任务提交过程中深入考虑可能会遇到这几个问题。

  • 主线程是通过execute来提交任务的吗?还有其他方式吗?
  • 核心线程池中有存放线程对象吗?还是每次有任务到核心线程池的时候,再去创建线程?
  • 任务放入到阻塞队列之后,线程池是在什么时机去获取的任务?
  • 任务流转到了最大线程池这里,最大线程池会自己创建线程执行任务?如果这样,中间为什么要有一个阻塞队列的步骤?
下面我们就一一看一下这几个问题。
1. 主线程是通过execute来提交任务的吗?还有其他方式吗?
线程池可以通过execute,submit来提交任务。
但是最终都是通过调用execute来执行提交任务。
  • 当核心线程数还未达到corePoolSize的时候,会尝试新建一个线程来执行这个任务。调用addWorker方法会自动检查运行状态和工作者数量,确保任务交给新线程出错时会有异常返回。
  • 如果一个任务可以正常排队等待被执行,我们依旧需要二次检查我们是否可以新建一个线程(因为存在的一个线程可能在最后一次检查之后死掉了)或者线程池是否在进入这个方法之前已经关掉了。所以我们需要再一次检查在任务入队停止的时候是否需要回滚,或者新建一个线程。
  • 如果任务不能排队,我们试图创建一个新的线程,如果这也失败了,我们要知道线程池可能被关掉了,或者线程池饱和了,这个时候就需要执行拒绝策略。
//接口的实现 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task.The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false. * 2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none. * 3. If we cannot queue task, then we try to add a new thread.If it fails, we know we are shut down or saturated and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }

2. 核心线程池中有存放线程对象吗?还是每次有任务到核心线程池的时候,再去创建线程?
核心线程池的大小是通过ThreadPollExecutor的构造函数来指定的,一个新创建的线程池是没有任何线程的。线程会在执行execute的时候创建线程。而且核心线程池如果不设置线程的过期时间,在第一次创建线程之后,线程是不会被回收的。
另一种方法是调用prestartAllCoreThreads方法来提前创建并启动所有线程。
其中不管使用那种方式来创建线程,最终都是通过addWorker方法来实现的。
/** * addWorker这个方法有四种传参方式,每一种对应不同的用法。 * firstTask != null core == true *这种参数说明这个是在核心线程池中创建线程个去执行任务 * firstTask != null core == false *这种参数说明这个是在最大线程池中创建线程执行任务 * firstTask == null core == true *这种参数说明这个是在核心线程池中创建一个空的线程,当有任务过来的时候可以通过这个线程来执行任务 * firstTask == null core == false *在最大线程池中创建一个空的线程 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 检查线程池运行状态 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (; ; ) { // 检查线程池中运行的线程数量 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //更新当前线程池中线程的数量 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } }boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //创建一个新的工作者线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 二次检查线程池的状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // 检查当前线程的状态 throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); // 更新当前正在运行线程数的最大值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

3. 任务放入到阻塞队列之后,线程池是在什么时机去获取的任务?
阻塞队列中存放的是Runnable对象,这个对象是在核心线程池已经达到最大的时候放入到workQueue里边的。那么队列中的任务是在什么时候执行的呢?通过看工作者线程(Worker)我们会发现这个类实现类实现类Runnable接口,并且实现类run方法。在实现的run方法中只调用类一个方法 runWorker。就是下边这个样子。
public void run() { runWorker(this); }

因此我们只要来分析这个runWorker方法就可以了。在这个方法中主要做的就是获取可执行的任务,然后交给线程池中的线程去执行。可以看到工作者线程是一直循环获取task来执行的。这个task包括当前用户提交的任务,也包括阻塞队列中的任务。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //判断当前任务和从队列中获取到的第一个任务是否不为空,只要有一个不为空就可以执行对应的任务。 // 其中这个getTask方法就是去阻塞队列中取任务。 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //执行任务的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

4. 任务流转到了最大线程池这里,最大线程池会自己创建线程执行任务?如果这样,中间为什么要有一个阻塞队列的步骤?
??对于这个问题,我是这么想的,如果没有中间的这个阻塞队列,所有提交过来的任务都新建一个线程去执行,这样当任务很多的时候,我们就会创建太多的线程,系统内存吃不消,影响系统运行,可能会使系统当机。另外即使系统能够处理过来,但是任务只是在某一个时间段内提交的很多,那么过了这个时间段之后系统需要去管理每一个线程的关闭,回收等操作,依旧是一个非常消耗资源的事情。
??用一个工作车间打个比喻。比如工厂只需要20个人就可以满足平时的组装工作,然后某一天工厂接到来一个大单子,这20个人已经处理不过来了,这个时候有两种策略,一种是将任务排队,然后有工人工作完了就去取一个新的任务继续工作,直到做完,还有一种就是招临时工,多一个任务招一个临时工,这个时候工厂就需要管理这些临时工,告诉他们工作多长时间,给多少工钱,管理吃住……然后过了几天,这个大单子做完了,工厂还是只需要20个人,其他的人都不要了,工厂就需要给所有的临时工结工资,清理他们住宿的地方…这是一个很繁琐的过程,如果临时工太多,工厂出现管理上的问题,这个工厂只能GG了。
??这个例子中工厂原来的20个人就相当于核心线程,后来的临时工就是最大线程池中的线程,所以他们之间需要有一个组素队列,来接收一定范围内的任务,只有队列里边的任务已经够核心线程处理一段时间了之后,才在最大线程池里边创建新的线程。以期让系统达到最好的处理性能。
写在最后~:
?从开始开线程池的知识就开始说要写一篇博客,要写一篇博客,然鹅…过去了一个多月才写完。写完之后发现,看书的时间也就一周,写这个博客的时间用了三天。&_&也是佛了,这种拖拉的坏习惯也不知道从啥时候开始的,贼可怕
?准备写一个系列的文章,从Java线程,锁,到数据库Mysql、Redis,再到Es等等。这个过程中会有并发的情况,不过线程最大不会超过两个。毕竟我觉得两个方向的知识点换着来,偶尔换点思路会更高效一点。
Next:
?接下来看一下ThreadPoolExecutor,分析一下几种常用的线程池之间的异同。




君小黑 | 文 【原创】

参考
【后端开发|「后端开发」系列——Java线程池】由浅入深理解Java线程池及线程池的如何使用
Java线程池的使用
Java线程池
Java线程池(二)
java线程池ThreadPoolExecutor和阻塞队列BlockingQueue,Executor, ExecutorService

    推荐阅读