文章目录
- 引入
- 使用线程池
- 线程池介绍
- 实现消息异步提交的线程池
- 池化RabbitTemplate
引入 先来看下面一段代码
@Autowired
private RabbitTemplate rabbitTemplate;
private void sendKernel(Message message){
String topic=message.getTopic();
String routingKey=message.getRoutingKey();
CorrelationData correlationData=https://www.it610.com/article/new CorrelationData(
String.format("%s#%s", message.getMessageId(), System.currentTimeMillis())
);
rabbitTemplate.convertAndSend(topic,routingKey,message,null,correlationData);
log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageid: {}",
message.getMessageId());
}
sendKernel()
方法很容易理解,根据传入消息使用rabbitTemplate
来发送消息。这样做的效率并不是很高,我们可以使用线程池来异步发送消息。使用线程池 线程池介绍 Java 5+中的Executor接口定义一个执行线程的工具。它的子类型即线程池接口是ExecutorService。要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,因此在工具类Executors面提供了一些静态工厂方法,生成一些常用的线程池,如下所示:
- newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
- newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
- newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
- newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
- newSingleThreadExecutor:创建一个单线程的线程池。此线程池支持定时以及周期性执行任务的需求。
Executors
类,自己创建一个ExecutorService
。public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)Creates a new ThreadPoolExecutor with the given initial parameters.
Parameters:
corePoolSize - the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
maximumPoolSize - the maximum number of threads to allow in the pool
keepAliveTime - when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
unit - the time unit for the keepAliveTime argument
workQueue - the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
threadFactory - the factory to use when the executor creates a new thread
handler - the handler to use when execution is blocked because the thread bounds and queue capacities are reached
前面几个参数很好理解,这里主要解释下后面几个参数。
workQueue
任务队列:用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高
- LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
theadFactory
看名字就明白是用于创建Thread类的。RejectedExecutionHandler
:当Executor已经关闭(即执行了executorService.shutdown()方法后),并且Executor将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法execute()中提交的新任务将被拒绝.在以上述情况下,execute 方法将调用其 RejectedExecutionHandler 的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,默认方法。
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
@Slf4j
public class AsyncBaseQueue {
public static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors();
public static final int QUEUE_SIZE = 10000;
public static ExecutorService executorService =
new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_SIZE), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread();
t.setName("rabbitmq_client_async_sender");
return t;
}
}, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("async sender is error rejected, runnable:{}, executor:{}",
r, executor);
}
});
public static void submit(Runnable r){
executorService.submit(r);
}
}
所以在
sendKernel()
方法中,可以有如下调用:private void sendKernel(Message message){
//使用线程池进行异步提交
AsyncBaseQueue.submit(new Runnable() {
@Override
public void run() {
String topic=message.getTopic();
String routingKey=message.getRoutingKey();
CorrelationData correlationData=https://www.it610.com/article/new CorrelationData(
String.format("%s#%s", message.getMessageId(), System.currentTimeMillis())
);
rabbitTemplate.convertAndSend(topic,routingKey,message,null,correlationData);
log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageid: {}",
message.getMessageId());
}
});
}
不过即使实现了异步提交,还有需要改进的地方。我们看到,首先我们注入了:
@Autowired
private RabbitTemplate rabbitTemplate;
虽然实现了线程池,完成了多线程,但是RabbitTemplate是由Spring帮我们注入的,默认是单例的。所以,我们还需要把RabbitTemplate池化。
池化RabbitTemplate 池化有两个好处:
- 提高发送效率。
- 根据不同需求定制化rabbittemplate模版,比如每一个topic都有自己的routingkey规则。(注:这里的topic指的就是exchange,所谓的规则也就是exchange和routingkey的绑定规则。)
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {
private Map/* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap();
private Splitter splitter=Splitter.on("#");
/* 连接工厂 */
@Autowired
private ConnectionFactory connectionFactory;
public RabbitTemplate getTemplate(Message message) throws MessageRuntimeException {
Preconditions.checkNotNull(message);
String topic = message.getTopic();
RabbitTemplate rabbitTemplate = rabbitMap.get(topic);
if (rabbitTemplate != null) {
return rabbitTemplate;
}
log.info("#RabbitTemplateContainer.getTemplate# topic: {} is not exist, create one",topic);
RabbitTemplate newRabbitTemplate=new RabbitTemplate(connectionFactory);
newRabbitTemplate.setRetryTemplate(new RetryTemplate());
newRabbitTemplate.setRoutingKey(message.getRoutingKey());
newRabbitTemplate.setExchange(topic);
// TODO: 2020-02-27 对message的序列化方式
//newRabbitTemplate.setMessageConverter();
String messageType=message.getMessageType();
if(!MessageType.RAPID.equals(messageType)){
//需要设置callback
newRabbitTemplate.setConfirmCallback(this);
}rabbitMap.put(topic,newRabbitTemplate);
return rabbitMap.get(topic);
}@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
// TODO: 2020-02-27 具体的应答
List> strings=splitter.splitToList(correlationData.getId());
String messageId=strings.get(0);
long sendTime=Long.parseLong(strings.get(1));
if (b){
log.info("send message is OK, confirm messageId: {}, send time: {}",messageId,sendTime);
}else{
log.error("send message is Failed, confirm messageId: {}, send time: {}",messageId,sendTime);
}
}
}
我们首先解释,为什么使用
Map
,使用Map可以将不同topic的路由规则与rabbitTemplate绑定,我们可以为每一种不同的topic来定制化不同的rabbitTemplate,比如有的rabbitTemplate可以设置回调setConfirmCallback()
,有的不需要设置回调。其次,需要解释为什么使用的是
ConcurrentMap
(代码中使用的google api来初始化的,不影响我们的理解)。java8是这么解释的:在多线程环境下,使用普通的Map进行put操作会引起死循环,导致CPU利用率接近100%,普通的Map在并发执行put操作时会引起死循环,是因为多线程会导致普通的Map的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。所以这里使用线程安全且高效的ConcurrentMap(一般是ConcurrentHashMap) 即可。
ConcurrentMap : A Map providing thread safety and atomicity guarantees.
其他代码就是为了实现不同topic定制化的rabbitTemplate而写的了,不做赘述。