2018-04-21|2018-04-21 DefaultMQProducer 重写版
package com.okdeer.base.framework.mq;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.okdeer.base.framework.mq.codec.MQCodecUtils;
import com.okdeer.base.framework.mq.message.MQMessage;
/** * 娑堟伅鐢熶骇鑰? * * @pr yschome-base * @author guocp * @date 2015骞?11鏈?19鏃? 涓嬪崍4:09:29 */public class RocketMQProducer implements IMQProducer, InitializingBean {/** 鏃ュ織瀵硅薄 */private static final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class);
/** 瀹炰緥鍚嶇О */private static final String INSTANCE_NAME = "rocketmq_instance";
/** 娑堟伅鐢熶骇鑰? */private DefaultMQProducer producer;
/** rocketMq閰嶇疆 */@Autowiredprivate RocketMQConfigurer rocketMQconfig;
/** * @desc 鍒濆鍖? * * @throws Exception */@Overridepublic void afterPropertiesSet() throws Exception {// 鍒濆鍖栨秷鎭敓浜ц??initProducer();
// 鍚姩鏈嶅姟this.start();
// 鍚姩搴旂敤鏈嶅姟鍣ㄥ叧闂挬瀛?shutdownHook();
}/** * 鍒濆鍖栨秷鎭敓浜ц?? * * @param config 閰嶇疆 */private void initProducer() {producer = new DefaultMQProducer(rocketMQconfig.getProducerGroupName());
producer.setInstanceName(INSTANCE_NAME);
// name Server鍦板潃producer.setNamesrvAddr(rocketMQconfig.getNamesrvAddr());
producer.setDefaultTopicQueueNums(rocketMQconfig.getDefaultTopicQueueNums());
producer.setRetryTimesWhenSendFailed(rocketMQconfig.getRetryTimesWhenSendFailed());
producer.setSendMsgTimeout(rocketMQconfig.getSendMsgTimeout());
producer.setMaxMessageSize(rocketMQconfig.getMaxMessageSize());
}@Overridepublic SendResult send(final Message msg) throws Exception {try {SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {logger.warn("娑堟伅涓棿浠跺彂閫佹秷鎭紓甯革細{},鐘舵?佷负锛歿}", msg.getKeys(), sendResult.getSendStatus());
}return sendResult;
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {logger.error("娑堟伅涓棿浠跺彂閫佹秷鎭紓甯革紝娑堟伅key涓猴細{}", msg.getKeys(), e);
throw e;
}}@Overridepublic SendResult send(final Message msg, final String orderCode) throws Exception {try {SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List mqs, Message msg, Object arg) {
int index = Math.abs(arg.hashCode() % mqs.size());
return mqs.get(index);
}
}, orderCode);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
logger.warn("娑堟伅涓棿浠跺彂閫侀『搴忔秷鎭紓甯革細{},鐘舵?佷负锛歿}", msg.getKeys(), sendResult.getSendStatus());
}
return sendResult;
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
logger.error("娑堟伅涓棿浠跺彂閫侀『搴忔秷鎭紓甯革紝娑堟伅key涓猴細{}", msg.getKeys(), e);
throw e;
}
}
/**
* @Description: 鍙戦?佹秷鎭?
* @param anMessage 鍖呰娑堟伅浣?
* @return
* @throws Exception
* @author guocp
* @date 2016骞?10鏈?28鏃?
*/
public SendResult sendMessage(final MQMessage anMessage) throws Exception {
final Message message = MQCodecUtils.wrap(anMessage);
if (message == null) {
throw new Exception("鍖呰娑堟伅鍑洪敊锛?");
}
if (anMessage.getKey() != null) {
message.setKeys(anMessage.getKey());
}
if (anMessage.getDelayTimeLevel() != null) {
message.setDelayTimeLevel(anMessage.getDelayTimeLevel());
}
SendResult sendResult = this.send(message);
return sendResult;
}
/**
* @Description: 鍙戦?侀『搴忔秷鎭?
* @param anMessage 鍖呰娑堟伅浣?
* @param orderCode
* @return
* @throws Exception
* @author guocp
* @date 2016骞?10鏈?28鏃?
*/
public SendResult sendMessage(final MQMessage anMessage, final String orderCode) throws Exception {
final Message message = MQCodecUtils.wrap(anMessage);
if (message == null) {
throw new Exception("鍖呰娑堟伅鍑洪敊锛?");
}
if (anMessage.getKey() != null) {
message.setKeys(anMessage.getKey());
}
if (anMessage.getDelayTimeLevel() != null) {
message.setDelayTimeLevel(anMessage.getDelayTimeLevel());
}
SendResult sendResult = this.send(message, orderCode);
return sendResult;
}
@Override
public void start() throws MQClientException {
producer.start();
}
/**
* 搴旂敤閫?鍑烘椂锛岃璋冪敤shutdown鏉ユ竻鐞嗚祫婧愶紝鍏抽棴缃戠粶杩炴帴锛屼粠rocketMQ鏈嶅姟鍣ㄤ笂娉ㄩ攢鑷繁 娉ㄦ剰锛氭垜浠缓璁簲鐢ㄥ湪JBOSS銆乀omcat绛夊鍣ㄧ殑閫?鍑洪挬瀛愰噷璋冪敤shutdown鏂规硶
*/
public void shutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
shutdown();
}
}));
}
@Override
public void shutdown() {
producer.shutdown();
}
【2018-04-21|2018-04-21 DefaultMQProducer 重写版】}
推荐阅读
- Speedy Duplicate Finder帮你查找重复和删除文件
- 【对线面试官】如何实现去重和幂等
- 投稿|摸底酒馆赛道:市场下沉,行业遭遇冰火两重天
- Java实现文件批量重命名|Java实现文件批量重命名,移动和删除
- 用 Antlr 重构脚本解释器
- 成都白癜风医院(白癜风在治疗上需要重视什么)
- 【重磅】YOYOW产品发布会预告
- 你的人生确定吗
- 【111-90】重新定位,我是谁()
- 何时,我能重拾像你们一样的勇气