线程池的使用

线程池ThreadPoolExecutor的使用 标签(空格分隔): ThreadPoolExecutor
一 创建ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){}

参数说明:
  1. corePoolSize,核心线程数,池中所保存的最少线程个数。
  2. maximumPoolSize,池中允许的最大线程数,核心线程数+非核心线程数。
  3. keepAliveTime,当池中总线程数大于核心时,此为终止多余的空闲线程 等待新任务的最长时间。
  4. unit,keepAliveTime参数的时间单位,有DAYS、HOURS、 MINUTES、SECONDS、MILLISECONDS、MICROSECONDS、NANOSECONDS。常用MILLISECONDS。
  5. workQueue,执行前用于保持任务的队列。
  6. threadFactory,执行程序创建新线程时使用的工厂,默认使用Executors.defaultThreadFactory()。
  7. handler,由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序(策略),详情见ThreadPoolExecutor的内部静态类(一共有4个,都实现了RejectedExecutionHandler接口),默认使用AbortPolicy(用于被拒绝任务的处理程序,它将抛出 RejectedExecutionException)。
二 提交任务
public Future submit(Runnable task){}public Future submit(Callable task){}public void execute(Runnable command){}

说明:
  1. 常见有3种提交任务方式,其中submit是继承是基类AbstractExecutorService(类如其名,是抽样类),execute方法是其自己实现,查看源码可知submit底层还是调用execute方法。
  2. execute和submit方法的区别,二者的入参类型明显不同,而且execute没有返回值,submit是带有返回值的;
三 基本知识点
  1. 使用完线程池后,corePoolSize会保持不变,在核心线程创建之后的非核心线程会在keepAliveTime之后失效。非核心线程就像任务比较多的时候临时招进来的临时工,当活干完就不再需要。
@Test public void corePoolSizeIsKeep(){ ThreadPoolExecutor executor = new ThreadPoolExecutor(6, 10, 5L, TimeUnit.SECONDS, new SynchronousQueue<>()); submit9(executor); watch(executor); }

运行结果见下图:

线程池的使用
文章图片
corePoolSize不会变化.png
  1. 当使用无界的任务队列创建线程池时,即便提交的任务数超过了核心线程数,也不会再开辟新的非核心线程。
@Test public void meaninglessMaximunPoolSize(){ ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); submit9(executor); watch(executor); }

运行结果见下图:

线程池的使用
文章图片
无意义的maximumPoolSize.png
  1. 当使用了有界的任务队列创建线程,并且提交的任务数超过核心线程数加上任务队列,但不超过最大线程数+任务队列大小时,会开辟新的非核心线程。
@Test public void maximunPoolSize(){ ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2)); submit6(executor); watch(executor); }

运行结果见下图:

线程池的使用
文章图片
maximumPoolSize.png
  1. 当使用了有界的任务队列创建线程,并且提交的任务数超过最大线程数+任务队列大小时,会启用拒绝策略AbortPolicy并抛出异常。
@Test public void abortPolicyReject(){ ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1)); submit6(executor); executor.submit(new SleepChild()); watch(executor); }----------java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3f8f9dd6 rejected from java.util.concurrent.ThreadPoolExecutor@aec6354[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at com.taobao.juc.ExecutorsTest.abortPolicyReject(ExecutorsTest.java:123)

  1. ThreadPoolExecutor.CallerRunsPolicy策略的解决方法是提交任务的线程不再提交任务,而是帮助执行任务。
@Test public void CallerRunsPolicy(){ ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new CallerRunsPolicy()); submit6(executor); executor.submit(new SleepChild()); watch(executor); }

【线程池的使用】运行结果如下:

线程池的使用
文章图片
CallerRunsPolicy策略.png 四 附录
  1. SleepChild的源码
public class SleepChild implements Runnable { @Override public void run() { try { Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " run."); } catch (InterruptedException e) { e.printStackTrace(); } } }

  1. 其他源码
private void submit6(ThreadPoolExecutor executor){ executor.submit(new SleepChild()); executor.submit(new SleepChild()); executor.submit(new SleepChild()); System.out.println("===================first===================="); System.out.println("core pool size: " + executor.getCorePoolSize() + ", pool size: " + executor.getPoolSize() + ", work queue size: " + executor.getQueue().size()); executor.submit(new SleepChild()); executor.submit(new SleepChild()); executor.submit(new SleepChild()); System.out.println("===================second===================="); System.out.println("core pool size: " + executor.getCorePoolSize() + ", pool size: " + executor.getPoolSize() + ", work queue size: " + executor.getQueue().size()); } ----------private void submit9(ThreadPoolExecutor executor){ executor.submit(new SleepChild()); executor.submit(new SleepChild()); executor.submit(new SleepChild()); System.out.println("===================first===================="); System.out.println("core pool size: " + executor.getCorePoolSize() + ", pool size: " + executor.getPoolSize() + ", work queue size: " + executor.getQueue().size()); executor.submit(new SleepChild()); executor.submit(new SleepChild()); executor.submit(new SleepChild()); System.out.println("===================second===================="); System.out.println("core pool size: " + executor.getCorePoolSize() + ", pool size: " + executor.getPoolSize() + ", work queue size: " + executor.getQueue().size()); executor.submit(new SleepChild()); executor.submit(new SleepChild()); executor.submit(new SleepChild()); System.out.println("===================third===================="); System.out.println("core pool size: " + executor.getCorePoolSize() + ", pool size: " + executor.getPoolSize() + ", work queue size: " + executor.getQueue().size()); } -------------private void watch(ThreadPoolExecutor executor){ //主线程检测线程池的状态 while(true){ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("watch, core pool size: " + executor.getCorePoolSize() + ", pool size: " + executor.getPoolSize() + ", work queue size: " + executor.getQueue().size()); } }

    推荐阅读