Python|Python RabbitMQ原理和使用场景以及模式
RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件。一、RabbitMQ 原理简介 1. RabbitMQ 角色
- 生产者:消息的创建者,负责创建和推送数据到消息服务器;
- 消费者:消息的接收方,用于处理数据和确认消息;
- 代理:就是RabbitMQ本身,负责消息的传递。
客户端通过 TCP 连接到 RabbitMQ Server。
连接成功后 RabbitMQ 会创建一个 AMQP 信道。
信道是创建在 TCP 上的虚拟连接,AMQP 命令都是通过信道发送出去的,每个信道都会有一个唯一的 ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。
3. RabbitMQ 中的关键词
- Producer (生产者):消息生产者
- Consumer(消费者):消息的消费者
- Connection(连接):就是一个TCP的连接,Producer 和 Consumer 都是通过 TCP 连接到 RabbitMQ Server 的。
- Channel(信道):他是虚拟连接,他建立在 Connection 的 TCP 连接中。也就是说他是消息推送使用的通道;
- Exchange(交换器):是生产者发布消息的通道,接收生产者消息并将消息路由到消息队列;
- Queue(队列):是消费者接受消息的通道,用于存储生产者的消息;
- RoutingKey(路由键):用于把生成者的数据分配到交换器上;
- BindingKey(绑定键):用于把交换器的消息绑定到队列上;
RabbitMQ 会将持久化的消息写入磁盘上的持久化日志文件,等消息被消费之后,RabbitMQ 会把这条消息标识为等待垃圾回收。设置如下:
RabbitMQ 默认情况下是关闭消息持久化的。需要在创建队列的时候设置。
- 队列持久化
durable=True
消息的生产者和接收者都需要设置队列持久化
# durable=True 队列持久化
channel.queue_declare(queue='test', durable=True)
只做队列持久化是不行的还需要在加上消息持久化
- 消息持久化
delivery_mode=2
channel.basic_publish(
exchange="",
routing_key="test",
body="hello world",
properties=pika.BasicProperties(delivery_mode=2,)
)
- 消息持久化的缺点
因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量。
- 虚拟主机概念
虚拟主机是 RabbitMQ 创建的虚拟消息服务器。为了在一个 RabbitMQ 上实现多用户隔离。为此提供了一个虚拟主机(virtual hosts - vhosts)的概念。
- 虚拟主机的操作
命令 | 说明 | 参数 |
---|---|---|
rabbitmqctl list_vhosts |
列举所有虚拟主机 | |
rabbitmqctl add_vhost |
添加虚拟主机 | vhost_name 创开虚拟主机的名称 |
rabbitmqctl delete_vhost |
删除虚拟主机 | vhost_name 删除虚拟主机的名称 |
rabbitmqctl add_user |
添加用户 | username 用户名 password 密码 |
rabbitmqctl set_user_tags |
设置用户标签 | username 用户名 tag 标签 如:administrator, monitoring, management |
rabbitmqctl set_permissions [-p |
设置权限(如:队列交换机的创建和删除、发布消息、读取消息等) | user 用户名 conf 正则表达式匹配置资源能够被用户访问。write 正则表达式匹配置资源能够被用户读。read 正则表达式匹配置资源能够该用户访问 示例: rabbitmqctl set_permissions ceshi ".*" ".*" ".*" 给 ceshi 用户最高权限 |
前三个说的是 RabbitMQ 的 Exchange 的类型。RPC 是消费者和生产者互通的一种方式。1. Direct 直连交换机
当一个绑定了消息生产者routing_key = 1
的消息被发送给直连交换机时,交换机会把消息发送给绑定了routing_key = 1
的队列。
直连交换机经常用来循环分发任务给多个消费者。然后消息的负载均衡是发生在消费者之间的,而不是队列之间。
如下例:
import pikaconfig = pika.ConnectionParameters(
host='127.0.0.1',
credentials=pika.PlainCredentials('test', 'test'),
)# 创建 MQ 连接
conn = pika.BlockingConnection(config)
channel = conn.channel()# 在频道中创建一个队列
channel.exchange_declare(exchange='ceshi', type='direct')# 发送消息到指定队列
# exchange 指定交换器
# routing_key 设置路由键
# body 发送的内容
channel.basic_publish(
exchange='ceshi',
routing_key='1',
body='Hello World!'
)conn.close()
消息消费者
import pikaconfig = pika.ConnectionParameters(
host='127.0.0.1',
port=5672,
credentials=pika.PlainCredentials('test', 'test'),
)# 创建 MQ 连接
conn = pika.BlockingConnection(config)
channel = conn.channel()# 如果使用exchange, 这里检测 exchange 是否存在,如不存在创建。存在检测是否正确且是否符合 exchange_type 类型
channel.exchange_declare(exchange='ceshi', exchange_type='direct')# 在频道中创建一个队列
channel.queue_declare(queue='hello')# 将队列绑定到指定的 exchange
# routing_key 类似密钥,只接收 routing_key 正确的信息
channel.queue_bind(exchange='ceshi', queue='hello', routing_key='1')# 回调函数四个必须的参数 body 是传入的内容
# channel: BlockingChannel
# method: spec.Basic.Deliver
# properties: spec.BasicProperties
# body: str or unicode
def callback(channel, method, properties, body):
print channel
print method
print properties
print body# 指定队列调用的函数
# no_ack 参数 True 时处理完成后没有返回信息。False 时在处理完后应答
channel.basic_consume(
callback,
queue='hello',
no_ack=False
)
print 'waiting ...'
channel.start_consuming()
:在修改消费者的
routing_key
后,需要重新创建队列。2. Fanout 扇型交换机(订阅者模式)
他回将消息发送给绑定到它身上的所有队列,而不理会绑定的路由键。消息生产者
【Python|Python RabbitMQ原理和使用场景以及模式】如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。
扇型交换机实现消息的广播。
以下几个应用场景
- 体育比赛用它来给手机客户端发送比分数据
- 广播各种状态和产品的更新
- 在群聊时分发消息给群聊中的用户
#!coding=utf-8
import pikaconfig = pika.ConnectionParameters(
host='127.0.0.1',
credentials=pika.PlainCredentials('test', 'test'),
)conn = pika.BlockingConnection(config)
channel = conn.channel()# 修改 type 为 fanout
channel.exchange_declare(exchange='ceshi2', type='fanout')channel.basic_publish(
exchange='ceshi2',
routing_key='',
body='Hello World!'
)conn.close()
在生产者中只需要修改
exchange_declare
他的 type
为 fanout
即可消息消费者
#! coding=utf-8
import pikaconfig = pika.ConnectionParameters(
host='127.0.0.1',
port=5672,
credentials=pika.PlainCredentials('test', 'test'),
)conn = pika.BlockingConnection(config)
channel = conn.channel()
channel.exchange_declare(exchange='ceshi2', exchange_type='fanout')# 订阅者模式
# 生成随机 queue_name
# 订阅者之间的 queue name 不能重复
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='ceshi2', queue=queue_name, routing_key='')def callback(channel, method, properties, body):
print channel
print method
print properties
print bodychannel.basic_consume(
callback,
queue=queue_name,
no_ack=False
)
print 'waiting ...'
channel.start_consuming()
3. Topic 主题交换机(模糊匹配)
通过发送者和接收者之间的例如:routing_key
相互匹配,将消息路由给一个或多个队列。
主题交换机通常用来实现消息的多路由广播
以下应用场景
- 发布不同分类或者不同标签的新闻(例如,发送游戏类新闻和体育类新闻)
- 系统中不同种类服务的调用(例如:发布系统和付费系统的调用)
接收者(消费者)
routing_key='a.*'
发送者(生产者)
routing_key = 'a.b.c.d'
结果:匹配失败
因为:
*
表示匹配一个单词#
表示匹配0个或多个单词消息生产者
#!coding=utf-8
import pikaconfig = pika.ConnectionParameters(
host='127.0.0.1',
credentials=pika.PlainCredentials('test', 'test'),
)conn = pika.BlockingConnection(config)
channel = conn.channel()# 修改 type 为 topic
channel.exchange_declare(exchange='ceshi3', type='topic')channel.basic_publish(
exchange='ceshi3',
routing_key='a.b.c.d',
body='Hello World!'
)conn.close()
消息消费者
#! coding=utf-8
import pikaconfig = pika.ConnectionParameters(
host='127.0.0.1',
port=5672,
credentials=pika.PlainCredentials('test', 'test'),
)conn = pika.BlockingConnection(config)channel = conn.channel()channel.exchange_declare(exchange='ceshi3', exchange_type='topic')
channel.queue_declare(queue='hello')# routing_key='a.*' 此时是无法接受到信息的,因为生产者发送的是 a.* a. 后面一个单词。修改 routing_key='a.#' 即可接受成功
channel.queue_bind(exchange='ceshi3', queue='hello', routing_key='a.*')def callback(channel, method, properties, body):
print channel
print method
print properties
print bodychannel.basic_consume(
callback,
queue='hello',
no_ack=False
)
print 'waiting ...'
channel.start_consuming()
4. RPC
生产者发送消息给消费者,并接收消费者处理完的结果消息生产者
原理:
- 生产者会创建一个新的队列,用来接收消费者返回的信息。
- 生产者在发送消息的同时,还会发送 1步骤中创建的队列名和一个
correlation_id
用来验证- 当消费者处理完数据后,会把结果和
correlation_id
发送到 1步骤创建的队列中去- 生产者会使用一个循环
while
来监测返回结果self.response
- 生产者获取到数据后比对
correlation_id
是否一致,然后结束此次发送流程
#! coding=utf-8
import pika
import uuidclass RpcClient(object):
def __init__(self):
config = pika.ConnectionParameters(
host='127.0.0.1',
credentials=pika.PlainCredentials('test', 'test'),
)self.conn = pika.BlockingConnection(config)
self.channel = self.conn.channel()# 在频道中创建一个队列
result = self.channel.queue_declare(exclusive=True)# 生成队列名
self.queue_name = result.method.queue# 指定队列要调用的函数
self.channel.basic_consume(
self.on_request,
no_ack=True,
queue=self.queue_name
)def on_request(self, channel, method, props, body):
if self.corr_id == props.correlation_id:
self.response = bodydef call(self, message):
self.response = None
self.corr_id = str(uuid.uuid4)# 发送消息到指定队列
self.channel.basic_publish(
exchange='',
routing_key='rpc',
# 消费者返回处理结果需要的队列信息(reply_to)和相关 id(correlation_id).
# 以便消费者知道返回给那个生产者
properties=pika.BasicProperties(
reply_to=self.queue_name,
correlation_id=self.corr_id
),
body=message
)while self.response is None:
# 是一个等待消息的阻塞过程,连接的任何消息都可以使它脱离阻塞状态
self.conn.process_data_events()
return self.responsefibonacci_rpc = RpcClient()print "等待处理结果"response = fibonacci_rpc.call('Hello World!')
print "处理完返回信息: %s" % response
消息消费者
#! coding=utf-8
import pika
import timeconfig = pika.ConnectionParameters(
host='127.0.0.1',
credentials=pika.PlainCredentials('test', 'test'),
)conn = pika.BlockingConnection(config)
channel = conn.channel()channel.exchange_declare(exchange='rpc')
channel.queue_declare(queue='rpc')def on_request(channel, method, props, body):
print '处理中, 收到内容:%s' % body# 发送消息到指定队列
# exchange 指定 exchange
# routing_key 设置为队列的名称
# body 发送的内容
channel.basic_publish(
exchange='',
# props.reply_to 是生产者发送过来 接收处理结果的 指定队列
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body='处理完成'
)
# 告知 rabbit 消息已经处理完
channel.basic_ack(delivery_tag=method.delivery_tag)# 使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,只有工作者完成任务之后,才会再次接收到任务
channel.basic_qos(prefetch_count=1)# 接收的结果调用 on_request 函数处理
channel.basic_consume(on_request, queue='rpc')
print("waiting....")# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()conn.close()
三、RabbitMQ 使用场景 1. 任务相互依赖
例: 三个任务,任务1、任务2、任务3。他们之间有相互的制约。执行
任务1
的前提是要有任务2
的结果。执行任务2
需要任务3
的结果。一般会使用
crontab
来做计划任务。预估一下每个任务的完成时间。然后制定任务。当数量变大处理时间变成,就需要经常修改crontab
任务。使用 RabbitMQ 后每个任务结束时只需要发送一个结束信息即可
如:
任务2
订阅任务3
的信息。当任务3
完成后发送一个完成消息。任务2
接收到完成消息后开始执行,在执行结束后发送任务2
完成消息。任务1
订阅任务2
的消息,然后执行。优点:
- 任务执行时间发生变化,不需要修改
crontab
任务 - 每个任务之间的预留时间没有了
例: 有三个用户A、B、C 他们分别发送文章,但后台会根据用户的级别做不同的操作。
A 是普通用户,系统发布。
B 是 VIP 用户,系统发布和推荐给关注这部分内容的客户。
C 是黑卡客户,系统发布、推荐给关注这部分内容的客户、在这个分类中置顶这篇文章。
文章发布服务只关心是否成功,剩下的操作都不关心。可以使用 RabbitMQ 服务将其他操作分离。
优点:
- 减少发布时间,只操作发布,其他操作由别的系统执行。
- 解耦,系统之间不在直接关联。
- 新增发布以后的其他操作不需要在修改发布系统代码。
例: 有一个操作 A。用户执行这个任务后,又非常需要结果。
解决方法:
- 用户调用操作 A。
- 操作 A 直接返回调用成功。此时只是调用成功。
- 操作 A 告诉后台应该执行什么程序。
- 后台执行完成后,将完成消息发送给 RabbitMQ。
- 用户订阅 RabbitMQ 中的操作 A 的消息。
四、开启 WEB 服务 RabbitMQ 自带管理后台,安装后需要配置开启
进入 RabbitMQ 安装目录中的
sbin
目录执行rabbitmq-plugins enable rabbitmq_management
http://localhost:15672/
用户名密码均为guest
推荐阅读
- 做一件事情的基本原理是什么()
- python学习之|python学习之 实现QQ自动发送消息
- 逻辑回归的理解与python示例
- python自定义封装带颜色的logging模块
- 【Leetcode/Python】001-Two|【Leetcode/Python】001-Two Sum
- 【读书笔记】贝叶斯原理
- Python基础|Python基础 - 练习1
- Python爬虫|Python爬虫 --- 1.4 正则表达式(re库)
- Python(pathlib模块)
- python青少年编程比赛_第十一届蓝桥杯大赛青少年创意编程组比赛细则