Java-常见的线程池

一、常见的线程池 FixedThreadPool、CachedThreadPool、ScheduledThreadPool、SingleThreadExecutor
这些常见的线程池,基本都是通过Executors中对应的new方法进行创建的。
1.FixedThreadPool
核心线程数固定,没有非核心线程,LinkedBlockingQueue 无界的Queue

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); }

2.CachedThreadPool
只有非核心线程,60s的空闲线程保活,SynchronousQueue,直接提交给线程运行。OkHttp中的线程池就是用的CachedThreadPool,核心线程数是0,最大线程数是Integer.MAX_VALUE,使用SynchronousQueue队列,SynchronousQueue队列是0容量的阻塞队列,那么就会直接交给线程执行
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory); }

3.ScheduledThreadPool
有延迟执行和周期重复执行的线程池,new DelayedQueue ()
DelayedQueue : 优先队列 PriorityQueue 存储元素
public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }

// ScheduledThreadPoolExecutor.java public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory); }

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

ScheduledThreadPool实现周期性原理
public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0L) throw new IllegalArgumentException(); ScheduledFutureTask sft = new ScheduledFutureTask(command, null, triggerTime(initialDelay, unit), -unit.toNanos(delay), sequencer.getAndIncrement()); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; // 将任务添加到DelayWorkQueue队列中 delayedExecute(t); return t; }

获取DelayWorkQueue队列中的任务
public RunnableScheduledFuture take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { RunnableScheduledFuture first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }

执行任务
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) super.run(); else if (super.runAndReset()) { // 判断是否需要周期性执行,如果是 // 则重置下一次要执行的时间 setNextRunTime(); reExecutePeriodic(outerTask); } }private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }

放回到队列中
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture e = (RunnableScheduledFuture)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; }

主要就是通过修改ScheduledFutureTask的time值,而time是一个volatile修饰,time的值的修改则是通过优先级period的值来判断对time加上还是减去period值
4.SingleThreadExecutor
LinkedBlockingQueue
一个核心线程,不需要处理同步
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory)); }

二、饱和策略 【Java-常见的线程池】RejectedExecutionHandler 饱和策略
(1)DiscardOldestPolicy:直接丢弃最老的那个任务,执行当前任务。如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程) 意思就是在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。
(2)AbortPolicy:直接抛出异常,默认的使用,即拒绝执行任务RejectedExecutionException。
(3)CallerRunsPolicy:让调用者线程去执行任务,即谁往线程池中提交任务,就由谁来执行这个任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度
(4)DiscardPolicy:把最新提交的任务丢弃
如果这这四种拒绝策略都不满足,则自己实现RejectedExecutionHandler接口,自己定义一个拒绝策略。

    推荐阅读