JDK、Dubbo中的线程池

如果某个Dubbo请求并发比较高,同时响应上由于数据库原因或者网络原因导致接口内部请求慢,则该Dubbo方法及其容易导致Dubbo里的线程池耗尽,此时消费端会收到如下异常堆栈信息

Caused by: java.util.concurrent.RejectedExecutionException: Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-192.168.112.12:8045, Pool Size: 200 (active: 200, core: 200, max: 200, largest: 200), Task: 165633 (completed: 165433), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://192.168.112.12:8045! at com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport.rejectedExecution(AbortPolicyWithReport.java:53) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656) at com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.caught(AllChannelHandler.java:65)

报错核心信息是Thread pool is EXHAUSTED,说明是线程池耗尽了,同时注意到如下信息, Pool Size: 200 (active: 200, core: 200, max: 200, largest: 200),说明Dubbo中默认最大200个线程,此时必然已经达到了最大线程数,再创建线程时导致系统异常,那么Dubbo中默认的线程池是哪种呢,为什么会抛出这个异常?
本文以此异常为出发点,分析Dubbo里为什么会出现这种异常以及线程池配置。
瞎想的面试题 1. JDK线程池核心参数有哪些?
2. Java常见线程池有哪些,分别有什么问题?
3. Dubbo中的有哪几种线程池?
4. Dubbo中的线程池和JDK中的有什么区别?
JDK中线程池参数 了解Dubbo中的线程池时,有必要先从最基本的JDK中的线程池出发,先看下ThreadPoolExecutor的构造函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

【JDK、Dubbo中的线程池】先分别解释下各个参数
参数 含义
corePoolSize 核心线程数,当线程池中线程数量小于corePoolSize时,会不断创建新线程,直到数量到达corePoolSize
maximumPoolSize 线程池中容许最大的线程数
keepAliveTime 线程存活时间,如果当前线程数> corePoolSize,多余的线程会存活keepAliveTime时间,之后被回收释放
workQueue 线程池任务缓存队列
threadFactory 创建线程的工厂,通常用该该工厂来指定创建线程的名称
handler 线程池中线程到达maximumPoolSize最大线程数后,再提交任务会使用handler拒绝策略来拒绝提交
举个实际例子来说明,假设现在创建如下线程池
private static Executor statisticThreadPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1), new ThreadFactoryBuilder().setNameFormat("test-thread-poot-%d").build(), (r, executor) -> { log.info("丢弃统计任务处理"); });


corePoolSize = 1 maximumPoolSize = 3 keepAliveTime = 60秒 workQueue = LinkedBlockingQueue<>(1) 容量为1 threadFactory设置新建线程的名称为test-thread-poot-1,test-thread-poot-2。。。。。 handler为直接拒绝策略

  1. 提交第1个任务,此时poolSize(0) < corePoolSize(1),即核心线程数没满,新建核心线程,线程池中线程数从poolSize(0)变为poolSize(1)
  2. 提交第2个任务,此时poolSize(1) = corePoolSize(1),无法新建核心线程,但是队列queue没满,往队列中提交任务,此时queueSize=1
  3. 提交第3个,此时queueSize(1) = maxQueueSize(1),无法往队列中提交任务,但是判断
    poolSize(1) < maximumPoolSize(2),没有达到最大线程数,继续创建线程,直到poolSize(2) =maximumPoolSize(2)
    4.提交第4个任务时,判断poolSize(2) =maximumPoolSize(2),已经到了线程池最大创建的线程数,再提交任务会触发拒绝策略
    5.假设任务执行完了,此时线程中poolSize(2) > corePoolSize(1),有2-1 = 1个空闲线程,这2个空闲线程在没有任务执行时,会存活keepAliveTime时间后被回收
用一张别人整理的流程图(我懒)来梳理下整体流程
JDK、Dubbo中的线程池
文章图片

至此问题1:JDK线程池核心参数有哪些? 已经得到了解答
Executors中常见线程池 JDK的Executors工具类给我们封装好了几个常用的线程池,
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }

顾名思义,固定线程数为N的线程池,参数如下
核心线程数=n 最大线程数=n 阻塞队列=LinkedBlockingQueue 存活时间=0

聪明的人已经看出来这个配置有什么问题了,其配置的队列是无界的,LinkedBlockingQueue(容量无限),而我们知道当提交任务时在核心线程数满了后会往队列里塞任务,只有队列满了,才会创建非核心线程。那么这个无界队列会满么?
所以假设程序中不小心写了个死循环,不断往该线程池中提交任务,则队列中会不断塞入任务,最终导致OOM
还是用一张图来说明
JDK、Dubbo中的线程池
文章图片

newSingleThreadExecutor
只有一个线程的线程池,和上面固定大小线程池类似,阻塞队列是无界的问题也一样存在
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory));

newCachedThreadPool
缓存线程池,提交任务时会不断创建线程(不往队列里塞)
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }

可以看到配置如下
核心线程数=0 最大线程数=Integer.MAX_VALUE 阻塞队列=SynchronousQueue 存活时间=60秒

先介绍下SynchronousQueue
同步Queue,属于线程安全的BlockingQueue的一种,此队列设计的理念类似于"单工模式",对于每个put/offer操作,必须等待一个take/poll操作,类似于我们的现实生活中的"火把传递":一个火把传递地他人,需要2个人"触手可及"才行. 因为这种策略,最终导致队列中并没有一个真正的元素
可以简单点理解为一个容量为0,不存储任何任务的阻塞队列
按上文正常线程池流程走,核心线程数为0,直接往队列里塞,同时队列容量又为0,而线程池数量又小于Integer.MAX_VALUE,所以会不断创建核心线程来执行任务,同时当任务执行完,空闲线程会存活60秒
聪明的人又发现了,缓存线程池的队列虽然不是无界队列,但是其最大线程数是Integer.MAX_VALUE,同样存在不断创建任务导致内存溢出OOM的风险
newScheduledThreadPool
定时调度线程池,常用于定时执行任务的场景,关键点在于DelayedWorkQueue优先级队列(基于最小堆)的使用,本文不多介绍
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }

看完上述线程池介绍,想必各位对开头的问题2:Java常见线程池有哪些,分别有什么问题也有了答案,关键就在于阻塞队列的配置、最大线程数的配置。
大家使用过IDEA的阿里巴巴代码规约插件的,肯定也看到过如下提示,其不推荐使用Executors来创建线程池,现在明白原因了吧?
JDK、Dubbo中的线程池
文章图片

Dubbo中的线程池 说了这么多JDK相关的线程池知识,目的还是引出Dubbo里的线程池,先从接口入手,看下Dubbo中都有哪些线程池,默认又是什么线程池
Dubbo中获取线程池的接口org.apache.dubbo.common.threadpool.ThreadPool定义如下
@SPI("fixed") public interface ThreadPool {/** * Thread pool * * @param url URL contains thread parameter * @return thread pool */ @Adaptive({THREADPOOL_KEY}) Executor getExecutor(URL url); }

可以看到默认是fixed线程池的实现,再看SPI下线程池总共有几种扩展实现
JDK、Dubbo中的线程池
文章图片

可以看到Dubbo里有4种线程池实现,分别是FixedThreadPoolCachedThreadPoolLimitedThreadPoolEagerThreadPool
大家先有个大概的了解
  • fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
  • cached 缓存线程池,空闲一分钟自动删除,需要时重建。
  • limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
  • eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)
看到这,Dubbo中的有哪几种线程池这个问题大家也有了答案了。
FixedThreadPool
从上面接口可以知道,Dubbo中默认的线程池实现是FixedThreadPool,但是从上文JDK的线程池中可以得知,JDK中的FixedThreadPool中的队列是无界队列,是不会塞满的,但是文章开头的那个Dubbo线程池异常又是怎么回事呢?是不是Dubbo对FixedThreadPool做了定制化实现?
先看下Dubbo下的FixedThreadPool实现代码
int DEFAULT_QUEUES = 0; int DEFAULT_THREADS = 200; public class FixedThreadPool implements ThreadPool {@Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}

拒绝策略AbortPolicyWithReport实现如下,看到拒绝时的异常信息是不是有种似曾相识的感觉?
。。略去其他无用代码 @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: " + "%d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); //dump堆栈信息 dumpJStack(); throw new RejectedExecutionException(msg); }

参数如下
核心线程数=url中指定的线程数(默认值200) 最大线程数=url中指定的线程数(默认值200) 线程存活时间=0 拒绝策略为抛出RejectedExecutionException异常,并打印异常日志,dump堆栈信息默认情况下:使用SynchronousQueue队列 queues<0时:使用无界阻塞队列 queues>0时:使用有界阻塞队列(队列大小为queues)

看到这里,大家是不是对开头的异常信息有大概的思路了?知道为什么文章开头的线程池耗尽异常发生了?
默认情况下,Dubbo中每来一个请求,FixedThreadPool都会新建线程执行,直到200个核心线程数,如果再来一个请求,由于使用的是不存储任何任务的SynchronousQueue,会尝试创建非核心线程,但是最大线程数也是200,所以直接执行拒绝策略,抛出RejectedExecutionException,并打印异常信息,而异常信息就是文章开头中的异常信息
如果真的发生了这种情况,又该怎么处理呢?
检查系统哪里拖慢了Dubbo服务,网络IO慢?数据库慢?找到线程池中任务没执行完的原因,对症下药。
CachedThreadPool
和Java中缓存线程池类似,大家看下参数设置即可,这里不再赘述
public class CachedThreadPool implements ThreadPool { int DEFAULT_CORE_THREADS = 0; int DEFAULT_QUEUES = 0; @Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS); int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE); return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }

LimitedThreadPool
可伸缩线程池,但池中的线程数只会增长不会收缩,和CachedThreadPool类似,但是其线程存活时间为Long.MAX_VALUE,标识线程不会被回收,只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
public class LimitedThreadPool implements ThreadPool {@Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS); int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}

EagerThreadPool
EagerThreadPool的实现就不分析了,大家对着Dubbo文档去看吧,这个并不常用
Dubbo里的线程池实现,并没有直接套用Executors类中的实现,毕竟有OOM的风险,同时详细分析了其默认FixedThreadPool的实现,从源码中找到了文章开头异常信息的发生原因。
总结 不管是JDK还是Dubbo中的线程池,了解其实现都是为了更好的了解其实现原理,这样就可以在业务中自定义合理的线程池,配置合理的线程池参数,从而避免各种OOM异常,提高程序运行效率。

    推荐阅读