RabbitMQ简单入门
1. 什么是RabbitMQ?
- RabbitMQ是一个基于AMQP(高级消息队列协议)、采用ErlangErlang语言编写并且开源的消息代理和队列服务器,通过普通的协议在完全不同的应用之前共享数据
- 官网介绍
- AMQP全称是Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
- 基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
- AMQP的概念模型参考 ??
文章图片
mq.png
Server:常被称作Broker,接收客户端的连接,实现AMQP消息队列的路由功能的进程。简单来说就是消息队列服务器实体;
Connection :连接,应用程序与Server的网络连接;
Channel:网络信道,进行消息的读写通道;
Message:服务与应用之间传送的数据由 properties 和 Body 组成;
Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个vhost中可以有若干个Exchange和Queue,默认的vhost是"/";
Exchange:交换机,接收消息,根据 Routing Key 转发消息到绑定的队列,一个 Exchange 可以绑定多个 Message Queue;
Queue:消息队列,用来保存、转发消息到消费者;
Routing Key:一个路由规则,虚拟机可以用它来确定如何路由一个特定的消息;
Binding:EExchange 和 Queue 之间的虚拟链接,Binding 中可以包含 Routing Key;
Producer:消息生产者,就是投递消息的程序。
Consumer:消息消费者,就是接受消息的程序
4. 基于什么考量选择RabbitMQ?
- 开源的,跨平台、支持多种语言、文档齐全
- 可以提供高稳定性的保障
- 提供可靠性的消息投递模式
- 集群模式丰富并且易扩展
- 保证数据不丢失的前提下做到高可用
- Mac版本
brew update
brew install rabbitmq
- Linux版本
1) RabbitMQ本身是Erlang编写的,所以它的安装是依赖Erlang版本支持: http://www.rabbitmq.com/which-erlang.html
2) RabbitMQ还需要依赖好多插件,具体可以直接执行
3) 下载相关Erlang
http://www.rabbitmq.com/releases/erlang/
https://www.erlang-solutions.com/resources/download.html
4) 下载对应版本的RabbitMQ
http://www.rabbitmq.com/install-rpm.html#downloads5) 使用 rpm -ivh 先后安装 Erlang 和 RabbitMQ
- 启动:
【RabbitMQ简单入门】启动: rabbitmq-server start 或者 rabbitmqctl start_app6.生产消费入门示例
停止: rabbitmq-server stop ,如果使用 rabbitmqctl start_app 启动则需要使用 rabbitmqctl stop_app 停止
状态: rabbitmq-server status
- 引入依赖
com.rabbitmq
amqp-client
5.5.0
- 服务端
public class Producer {
public static void main(String[] args) throws Exception {
//1.获取连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2.创建一个新的连接
Connection connection = connectionFactory.newConnection();
//3.创建一个消息信道
Channel channel = connection.createChannel();
//4.发送消息
final String queueName = "test01";
String msg = "Hello RabbitMQ";
for (int i = 0;
i < 6;
i++) {
channel.basicPublish("", queueName,null,(msg + i).getBytes());
}
channel.close();
connection.close();
}
}
- 消费端
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
//1.创建一个消息信道
Channel channel = connection.createChannel();
//2.声明队列:队列名称,持久化,独占,自动删除,其他参数
final String queueName = "test01";
channel.queueDeclare(queueName, true, false, false, null);
//3.消费消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received: " + message);
}
});
}
}
7.温馨提示:
- amqp-client 在 5.X以后去掉了QueueingConsumer,具体原因引用一篇文章 为什么QueueingConsumer会被Deprecated?
- 官方建议使用 DefaultConsumer 替代 QueueingConsumer,因为后者存在内存溢出的问题
推荐阅读
- 科学养胃,别被忽悠,其实真的很简单
- opencv|opencv C++模板匹配的简单实现
- 松软可口易消化,无需烤箱超简单,新手麻麻也能轻松成功~
- 简单心理2019春A期+32+张荣
- 《算法》-图[有向图]
- android防止连续点击的简单实现(kotlin)
- 机器学习一些简单笔记
- typeScript入门基础介绍
- Android超简单实现沉浸式状态栏
- v-charts简单使用