SpringBoot技术专题「Async&Future」异步编程机制以及功能分析讲解

【SpringBoot技术专题「Async&Future」异步编程机制以及功能分析讲解】识字粗堪供赋役,不须辛苦慕公卿。这篇文章主要讲述SpringBoot技术专题「Async&Future」异步编程机制以及功能分析讲解相关的知识,希望能为你提供帮助。
本文内容

  • Future 模式介绍以及核心思想
  • 核心线程数、最大线程数的区别,队列容量代表什么;
  • ThreadPoolTaskExecutor 饱和策略;
  • SpringBoot 异步编程实战,搞懂代码的执行逻辑。
Future 模式
  • 异步编程在处理耗时操作以及多任务处理的场景下非常有用,我们可以更好的让我们的系统利用好机器的CPU和内存,提高它们的利用率。
  • 多线程设计模式有很多种,Future模式是多线程开发中较为常见的一种设计模式,本文也是基于这种模式来说明 SpringBoot 对于异步编程的知识。
Future的核心思想SpringBoot 异步编程实战
  1. @EnableAsync:通过在配置类或者Main类上加@EnableAsync开启对异步方法的支持。
  2. @Async 可以作用在类上或者方法上,作用在类上代表这个类的所有方法都是异步方法。
TaskExecutor
  • 很多人对于TaskExecutor 不是太了解,所以我们花一点篇幅先介绍一下这个东西。从名字就能看出它是任务的执行者,它领导执行着线程来处理任务,就像司令官一样,而我们的线程就好比一只只军队一样,这些军队可以异步对敌人进行打击。
  • Spring提供了TaskExecutor接口作为任务执行者的抽象,它和java.util.concurrent包下的Executor接口很像。稍微不同的 TaskExecutor接口用到了 Java 8 的语法@FunctionalInterface声明这个接口是一个函数式接口。
org.springframework.core.task.TaskExecutor @FunctionalInterface public interface TaskExecutor extends Executor { void execute(Runnable var1); }

自定义AsyncConfigurer
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; @Configuration @EnableAsync public class AsyncConfig implements AsyncConfigurer {private static final int CORE_POOL_SIZE = 6; private static final int MAX_POOL_SIZE = 10; private static final int QUEUE_CAPACITY = 100; @Bean public Executor taskExecutor() { // Spring 默认配置是核心线程数大小为1,最大线程容量大小不受限制,队列容量也不受限制。 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心线程数 executor.setCorePoolSize(CORE_POOL_SIZE); // 最大线程数 executor.setMaxPoolSize(MAX_POOL_SIZE); // 队列大小 executor.setQueueCapacity(QUEUE_CAPACITY); // 当最大池已满时,此策略保证不会丢失任务请求,但是可能会影响应用程序整体性能。 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-"); executor.initialize(); return executor; } }

ThreadPoolTaskExecutor常见概念
  • Core Pool Size : 核心线程数线程数定义了最小可以同时运行的线程数量。
  • Queue Capacity : 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,信任就会被存放在队列中。
  • Maximum Pool Size : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
ThreadPoolTaskExecutor 饱和策略定义:如果当前同时运行的线程数量达到最大线程数量时,ThreadPoolTaskExecutor 定义一些策略:
  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务。您不会任务请求。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。
    • 另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。
  1. 编写一个异步的方法
请留意completableFutureTask方法中的第一行打印日志这句代码,后面分析程序中会用到,很重要!
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @Service public class AsyncService { private static final Logger logger = LoggerFactory.getLogger(AsyncService.class); private List< String> movies = new ArrayList< > ( Arrays.asList( "Forrest Gump", "Titanic", "Spirited Away", "The Shawshank Redemption", "Zootopia", "Farewell ", "Joker", "Crawl")); /** 示范使用:找到特定字符/字符串开头的电影 */ @Async public CompletableFuture< List< String> > completableFutureTask(String start) { // 打印日志 logger.warn(Thread.currentThread().getName() + "start this task!"); // 找到特定字符/字符串开头的电影 List< String> results = movies.stream().filter(movie -> movie.startsWith(start)).collect(Collectors.toList()); // 模拟这是一个耗时的任务 try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } //返回一个已经用给定值完成的新的CompletableFuture。 return CompletableFuture.completedFuture(results); } }

  1. 测试编写的异步方法
    @RestController @RequestMapping("/async") public class AsyncController {@Autowired AsyncService asyncService; @GetMapping("/movies") public String completableFutureTask() throws ExecutionException, InterruptedException { //开始时间 long start = System.currentTimeMillis(); // 开始执行大量的异步任务 List< String> words = Arrays.asList("F", "T", "S", "Z", "J", "C"); List< CompletableFuture< List< String> > > completableFutureList = words.stream() .map(word -> asyncService.completableFutureTask(word)) .collect(Collectors.toList()); // CompletableFuture.join() 方法可以获取他们的结果并将结果连接起来 List< List< String> > results = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList()); // 打印结果以及运行程序运行花费时间 System.out.println("Elapsed time: " + (System.currentTimeMillis() - start)); return results.toString(); } }

请求这个接口,控制台打印出下面的内容:
2019-10-01 13:50:17.007WARN 18793 --- [lTaskExecutor-1] g.j.a.service.AsyncService: My ThreadPoolTaskExecutor-1start this task! 2019-10-01 13:50:17.007WARN 18793 --- [lTaskExecutor-6] g.j.a.service.AsyncService: My ThreadPoolTaskExecutor-6start this task! 2019-10-01 13:50:17.007WARN 18793 --- [lTaskExecutor-5] g.j.a.service.AsyncService: My ThreadPoolTaskExecutor-5start this task! 2019-10-01 13:50:17.007WARN 18793 --- [lTaskExecutor-4] g.j.a.service.AsyncService: My ThreadPoolTaskExecutor-4start this task! 2019-10-01 13:50:17.007WARN 18793 --- [lTaskExecutor-3] g.j.a.service.AsyncService: My ThreadPoolTaskExecutor-3start this task! 2019-10-01 13:50:17.007WARN 18793 --- [lTaskExecutor-2] g.j.a.service.AsyncService: My ThreadPoolTaskExecutor-2start this task! Elapsed time: 1010

List< String> words = Arrays.asList("F", "T", "S", "Z", "J", "C"); List< CompletableFuture< List< String> > > completableFutureList = words.stream() .map(word -> asyncService.completableFutureTask(word)) .collect(Collectors.toList());

特殊情况 无需返回值下面会演示一下客户端不需要返回结果的情况:
将completableFutureTask方法变为 void 类型
@Async public void completableFutureTask(String start) { ...... //这里可能是系统对任务执行结果的处理,比如存入到数据库等等...... //doSomeThingWithResults(results); }

Controller 代码修改如下:
@GetMapping("/movies") public String completableFutureTask() throws ExecutionException, InterruptedException { // Start the clock long start = System.currentTimeMillis(); // Kick of multiple, asynchronous lookups List< String> words = Arrays.asList("F", "T", "S", "Z", "J", "C"); words.stream() .forEach(word -> asyncService.completableFutureTask(word)); // Wait until they are all done // Print results, including elapsed time System.out.println("Elapsed time: " + (System.currentTimeMillis() - start)); return "Done"; }

请求这个接口,控制台打印出下面的内容:
Elapsed time: 0 2019-10-01 14:02:44.052WARN 19051 --- [lTaskExecutor-4] g.j.a.service.AsyncService: My ThreadPoolTaskExecutor-4start this task! 2019-10-01 14:02:44.052WARN 19051 --- [lTaskExecutor-3] g.j.a.service.AsyncService: My ThreadPoolTaskExecutor-3start this task! 2019-10-01 14:02:44.052WARN 19051 --- [lTaskExecutor-2] g.j.a.service.AsyncService: My ThreadPoolTaskExecutor-2start this task! 2019-10-01 14:02:44.052WARN 19051 --- [lTaskExecutor-1] g.j.a.service.AsyncService: My ThreadPoolTaskExecutor-1start this task! 2019-10-01 14:02:44.052WARN 19051 --- [lTaskExecutor-6] g.j.a.service.AsyncService: My ThreadPoolTaskExecutor-6start this task! 2019-10-01 14:02:44.052WARN 19051 --- [lTaskExecutor-5] g.j.a.service.AsyncService: My ThreadPoolTaskExecutor-5start this task!

下一章内容:
  • Future vs. CompletableFuture的对比分析
  • CompetableFuture和Future的源代码分析
参考引用
  • https://spring.io/guides/gs/async-method/
  • https://medium.com/trendyol-tech/spring-boot-async-executor-management-with-threadpooltaskexecutor-f493903617d

    推荐阅读