Java架构直通车——RabbitMQ池化方案


文章目录

  • 引入
  • 使用线程池
    • 线程池介绍
    • 实现消息异步提交的线程池
  • 池化RabbitTemplate

引入 先来看下面一段代码
@Autowired private RabbitTemplate rabbitTemplate; private void sendKernel(Message message){ String topic=message.getTopic(); String routingKey=message.getRoutingKey(); CorrelationData correlationData=https://www.it610.com/article/new CorrelationData( String.format("%s#%s", message.getMessageId(), System.currentTimeMillis()) ); rabbitTemplate.convertAndSend(topic,routingKey,message,null,correlationData); log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageid: {}", message.getMessageId()); }

sendKernel()方法很容易理解,根据传入消息使用rabbitTemplate来发送消息。这样做的效率并不是很高,我们可以使用线程池来异步发送消息。
使用线程池 线程池介绍 Java 5+中的Executor接口定义一个执行线程的工具。它的子类型即线程池接口是ExecutorService。要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,因此在工具类Executors面提供了一些静态工厂方法,生成一些常用的线程池,如下所示:
  • newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
  • newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
  • newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
  • newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
  • newSingleThreadExecutor:创建一个单线程的线程池。此线程池支持定时以及周期性执行任务的需求。
当然,也可以不使用Executors类,自己创建一个ExecutorService
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)Creates a new ThreadPoolExecutor with the given initial parameters. Parameters: corePoolSize - the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set maximumPoolSize - the maximum number of threads to allow in the pool keepAliveTime - when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating. unit - the time unit for the keepAliveTime argument workQueue - the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method. threadFactory - the factory to use when the executor creates a new thread handler - the handler to use when execution is blocked because the thread bounds and queue capacities are reached

前面几个参数很好理解,这里主要解释下后面几个参数。
workQueue任务队列:用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高
  • LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
theadFactory看名字就明白是用于创建Thread类的。
RejectedExecutionHandler:当Executor已经关闭(即执行了executorService.shutdown()方法后),并且Executor将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法execute()中提交的新任务将被拒绝.
在以上述情况下,execute 方法将调用其 RejectedExecutionHandler 的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。
  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,默认方法。
  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
实现消息异步提交的线程池 【Java架构直通车——RabbitMQ池化方案】最终,我们实现的线程池如下:
@Slf4j public class AsyncBaseQueue { public static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors(); public static final int QUEUE_SIZE = 10000; public static ExecutorService executorService = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_SIZE), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(); t.setName("rabbitmq_client_async_sender"); return t; } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { log.error("async sender is error rejected, runnable:{}, executor:{}", r, executor); } }); public static void submit(Runnable r){ executorService.submit(r); } }

所以在sendKernel()方法中,可以有如下调用:
private void sendKernel(Message message){ //使用线程池进行异步提交 AsyncBaseQueue.submit(new Runnable() { @Override public void run() { String topic=message.getTopic(); String routingKey=message.getRoutingKey(); CorrelationData correlationData=https://www.it610.com/article/new CorrelationData( String.format("%s#%s", message.getMessageId(), System.currentTimeMillis()) ); rabbitTemplate.convertAndSend(topic,routingKey,message,null,correlationData); log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageid: {}", message.getMessageId()); } }); }

不过即使实现了异步提交,还有需要改进的地方。我们看到,首先我们注入了:
@Autowired private RabbitTemplate rabbitTemplate;

虽然实现了线程池,完成了多线程,但是RabbitTemplate是由Spring帮我们注入的,默认是单例的。所以,我们还需要把RabbitTemplate池化。
池化RabbitTemplate 池化有两个好处:
  1. 提高发送效率。
  2. 根据不同需求定制化rabbittemplate模版,比如每一个topic都有自己的routingkey规则。(注:这里的topic指的就是exchange,所谓的规则也就是exchange和routingkey的绑定规则。)
实现方式直接看代码:
@Slf4j @Component public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback { private Map/* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap(); private Splitter splitter=Splitter.on("#"); /* 连接工厂 */ @Autowired private ConnectionFactory connectionFactory; public RabbitTemplate getTemplate(Message message) throws MessageRuntimeException { Preconditions.checkNotNull(message); String topic = message.getTopic(); RabbitTemplate rabbitTemplate = rabbitMap.get(topic); if (rabbitTemplate != null) { return rabbitTemplate; } log.info("#RabbitTemplateContainer.getTemplate# topic: {} is not exist, create one",topic); RabbitTemplate newRabbitTemplate=new RabbitTemplate(connectionFactory); newRabbitTemplate.setRetryTemplate(new RetryTemplate()); newRabbitTemplate.setRoutingKey(message.getRoutingKey()); newRabbitTemplate.setExchange(topic); // TODO: 2020-02-27 对message的序列化方式 //newRabbitTemplate.setMessageConverter(); String messageType=message.getMessageType(); if(!MessageType.RAPID.equals(messageType)){ //需要设置callback newRabbitTemplate.setConfirmCallback(this); }rabbitMap.put(topic,newRabbitTemplate); return rabbitMap.get(topic); }@Override public void confirm(CorrelationData correlationData, boolean b, String s) { // TODO: 2020-02-27 具体的应答 List> strings=splitter.splitToList(correlationData.getId()); String messageId=strings.get(0); long sendTime=Long.parseLong(strings.get(1)); if (b){ log.info("send message is OK, confirm messageId: {}, send time: {}",messageId,sendTime); }else{ log.error("send message is Failed, confirm messageId: {}, send time: {}",messageId,sendTime); } } }

我们首先解释,为什么使用Map,使用Map可以将不同topic的路由规则与rabbitTemplate绑定,我们可以为每一种不同的topic来定制化不同的rabbitTemplate,比如有的rabbitTemplate可以设置回调setConfirmCallback(),有的不需要设置回调。
其次,需要解释为什么使用的是ConcurrentMap(代码中使用的google api来初始化的,不影响我们的理解)。
java8是这么解释的:
ConcurrentMap : A Map providing thread safety and atomicity guarantees.
在多线程环境下,使用普通的Map进行put操作会引起死循环,导致CPU利用率接近100%,普通的Map在并发执行put操作时会引起死循环,是因为多线程会导致普通的Map的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。所以这里使用线程安全且高效的ConcurrentMap(一般是ConcurrentHashMap) 即可。
其他代码就是为了实现不同topic定制化的rabbitTemplate而写的了,不做赘述。

    推荐阅读