大家好,今天我们来聊一个比较实用的话题,动态可监控的线程池实践,开源项目<
写在前面 稍微有些Java编程经验的小伙伴都知道,Java的精髓在juc包,这是大名鼎鼎的Doug Lea老爷子的杰作,评价一个程序员Java水平怎么样,一定程度上看他对juc包下的一些技术掌握的怎么样,这也是面试中的基本上必问的一些技术点之一。
juc包主要包括:
1.原子类(AtomicXXX)
多线程编程场景下,这些类都是必备技能,会这些可以帮助我们写出高质量、高性能、少bug的代码,同时这些也是Java中比较难啃的一些技术,需要持之以恒,学以致用,在使用中感受他们带来的奥妙。
2.锁类(XXXLock)
3.线程同步类(AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger)
4.任务执行器类(Executor体系类,包括今天的主角ThreadPoolExecutor)
5.并发集合类(ConcurrentXXX、CopyOnWriteXXX)相关集合类
6.阻塞队列类(BlockingQueue继承体系类)
7.Future相关类
8.其他一些辅助工具类
上边简单罗列了下juc包下功能分类,这篇文章我们主要来介绍动态可监控线程池的,所以具体内容也就不展开讲了,以后有时间单独来聊吧。看这篇文章前,希望读者最好有一定的线程池ThreadPoolExecutor使用经验,不然看起来会有点懵。
如果你对ThreadPoolExecutor不是很熟悉,推荐阅读下面两篇文章
https://www.javadoop.com/post/java-thread-pool
文章图片
https://www.javadoop.com/post/java-thread-pool
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
文章图片
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
背景 使用ThreadPoolExecutor过程中你是否有以下痛点呢?
1.代码中创建了一个ThreadPoolExecutor,但是不知道那几个核心参数设置多少比较合适
如果你有以上痛点,这篇文章要介绍的动态可监控线程池(DynamicTp)或许能帮助到你。
2.凭经验设置参数值,上线后发现需要调整,改代码重启服务,非常麻烦
3.线程池相对开发人员来说是个黑盒,运行情况不能感知到,直到出现问题
如果看过ThreadPoolExecutor的源码,大概可以知道其实它有提供一些set方法,可以在运行时动态去修改相应的值,这些方法有:
public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
现在大多数的互联网项目其实都会微服务化部署,有一套自己的服务治理体系,微服务组件中的分布式配置中心扮演的就是动态修改配置,实时生效的角色。那么我们是否可以结合配置中心来做运行时线程池参数的动态调整呢?答案是肯定的,而且配置中心相对都是高可用的,使用它也不用过于担心配置推送出现问题这类事儿,而且也能减少研发动态线程池组件的难度和工作量。
综上,我们总结出以下的背景
简介 我们基于配置中心对线程池ThreadPoolExecutor做一些扩展,实现对运行中线程池参数的动态修改,实时生效;以及实时监控线程池的运行状态,触发设置的报警策略时报警,报警信息会推送办公平台(钉钉、企微等)。报警维度包括(队列容量、线程池活性、拒绝触发等);同时也会定时采集线程池指标数据供监控平台可视化使用。使我们能时刻感知到线程池的负载,根据情况及时调整,避免出现问题影响线上业务。
|__ \(_) |____|
| || |__ _ ____ _ _ __ _______| |_ __
| || | | | | '_ \ / _` | '_ ` _ \| |/ __| | '_ \
| |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
|_____/ \__, |_| |_|\__,_|_| |_| |_|_|\___|_| .__/
__/ || |
|___/|_|
:: Dynamic Thread Pool ::
特性
架构设计 主要分四大模块
1.监听特定配置中心的指定配置文件(默认实现Nacos、Apollo),可通过内部提供的SPI接口扩展其他实现
2.解析配置文件内容,内置实现yml、properties配置文件的解析,可通过内部提供的SPI接口扩展其他实现
3.通知线程池管理模块实现刷新
1.服务启动时从配置中心拉取配置信息,生成线程池实例注册到内部线程池注册中心中
2.监听模块监听到配置变更时,将变更信息传递给管理模块,实现线程池参数的刷新
3.代码中通过getExecutor()方法根据线程池名称来获取线程池对象实例
实现监控指标采集以及输出,默认提供以下三种方式,也可通过内部提供的SPI接口扩展其他实现
1.默认实现Json log输出到磁盘
2.MicroMeter采集,引入MicroMeter相关依赖
3.暴雷Endpoint端点,可通过http方式访问
对接办公平台,实现通告告警功能,默认实现钉钉、企微,可通过内部提供的SPI接口扩展其他实现,通知告警类型如下
1.线程池参数变更通知
2.阻塞队列容量达到设置阈值告警
3.线程池活性达到设置阈值告警
4.触发拒绝策略告警
文章图片
使用
spring:
dynamic:
tp:
enabled: true
enabledBanner: true# 是否开启banner打印,默认true
enabledCollect: false# 是否开启监控指标采集,默认false
collectorType: logging# 监控数据采集器类型(JsonLog | MicroMeter),默认logging
logPath: /home# 监控日志数据路径,默认${user.home}/logs
monitorInterval: 5# 监控时间间隔(报警判断、指标采集),默认5s
nacos:# nacos配置,不配置有默认值(规则name-dev.yml这样)
dataId: dynamic-tp-demo-dev.yml
group: DEFAULT_GROUP
apollo:# apollo配置,不配置默认拿apollo配置第一个namespace
namespace: dynamic-tp-demo-dev.yml
configType: yml# 配置文件类型
platforms:# 通知报警平台配置
- platform: wechat
urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c# 替换
receivers: test1,test2# 接受人企微名称
- platform: ding
urlKey: f80dad441fcd655438f4a08dcd6a# 替换
secret: SECb5441fa6f375d5b9d21# 替换,非sign模式可以没有此值
receivers: 15810119805# 钉钉账号手机号
executors:# 动态线程池配置
- threadPoolName: dynamic-tp-test-1
corePoolSize: 6
maximumPoolSize: 8
queueCapacity: 200
queueType: VariableLinkedBlockingQueue# 任务队列,查看源码QueueTypeEnum枚举类
rejectedHandlerType: CallerRunsPolicy# 拒绝策略,查看RejectedTypeEnum枚举类
keepAliveTime: 50
allowCoreThreadTimeOut: false
threadNamePrefix: test# 线程名前缀
notifyItems:# 报警项,不配置自动会配置(变更通知、容量报警、活性报警、拒绝报警)
- type: capacity# 报警项类型,查看源码 NotifyTypeEnum枚举类
enabled: true
threshold: 80# 报警阈值
platforms: [ding,wechat]# 可选配置,不配置默认拿上层platforms配置的所以平台
interval: 120# 报警间隔(单位:s)
- type: change
enabled: true
- type: liveness
enabled: true
threshold: 80
- type: reject
enabled: true
threshold: 1
@Configuration
public class DtpConfig {@Bean
public DtpExecutor demo1Executor() {
return DtpCreator.createDynamicFast("demo1-executor");
}@Bean
public ThreadPoolExecutor demo2Executor() {
return ThreadPoolBuilder.newBuilder()
.threadPoolName("demo2-executor")
.corePoolSize(8)
.maximumPoolSize(16)
.keepAliveTime(50)
.allowCoreThreadTimeOut(true)
.workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
.rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
.buildDynamic();
}
}
public static void main(String[] args) {
DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dynamic-tp-test-1");
dtpExecutor.execute(() -> System.out.println("test"));
}
注意事项
能和LinkedBlockingQueue相似,只是capacity不是final类型,可以修改,
VariableLinkedBlockingQueue参考RabbitMq的实现
|__ \(_) |____|
| || |__ _ ____ _ _ __ _______| |_ __
| || | | | | '_ \ / _` | '_ ` _ \| |/ __| | '_ \
| |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
|_____/ \__, |_| |_|\__,_|_| |_| |_|_|\___|_| .__/
__/ || |
|___/|_|
:: Dynamic Thread Pool :: DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
DynamicTp [dynamic-tp-test-1] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [6 => 4], maxPoolSize: [8 => 8], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [200 => 2000], keepAliveTime: [50s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false]
报警 触发报警阈值会推送相应报警消息(活性、容量、拒绝),且会高亮显示相应字段
文章图片
配置变更会推送通知消息,且会高亮变更的字段
文章图片
监控日志 通过collectType属性配置监控指标采集类型,默认 logging
项目地址 gitee地址:https://gitee.com/yanhom/dynamic-tp
Prometheus,InfluxDb...)
namictp/${appName}.monitor.log
github地址:https://github.com/lyh200/dynamic-tp
联系我 对项目有什么想法或者建议,可以加我微信交流,或者创建issues,一起完善项目
vx:yanhom1314
【java|基于配置中心的轻量级动态可监控线程池 - DynamicTp】公众号:CodeFox
推荐阅读
- java|java gc日志乱码_让 Bug 无处藏身,Java 线上问题排查思路、常用工具
- spring|java性能优化,产品的 QPS 翻倍了
- java|线程池如何观测(这个方案让你对线程池的运行情况了如指掌)
- 定位|Spring Boot 接口频繁超时,Alibaba 开源 Arthas 精准定位 BUG 问题!
- 源程序|SpringBoot框架入门(二)
- 一夜成名的航班追踪网站,什么来头()
- #|【JAVASE开发】带你零基础学JAVA项目(二嗨租车项目篇)
- 【算法合集】|【算法合集】学习算法第二天(二分与排序篇)
- SpringCloud|SpringCloud (六) ——Gateway服务网关