2018-04-21|2018-04-21 DefaultMQProducer 重写版

package com.okdeer.base.framework.mq; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.SendStatus; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; import com.okdeer.base.framework.mq.codec.MQCodecUtils; import com.okdeer.base.framework.mq.message.MQMessage; /** * 娑堟伅鐢熶骇鑰? * * @pr yschome-base * @author guocp * @date 2015骞?11鏈?19鏃? 涓嬪崍4:09:29 */public class RocketMQProducer implements IMQProducer, InitializingBean {/** 鏃ュ織瀵硅薄 */private static final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class); /** 瀹炰緥鍚嶇О */private static final String INSTANCE_NAME = "rocketmq_instance"; /** 娑堟伅鐢熶骇鑰? */private DefaultMQProducer producer; /** rocketMq閰嶇疆 */@Autowiredprivate RocketMQConfigurer rocketMQconfig; /** * @desc 鍒濆鍖? * * @throws Exception */@Overridepublic void afterPropertiesSet() throws Exception {// 鍒濆鍖栨秷鎭敓浜ц??initProducer(); // 鍚姩鏈嶅姟this.start(); // 鍚姩搴旂敤鏈嶅姟鍣ㄥ叧闂挬瀛?shutdownHook(); }/** * 鍒濆鍖栨秷鎭敓浜ц?? * * @param config 閰嶇疆 */private void initProducer() {producer = new DefaultMQProducer(rocketMQconfig.getProducerGroupName()); producer.setInstanceName(INSTANCE_NAME); // name Server鍦板潃producer.setNamesrvAddr(rocketMQconfig.getNamesrvAddr()); producer.setDefaultTopicQueueNums(rocketMQconfig.getDefaultTopicQueueNums()); producer.setRetryTimesWhenSendFailed(rocketMQconfig.getRetryTimesWhenSendFailed()); producer.setSendMsgTimeout(rocketMQconfig.getSendMsgTimeout()); producer.setMaxMessageSize(rocketMQconfig.getMaxMessageSize()); }@Overridepublic SendResult send(final Message msg) throws Exception {try {SendResult sendResult = producer.send(msg); if (sendResult.getSendStatus() != SendStatus.SEND_OK) {logger.warn("娑堟伅涓棿浠跺彂閫佹秷鎭紓甯革細{},鐘舵?佷负锛歿}", msg.getKeys(), sendResult.getSendStatus()); }return sendResult; } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {logger.error("娑堟伅涓棿浠跺彂閫佹秷鎭紓甯革紝娑堟伅key涓猴細{}", msg.getKeys(), e); throw e; }}@Overridepublic SendResult send(final Message msg, final String orderCode) throws Exception {try {SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List mqs, Message msg, Object arg) {
int index = Math.abs(arg.hashCode() % mqs.size());
return mqs.get(index);
}
}, orderCode);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
logger.warn("娑堟伅涓棿浠跺彂閫侀『搴忔秷鎭紓甯革細{},鐘舵?佷负锛歿}", msg.getKeys(), sendResult.getSendStatus());
}
return sendResult;
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
logger.error("娑堟伅涓棿浠跺彂閫侀『搴忔秷鎭紓甯革紝娑堟伅key涓猴細{}", msg.getKeys(), e);
throw e;
}
}
/**
* @Description: 鍙戦?佹秷鎭?
* @param anMessage 鍖呰娑堟伅浣?
* @return
* @throws Exception
* @author guocp
* @date 2016骞?10鏈?28鏃?
*/
public SendResult sendMessage(final MQMessage anMessage) throws Exception {
final Message message = MQCodecUtils.wrap(anMessage);
if (message == null) {
throw new Exception("鍖呰娑堟伅鍑洪敊锛?");
}
if (anMessage.getKey() != null) {
message.setKeys(anMessage.getKey());
}
if (anMessage.getDelayTimeLevel() != null) {
message.setDelayTimeLevel(anMessage.getDelayTimeLevel());
}
SendResult sendResult = this.send(message);
return sendResult;
}
/**
* @Description: 鍙戦?侀『搴忔秷鎭?
* @param anMessage 鍖呰娑堟伅浣?
* @param orderCode
* @return
* @throws Exception
* @author guocp
* @date 2016骞?10鏈?28鏃?
*/
public SendResult sendMessage(final MQMessage anMessage, final String orderCode) throws Exception {
final Message message = MQCodecUtils.wrap(anMessage);
if (message == null) {
throw new Exception("鍖呰娑堟伅鍑洪敊锛?");
}
if (anMessage.getKey() != null) {
message.setKeys(anMessage.getKey());
}
if (anMessage.getDelayTimeLevel() != null) {
message.setDelayTimeLevel(anMessage.getDelayTimeLevel());
}
SendResult sendResult = this.send(message, orderCode);
return sendResult;
}
@Override
public void start() throws MQClientException {
producer.start();
}
/**
* 搴旂敤閫?鍑烘椂锛岃璋冪敤shutdown鏉ユ竻鐞嗚祫婧愶紝鍏抽棴缃戠粶杩炴帴锛屼粠rocketMQ鏈嶅姟鍣ㄤ笂娉ㄩ攢鑷繁 娉ㄦ剰锛氭垜浠缓璁簲鐢ㄥ湪JBOSS銆乀omcat绛夊鍣ㄧ殑閫?鍑洪挬瀛愰噷璋冪敤shutdown鏂规硶
*/
public void shutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
shutdown();
}
}));
}
@Override
public void shutdown() {
producer.shutdown();
}
【2018-04-21|2018-04-21 DefaultMQProducer 重写版】}

    推荐阅读