一、Java构建线程的方式
- 继承Thread(也是实现的Runnable接口)
- 实现Runnable
- 实现Callable
- 线程池方式(可以实现以上兼容)
- Java提供了Executors可以创建(阿里规约不允许直接创建线程池)
- 推荐手动创建线程池(new ThreadPoolExecutor的方式)
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空 闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue workQueue,//任务队列,用来储 存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程, 一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的 任务过多而不能及时处理时,我们可以定制策略来处理任务
)
三、线程池的执行流程
文章图片
为什么要先进行阻塞在去尝试创建非核心线程:
**举例解释:**饭店(线程池)-厨子(线程)-排队(阻塞)、招厨子(创建线程)?-今日客满(拒绝策略)
四、线程池的属性标识 4.1、线程池的核心属性
//ct1:1:声明当前线程池的状态,2:声明当前线程池中的线程数
//高3位:线程池状态,低29位是:线程池中的线程个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
//29,方便后面做位运算
private static final int CAPACITY= (1 << COUNT_BITS) - 1;
//通过位运算得出最大容量// runState is stored in the high-order bits
private static final int RUNNING= -1 << COUNT_BITS;
//111 代表正常接受任务
private static final int SHUTDOWN=0 << COUNT_BITS;
//000 代表不接受新任务,但还会处理内部阻塞队列中的任务,正在进行的任务也正常处理
private static final int STOP=1 << COUNT_BITS;
//001 代表线程池立马停止,不接受新任务,不处理阻塞队列中的任务,同时终端执行中的任务
private static final int TIDYING=2 << COUNT_BITS;
//010 代表过渡状态,代表当前线程池即将结束
private static final int TERMINATED =3 << COUNT_BITS;
//011 代表线程池为TREMINATED,线程池已经结束// Packing and unpacking ctl
private static int runStateOf(int c){ return c & ~CAPACITY;
}//得到线程池的状态
private static int workerCountOf(int c){ return c & CAPACITY;
}//得到当前线程池的线程数
private static int ctlOf(int rs, int wc) { return rs | wc;
}//获取ctl
4.2、线程池的状态变化
【后端开发|ThreadPoolExecutor源码剖析】
文章图片
五、线程池的exectute方法 从execute方法开始(对应4.2的线程变化图):
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task.The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread.If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//拿到32位的int
int c = ctl.get();
//工作线程数 < 核心线程数
if (workerCountOf(c) < corePoolSize) {
//进入if,代表可以创建 核心 线程数
if (addWorker(command, true))
//在此结束
return;
//如果没进入if,代表创建核心线程数失败,重新获取 ctl
c = ctl.get();
}
//判断线程池为Running状态,将任务添加入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
//在此获取ctl
int recheck = ctl.get();
//再次判断是否为Running状态,若不是Running状态,remove任务
if (! isRunning(recheck) && remove(command))
reject(command);
//如果线程池在Running状态,线程池数量为0
else if (workerCountOf(recheck) == 0)
//阻塞队列有任务,但是没有工作线程,添加一个任务为空的工作线程处理阻塞队列中的任务
addWorker(null, false);
}
//阻塞队列已满,创建非核心线程,拒绝策略
else if (!addWorker(command, false))
reject(command);
}
通过上述源码,线程池的执行流程,同4.2,以下将进行addWorker方法
六、addWorker方法剖析
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;
;
) {
//获取ctl
int c = ctl.get();
//获取线程池状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//除了RUNNING的其他状态
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty())
//rs==SHUTDOWN,如果不是SHUTDOWN,就代表是STOP或者更高的状态,此时不需要添加线程处理任务
//任务为空,,如果任务为null,并且线程池状态不是RUNNING,不需要处理
//阻塞队列不为null,如果阻塞队列为空,返回flase,取反,获取true,不需要处理
)
//创建工作线程失败
return false;
for (;
;
) {
//获取工作线程个数
int wc = workerCountOf(c);
if (wc >= CAPACITY ||//如果当前线程大于线程池最大容量,不创建线程
//判断wc是否超过核心线程或者最大线程数
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//将工作线程数+1,采用CAS的方式
if (compareAndIncrementWorkerCount(c))
//成功退出外侧for循环
break retry;
c = ctl.get();
// Re-read ctl
//判断线程池状态,如果有变化;如果没变化,走内侧循环
if (runStateOf(c) != rs)
//continue外侧循环
continue retry;
// else CAS failed due to workerCount change;
retry inner loop
}
}boolean workerStarted = false;
boolean workerAdded = false;
//Worker为工作线程
Worker w = null;
try {
w = new Worker(firstTask);
//从Worker中获取线程t
final Thread t = w.thread;
if (t != null) {
//获取线程池的全局锁,避免我添加任务时,其他线程干掉了线程池,干掉线程池需要先获取这个锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//线程状态为RUNNING状态
if (rs < SHUTDOWN ||
//是SHUTDOWN状态,创建空任务工作线程,先处理阻塞队列中的任务
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将工作线程添加到集合(hashSet)中
workers.add(w);
//获取工作线程个数
int s = workers.size();
//如果线程工作线程数,大于之前记录的最大线程数,做替换操作
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
//添加工作线程成功
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
//启动工作线程
workerStarted = true;
//启动工作线程成功
}
}
} finally {
//如果启动工作线程失败,调用addWorkerFailed方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
//返回工作线程启动
}
七、Worker的封装
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in.Null if factory fails. */
final Thread thread;
/** Initial task to run.Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1);
// inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}/** Delegates main run loop to outer runWorker*/
public void run() {
runWorker(this);
}
runWorker方法:
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//拿到任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
// allow interrupts
boolean completedAbruptly = true;
try {
//任务不为空,执行任务;如果任务为空,通过getTask从阻塞队列中获取任务!
while (task != null || (task = getTask()) != null) {
w.lock();
//枷锁,避免shutdown,任务也不会中单
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted.This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行任务前的操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
//开始执行任务
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
//执行任务后的操作
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
可以继续深入getTask方法,以及processWorkerExit方法
未完待续
推荐阅读
- net|net java python_TIOBE 6 月编程语言排行榜(Python 势不可挡,或在四年之内超越 Java、C--中享思途...)
- java|java vb 速度_TIOBE 2月编程语言排行榜(JAVA第一,VB连续上涨)
- 笔记|Java实习面试题
- 读书笔记|读书笔记 之《软件架构设计: 大型网站技术架构与业务架构融合之道》
- java|Java 中的超快微服务( 当Microstream遇上Open Liberty)
- 程序员|Java开发面试题!java基础知识梳理
- 系统安全|AppScan检查到的一些中高危漏洞解决方案
- Java桥接方法
- 前端|JAVA WEB技术