spring大家太熟,就不多说了
rabbitmq一个amqp的队列服务实现,具体介绍请参考本文http://lynnkong.iteye.com/blog/1699684
本文侧重介绍如何将rabbitmq整合到项目中
ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..
【spring整合消息队列rabbitmq】1.首先是生产者配置
<-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<-- spring template声明-->
2.fastjson messageconver插件实现
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import fe.json.FastJson;
public class FastJsonMessageConverterextends AbstractMessageConverter {
private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);
public static final String DEFAULT_CHARSET = "UTF-8";
private volatile String defaultCharset = DEFAULT_CHARSET;
public FastJsonMessageConverter() {
super();
//init();
}public void setDefaultCharset(String defaultCharset) {
this.defaultCharset = (defaultCharset != null) ? defaultCharset
: DEFAULT_CHARSET;
}public Object fromMessage(Message message)
throws MessageConversionException {
return null;
}public T fromMessage(Message message,T t) {
String json = "";
try {
json = new String(message.getBody(),defaultCharset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return (T) FastJson.fromJson(json, t.getClass());
}protected Message createMessage(Object objectToConvert,
MessageProperties messageProperties)
throws MessageConversionException {
byte[] bytes = null;
try {
String jsonString = FastJson.toJson(objectToConvert);
bytes = jsonString.getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(this.defaultCharset);
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
}
return new Message(bytes, messageProperties);
}
}
3.生产者端调用
import java.util.List;
import org.springframework.amqp.core.AmqpTemplate;
public class MyMqGatway {@Autowired
private AmqpTemplate amqpTemplate;
public void sendDataToCrQueue(Object obj) {
amqpTemplate.convertAndSend("queue_one_key", obj);
}
}
4.消费者端配置(与生产者端大同小异)
5.消费者端调用
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class QueueOneLitener implementsMessageListener{
@Override
public void onMessage(Message message) {
System.out.println(" data :" + message.getBody());
}
}
6.由于消费端当队列有数据到达时,对应监听的对象就会被通知到,无法做到批量获取,批量入库,因此可以在消费端缓存一个临时队列,将mq取出来的数据存入本地队列,后台线程定时批量处理即可
推荐阅读
- Kafka|Kafka VS RocketMQ VS RabbitMQ
- RabbitMQ|RabbitMQ 四种类型发送接收数据方式
- mac下安装rabbitmq遇到的坑
- rocketmq-连接rocketmq遇到的异常
- RabbitMQ读写消息的Java Demo
- RabbitMQ 常见面试题
- mac 安装 RabbitMQ
- Go|RabbitMQ系列笔记封装篇
- rabbitmq工具类带连接池
- Java|RabbitMQ-使用Java操作简单队列 simple queues