springboot线程池的使用和扩展

少年意气强不羁,虎胁插翼白日飞。这篇文章主要讲述springboot线程池的使用和扩展相关的知识,希望能为你提供帮助。
欢迎访问我的GitHub
本篇概览

  • 我们常用ThreadPoolExecutor提供的线程池服务,springboot框架提供了@Async注解,帮助我们更方便的将业务逻辑提交到线程池中异步执行,今天我们就来实战体验这个线程池服务,并根据实际需要做定制化的扩展;
实战环境
  1. windowns10;
  2. jdk1.8;
  3. springboot 1.5.9.RELEASE;
  4. 开发工具:IntelliJ IDEA;
实战源码
  • 本次实战的源码可以在我的GitHub下载,地址:git@github.com:zq2599/blog_demos.git ,项目主页:https://github.com/zq2599/blog_demos
  • 这里面有多个工程,本次用到的工程为threadpooldemoserver,如下图红框所示:
    springboot线程池的使用和扩展

    文章图片
实战步骤梳理
  • 本次实战的步骤如下:
    1. 创建springboot工程;
    2. 创建Service层的接口和实现;
    3. 创建controller,开发一个http服务接口,里面会调用service层的服务;
    4. 创建线程池的配置;
    5. 将Service层的服务异步化,这样每次调用都会都被提交到线程池异步执行;
    6. 扩展ThreadPoolTaskExecutor,在提交任务到线程池的时候可以观察到当前线程池的情况;
创建springboot工程
  • 用IntelliJ IDEA创建一个springboot的web工程threadpooldemoserver,pom.xml内容如下:
    < ?xml version="1.0" encoding="UTF-8"?> < project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> < modelVersion> 4.0.0< /modelVersion> < groupId> com.bolingcavalry< /groupId> < artifactId> threadpooldemoserver< /artifactId> < version> 0.0.1-SNAPSHOT< /version> < packaging> jar< /packaging> < name> threadpooldemoserver< /name> < description> Demo project for Spring Boot< /description> < parent> < groupId> org.springframework.boot< /groupId> < artifactId> spring-boot-starter-parent< /artifactId> < version> 1.5.9.RELEASE< /version> < relativePath/> < !-- lookup parent from repository --> < /parent> < properties> < project.build.sourceEncoding> UTF-8< /project.build.sourceEncoding> < project.reporting.outputEncoding> UTF-8< /project.reporting.outputEncoding> < java.version> 1.8< /java.version> < /properties> < dependencies> < dependency> < groupId> org.springframework.boot< /groupId> < artifactId> spring-boot-starter-web< /artifactId> < /dependency> < /dependencies> < build> < plugins> < plugin> < groupId> org.springframework.boot< /groupId> < artifactId> spring-boot-maven-plugin< /artifactId> < /plugin> < /plugins> < /build> < /project>

创建Service层的接口和实现
  • 创建一个service层的接口AsyncService,如下:
    public interface AsyncService /** * 执行异步任务 */ void executeAsync();

  • 对应的AsyncServiceImpl,实现如下:
    @Service public class AsyncServiceImpl implements AsyncService private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class); @Override public void executeAsync() logger.info("start executeAsync"); try Thread.sleep(1000); catch(Exception e) e.printStackTrace(); logger.info("end executeAsync");

  • 这个方法做的事情很简单:sleep了一秒钟;
创建controller
  • 创建一个controller为Hello,里面定义一个http接口,做的事情是调用Service层的服务,如下:
    @RestController public class Hello private static final Logger logger = LoggerFactory.getLogger(Hello.class); @Autowired private AsyncService asyncService; @RequestMapping("/") public String submit() logger.info("start submit"); //调用service层的任务 asyncService.executeAsync(); logger.info("end submit"); return "success";

  • 至此,我们已经做好了一个http请求的服务,里面做的事情其实是同步的,接下来我们就开始配置springboot的线程池服务,将service层做的事情都提交到线程池中去处理;
springboot的线程池配置
  • 创建一个配置类ExecutorConfig,用来定义如何创建一个ThreadPoolTaskExecutor,要使用@Configuration和@EnableAsync这两个注解,表示这是个配置类,并且是线程池的配置类,如下所示:
    @Configuration @EnableAsync public class ExecutorConfig private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Bean public Executor asyncServiceExecutor() logger.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(5); //配置最大线程数 executor.setMaxPoolSize(5); //配置队列大小 executor.setQueueCapacity(99999); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("async-service-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor;

  • 注意,上面的方法名称为asyncServiceExecutor,稍后马上用到;
将Service层的服务异步化
  • 打开AsyncServiceImpl.java,在executeAsync方法上增加注解@Async(" asyncServiceExecutor" ),asyncServiceExecutor是前面ExecutorConfig.java中的方法名,表明executeAsync方法进入的线程池是asyncServiceExecutor方法创建的,如下:
    @Override @Async("asyncServiceExecutor") public void executeAsync() logger.info("start executeAsync"); try Thread.sleep(1000); catch(Exception e) e.printStackTrace(); logger.info("end executeAsync");

验证效果
  1. 将这个springboot运行起来(pom.xml所在文件夹下执行mvn spring-boot:run);
  2. 在浏览器输入:http://localhost:8080;
  3. 在浏览器用F5按钮快速多刷新几次;
  4. 在springboot的控制台看见日志如下:
    2018-01-21 22:43:18.630INFO 14824 --- [nio-8080-exec-8] c.b.t.controller.Hello: start submit 2018-01-21 22:43:18.630INFO 14824 --- [nio-8080-exec-8] c.b.t.controller.Hello: end submit 2018-01-21 22:43:18.929INFO 14824 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl: end executeAsync 2018-01-21 22:43:18.930INFO 14824 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl: start executeAsync 2018-01-21 22:43:19.005INFO 14824 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl: end executeAsync 2018-01-21 22:43:19.006INFO 14824 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl: start executeAsync 2018-01-21 22:43:19.175INFO 14824 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl: end executeAsync 2018-01-21 22:43:19.175INFO 14824 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl: start executeAsync 2018-01-21 22:43:19.326INFO 14824 --- [async-service-4] c.b.t.service.impl.AsyncServiceImpl: end executeAsync 2018-01-21 22:43:19.495INFO 14824 --- [async-service-5] c.b.t.service.impl.AsyncServiceImpl: end executeAsync 2018-01-21 22:43:19.930INFO 14824 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl: end executeAsync 2018-01-21 22:43:20.006INFO 14824 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl: end executeAsync 2018-01-21 22:43:20.191INFO 14824 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl: end executeAsync

  • 如上日志所示,我们可以看到controller的执行线程是" nio-8080-exec-8" ,这是tomcat的执行线程,而service层的日志显示线程名为“async-service-1”,显然已经在我们配置的线程池中执行了,并且每次请求中,controller的起始和结束日志都是连续打印的,表明每次请求都快速响应了,而耗时的操作都留给线程池中的线程去异步执行;
扩展ThreadPoolTaskExecutor
  • 虽然我们已经用上了线程池,但是还不清楚线程池当时的情况,有多少线程在执行,多少在队列中等待呢?这里我创建了一个ThreadPoolTaskExecutor的子类,在每次提交线程的时候都会将当前线程池的运行状况打印出来,代码如下:
    public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class); private void showThreadPoolInfo(String prefix) ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); if(null==threadPoolExecutor) return; logger.info(", ,taskCount [], completedTaskCount [], activeCount [], queueSize []", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); @Override public void execute(Runnable task) showThreadPoolInfo("1. do execute"); super.execute(task); @Override public void execute(Runnable task, long startTimeout) showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); @Override public Future< ?> submit(Runnable task) showThreadPoolInfo("1. do submit"); return super.submit(task); @Override public < T> Future< T> submit(Callable< T> task) showThreadPoolInfo("2. do submit"); return super.submit(task); @Override public ListenableFuture< ?> submitListenable(Runnable task) showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); @Override public < T> ListenableFuture< T> submitListenable(Callable< T> task) showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task);

  • 如上所示,showThreadPoolInfo方法中将任务总数、已完成数、活跃线程数,队列大小都打印出来了,然后Override了父类的execute、submit等方法,在里面调用showThreadPoolInfo方法,这样每次有任务被提交到线程池的时候,都会将当前线程池的基本情况打印到日志中;
  • 修改ExecutorConfig.java的asyncServiceExecutor方法,将ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改为ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(),如下所示:
    @Bean public Executor asyncServiceExecutor() logger.info("start asyncServiceExecutor"); //使用VisiableThreadPoolTaskExecutor ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(5); //配置最大线程数 executor.setMaxPoolSize(5); //配置队列大小 executor.setQueueCapacity(99999); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("async-service-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor;

  • 再次启动该工程,再浏览器反复刷新http://localhost:8080,看到的日志如下:
    2018-01-21 23:04:56.113INFO 15580 --- [nio-8080-exec-1] c.b.t.e.VisiableThreadPoolTaskExecutor: async-service-, 2. do submit,taskCount [99], completedTaskCount [85], activeCount [5], queueSize [9] 2018-01-21 23:04:56.113INFO 15580 --- [nio-8080-exec-1] c.b.t.controller.Hello: end submit 2018-01-21 23:04:56.225INFO 15580 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl: end executeAsync 2018-01-21 23:04:56.225INFO 15580 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl: start executeAsync 2018-01-21 23:04:56.240INFO 15580 --- [nio-8080-exec-2] c.b.t.controller.Hello: start submit 2018-01-21 23:04:56.240INFO 15580 --- [nio-8080-exec-2] c.b.t.e.VisiableThreadPoolTaskExecutor: async-service-, 2. do submit,taskCount [100], completedTaskCount [86], activeCount [5], queueSize [9] 2018-01-21 23:04:56.240INFO 15580 --- [nio-8080-exec-2] c.b.t.controller.Hello: end submit 2018-01-21 23:04:56.298INFO 15580 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl: end executeAsync 2018-01-21 23:04:56.298INFO 15580 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl: start executeAsync 2018-01-21 23:04:56.372INFO 15580 --- [nio-8080-exec-3] c.b.t.controller.Hello: start submit 2018-01-21 23:04:56.373INFO 15580 --- [nio-8080-exec-3] c.b.t.e.VisiableThreadPoolTaskExecutor: async-service-, 2. do submit,taskCount [101], completedTaskCount [87], activeCount [5], queueSize [9] 2018-01-21 23:04:56.373INFO 15580 --- [nio-8080-exec-3] c.b.t.controller.Hello: end submit 2018-01-21 23:04:56.444INFO 15580 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl: end executeAsync 2018-01-21 23:04:56.445INFO 15580 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl: start executeAsync

  • 注意这一行日志:2. do submit,taskCount [101], completedTaskCount [87], activeCount [5], queueSize [9]
  • 【springboot线程池的使用和扩展】这说明提交任务到线程池的时候,调用的是submit(Callable< T> task)这个方法,当前已经提交了101个任务,完成了87个,当前有5个线程在处理任务,还剩9个任务在队列中等待,线程池的基本情况一路了然;
  • 至此,springboot线程池服务的实战就完成了,希望能帮您在工程中快速实现异步服务;
欢迎关注51CTO博客:程序员欣宸

    推荐阅读