【使用python基于zmq的DEALER-ROUTER模式实现分布式消息分发的demo】zmq的DEALER套接字是对REQ的一层包装,本质就是在发送数据前发送一段bytes数据(所以一个bytes数据就意味着一个连接),而ROUTER就是对REP的包装,是在接收数据前先接收bytes数据。REQ-REP可以看作bytes数据为空的DEALER-ROUTER模式,但是DEALER和ROUTER又不像REQ和REP一样严格遵循send-recv-send-recv.....的模式。
总体分为client-broker-worker三个部分,简化如下图所示。
文章图片
broker的伪码流程图:
文章图片
附代码:
client.py
import zmqctx = zmq.Context.instance()
socket = ctx.socket(zmq.DEALER)
socket.connect("tcp://localhost:12000")
if __name__ == '__main__':
socket.send(b"hello")
msg = socket.recv()
print(msg)
broker.py
import zmq
import time
from collections import OrderedDictcontext = zmq.Context.instance()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:12000")
backend = context.socket(zmq.ROUTER)
backend.bind("tcp://*:13000")
frontend.setsockopt(zmq.RCVHWM, 100)
backend.setsockopt(zmq.RCVHWM, 100)workers = OrderedDict()
clients = {}
msg_cache = []
poll = zmq.Poller()poll.register(backend, zmq.POLLIN)
poll.register(frontend, zmq.POLLIN)if __name__ == '__main__':
while True:
socks = dict(poll.poll(10))
now = time.time()
# 接收后端消息
if backend in socks and socks[backend] == zmq.POLLIN:
# 接收后端地址、客户端地址、后端返回responseps: 此处的worker_addr, client_addr, reply均是bytes类型
worker_addr, client_addr, response = backend.recv_multipart()
# 把后端存入workers
workers[worker_addr] = time.time()
if client_addr in clients:
# 如果客户端地址存在,把返回的response转发给客户端,并删除客户端
frontend.send_multipart([client_addr, response])
clients.pop(client_addr)
else:
# 客户端不存在
print(worker_addr, client_addr)
# 处理所有未处理的消息
while len(msg_cache) > 0 and len(workers) > 0:
# 取出一个最近通信过的worker
worker_addr, t = workers.popitem()
# 判断是否心跳过期 过期则重新取worker
if t - now > 1:
continue
msg = msg_cache.pop(0)
# 转发缓存的消息
backend.send_multipart([worker_addr, msg[0], msg[1]])
# 接收前端消息
if frontend in socks and socks[frontend] == zmq.POLLIN:
# 获取客户端地址和请求内容ps: 此处的client_addr, request均是bytes类型
client_addr, request = frontend.recv_multipart()
clients[client_addr] = 1
while len(workers) > 0:
# 取出一个最近通信过的worker
worker_addr, t = workers.popitem()
# 判断是否心跳过期 过期则重新取worker
if t - now > 1:
continue
# 转发消息
backend.send_multipart([worker_addr, client_addr, request])
break
else:
# while正常结束说明消息未被转发,存入缓存
msg_cache.append([client_addr, request])
worker.py
import zmqif __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.DEALER)
# 设置接收消息超时时间为1秒
socket.setsockopt(zmq.RCVTIMEO, 1000)
socket.connect("tcp://localhost:13000")
# 发送心跳到broker注册worker
socket.send_multipart([b"heart", b""])
while True:
try:
# 获取客户端地址和消息内容
client_addr, message = socket.recv_multipart()
except Exception as e:
# 超时 重新发送心跳
print(e)
socket.send_multipart([b"heart", b""])
continue
# 处理任务
print(client_addr, message)
# 返回response
socket.send_multipart([client_addr, b"world"])
总结:broker自带缓存消息队列,但是broker宕机就会丢失消息;worker主动连接broker并发送心跳包,但是worker接收到消息后宕机同样可能会丢失消息造成client阻塞。实现这个demo只是帮助理解分布式消息转发,不要直接使用。
推荐阅读
- 推荐系统论文进阶|CTR预估 论文精读(十一)--Deep Interest Evolution Network(DIEN)
- Python专栏|数据分析的常规流程
- Python|Win10下 Python开发环境搭建(PyCharm + Anaconda) && 环境变量配置 && 常用工具安装配置
- Python绘制小红花
- Pytorch学习|sklearn-SVM 模型保存、交叉验证与网格搜索
- OpenCV|OpenCV-Python实战(18)——深度学习简介与入门示例
- python|8. 文件系统——文件的删除、移动、复制过程以及链接文件
- 爬虫|若想拿下爬虫大单,怎能不会逆向爬虫,价值过万的逆向爬虫教程限时分享
- 分布式|《Python3网络爬虫开发实战(第二版)》内容介绍
- java|微软认真聆听了开源 .NET 开发社区的炮轰( 通过CLI 支持 Hot Reload 功能)