使用python基于zmq的DEALER-ROUTER模式实现分布式消息分发的demo

【使用python基于zmq的DEALER-ROUTER模式实现分布式消息分发的demo】zmq的DEALER套接字是对REQ的一层包装,本质就是在发送数据前发送一段bytes数据(所以一个bytes数据就意味着一个连接),而ROUTER就是对REP的包装,是在接收数据前先接收bytes数据。REQ-REP可以看作bytes数据为空的DEALER-ROUTER模式,但是DEALER和ROUTER又不像REQ和REP一样严格遵循send-recv-send-recv.....的模式。
总体分为client-broker-worker三个部分,简化如下图所示。
使用python基于zmq的DEALER-ROUTER模式实现分布式消息分发的demo
文章图片


broker的伪码流程图:
使用python基于zmq的DEALER-ROUTER模式实现分布式消息分发的demo
文章图片

附代码:
client.py

import zmqctx = zmq.Context.instance() socket = ctx.socket(zmq.DEALER) socket.connect("tcp://localhost:12000") if __name__ == '__main__': socket.send(b"hello") msg = socket.recv() print(msg)

broker.py
import zmq import time from collections import OrderedDictcontext = zmq.Context.instance() frontend = context.socket(zmq.ROUTER) frontend.bind("tcp://*:12000") backend = context.socket(zmq.ROUTER) backend.bind("tcp://*:13000") frontend.setsockopt(zmq.RCVHWM, 100) backend.setsockopt(zmq.RCVHWM, 100)workers = OrderedDict() clients = {} msg_cache = [] poll = zmq.Poller()poll.register(backend, zmq.POLLIN) poll.register(frontend, zmq.POLLIN)if __name__ == '__main__': while True: socks = dict(poll.poll(10)) now = time.time() # 接收后端消息 if backend in socks and socks[backend] == zmq.POLLIN: # 接收后端地址、客户端地址、后端返回responseps: 此处的worker_addr, client_addr, reply均是bytes类型 worker_addr, client_addr, response = backend.recv_multipart() # 把后端存入workers workers[worker_addr] = time.time() if client_addr in clients: # 如果客户端地址存在,把返回的response转发给客户端,并删除客户端 frontend.send_multipart([client_addr, response]) clients.pop(client_addr) else: # 客户端不存在 print(worker_addr, client_addr) # 处理所有未处理的消息 while len(msg_cache) > 0 and len(workers) > 0: # 取出一个最近通信过的worker worker_addr, t = workers.popitem() # 判断是否心跳过期 过期则重新取worker if t - now > 1: continue msg = msg_cache.pop(0) # 转发缓存的消息 backend.send_multipart([worker_addr, msg[0], msg[1]]) # 接收前端消息 if frontend in socks and socks[frontend] == zmq.POLLIN: # 获取客户端地址和请求内容ps: 此处的client_addr, request均是bytes类型 client_addr, request = frontend.recv_multipart() clients[client_addr] = 1 while len(workers) > 0: # 取出一个最近通信过的worker worker_addr, t = workers.popitem() # 判断是否心跳过期 过期则重新取worker if t - now > 1: continue # 转发消息 backend.send_multipart([worker_addr, client_addr, request]) break else: # while正常结束说明消息未被转发,存入缓存 msg_cache.append([client_addr, request])

worker.py
import zmqif __name__ == '__main__': context = zmq.Context() socket = context.socket(zmq.DEALER) # 设置接收消息超时时间为1秒 socket.setsockopt(zmq.RCVTIMEO, 1000) socket.connect("tcp://localhost:13000") # 发送心跳到broker注册worker socket.send_multipart([b"heart", b""]) while True: try: # 获取客户端地址和消息内容 client_addr, message = socket.recv_multipart() except Exception as e: # 超时 重新发送心跳 print(e) socket.send_multipart([b"heart", b""]) continue # 处理任务 print(client_addr, message) # 返回response socket.send_multipart([client_addr, b"world"])

总结:broker自带缓存消息队列,但是broker宕机就会丢失消息;worker主动连接broker并发送心跳包,但是worker接收到消息后宕机同样可能会丢失消息造成client阻塞。实现这个demo只是帮助理解分布式消息转发,不要直接使用。

    推荐阅读