学习笔记|使用rocketmq,rocketmq-client-python的api开发rocketmq生产者和消费者

rocketmq-python 是一个基于 rocketmq-client-cpp 封装的 RocketMQ Python 客户端。
rocketmq-client-python安装 目前rocketmq库只支持linux和mac。
rocketmq-client-python 的安装:

pip install rocketmq

安装太慢?国内源安装:
pip install rocketmq -i https://pypi.tuna.tsinghua.edu.cn/simple

示例代码:
Producer
from rocketmq.client import Producer, Messageproducer = Producer('PID-XXX') producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')#rocketmq队列接口地址(服务器ip:port) # For ip and port name server address, use `set_namesrv_addr` method, for example: # producer.set_namesrv_addr('127.0.0.1:9887') producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')#可以不使用 producer.start()msg_body = {"id":"test_id","name":"test_name","message":"test_message"} ss = json.dumps(msg_body).encode('utf-8')msg = Message('YOUR-TOPIC') #topic名称 msg.set_keys('XXX')#每个消息在业务层面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。 msg.set_tags('XXX')#一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤。 msg.set_body(ss) ret = producer.send_sync(msg) print(ret.status, ret.msg_id, ret.offset) producer.shutdown()

其中:
  • 设置ip:port的位置:producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx')
当只有单一服务器时,格式是上面这个;
当有多个服务器地址(集群模式)时,可以使用:producer.set_namesrv_addr("xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx")
  • 如果使用pandas数据,pandas数据可以直接转换
df.to_json(orient='records').encode('utf-8'),然后放入body中发送。
  • 不同应用的多个Topic使用同一个namesrv_addr时数据传输会发生冲突
解决方案:每一个Topic对应一个 “PID-XXX”
PushConsumer
import timefrom rocketmq.client import PushConsumerdef callback(msg): print(msg.id, msg.body)consumer = PushConsumer('CID_XXX') consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet') # For ip and port name server address, use `set_namesrv_addr` method, for example: # consumer.set_namesrv_addr('127.0.0.1:9887') consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') consumer.subscribe('YOUR-TOPIC', callback) consumer.start()while True: time.sleep(3600)consumer.shutdown()

PullConsumer
from rocketmq.client import PullConsumerconsumer = PullConsumer('CID_XXX') consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet') # For ip and port name server address, use `set_namesrv_addr` method, for example: # consumer.set_namesrv_addr('127.0.0.1:9887') consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') consumer.start()for msg in consumer.pull('YOUR-TOPIC'): print(msg.id, msg.body)consumer.shutdown()

控制日志的输出频率
from rocketmq.client import dll dll.SetPushConsumerLogLevel(namesrv_addr.encode('utf-8'), 1)

ffi.py
class _CLogLevel(CtypesEnum): FATAL = 1 ERROR = 2 WARN = 3 INFO = 4 DEBUG = 5 TRACE = 6 LEVEL_NUM = 7

log4j定义了8个级别的log(除去OFF和ALL,可以说分为6个级别),优先级从高到低依次为:OFF、FATAL、ERROR、WARN、INFO、DEBUG、TRACE、 ALL。
ALL 最低等级的,用于打开所有日志记录。
TRACE designates finer-grained informational events than the DEBUG.Since:1.2.12,很低的日志级别,一般不会使用。
DEBUG 指出细粒度信息事件对调试应用程序是非常有帮助的,主要用于开发过程中打印一些运行信息。
INFO 消息在粗粒度级别上突出强调应用程序的运行过程。打印一些你感兴趣的或者重要的信息,这个可以用于生产环境中输出程序运行的一些重要信息,但是不能滥用,避免打印过多的日志。
WARN 表明会出现潜在错误的情形,有些信息不是错误信息,但是也要给程序员的一些提示。
ERROR 指出虽然发生错误事件,但仍然不影响系统的继续运行。打印错误和异常信息,如果不想输出太多的日志,可以使用这个级别。
FATAL 指出每个严重的错误事件将会导致应用程序的退出。这个级别比较高了。重大错误,这种级别你可以直接停止程序了。
应用案例 PushConsumer
import json from rocketmq.client import PushConsumer, dll import traceback import loggingclass RocketMQ(): def __init__(self): logging.basicConfig(level=logging.CRITICAL, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') self.logger = logging.getLogger(__name__)self.consumer = PushConsumer("PID-XXX") self.consumer.set_namesrv_addr("XX.XX.XX.XX:XXXX") self.topic_name = "xxx"#减少日志输出 dll.SetPushConsumerLogLevel(namesrv_addr.encode('utf-8'), 1)def callback(self,msg): test_body = json.loads(msg.body) try: self.my_func(test_body) return PushConsumer except Exception as e: print('>>>>>>>>>>allback msg:\n{}'.format(es_body)) print('>>>>>>>>>>callback error:\n{}'.format(e)) return PushConsumerdef onMessage(self): self.consumer.subscribe(self.topic_name, self.callback) self.consumer.start() while True: time.sleep(2) self.consumer.shutdown()def my_func(test_body): print(test_body)if __name__ == '__main__': mq = RocketMQ() mq.onMessage()

Producer
from rocketmq.client import Producer, Message import jsonproducer = Producer("PID-XXX") producer.set_namesrv_addr('XX.XX.XX.XX:XXXX') producer.start()topic_name = "xxx" key_name = "abc" tags = "123"msg_body = { "key_1":value_1, "key_2":value_2 }ss = json.dumps(msg_body).encode('utf-8')msg = Message(topic_name) msg.set_keys(key_name) msg.set_tags(tags) msg.set_body(ss) ret = producer.send_sync(msg) print(ret.status, ret.msg_id, ret.offset) producer.shutdown()

PullConsumer
from rocketmq.client import PullConsumer consumer = PullConsumer("PID-XXX") consumer.set_namesrv_addr("XX.XX.XX.XX:XXXX") consumer.start() while True: topic_name = "xxx" for msg in consumer.pull(topic_name):print(msg.id, msg.body)

Topic
Topic创建的核心步骤如下
  • 1、mqadmin向broker发起创建Topic的命令。
  • 2、broker生成Topic对应的topicConfig配置保存在broker的TopicConfigManager中。
  • 3、broker向所有的namesrv上报topicConfig信息。
  • 4、namesrv的RouteInfoManager的topicQueueTable保存topic的QueueData信息。
  • 5、broker会通过定时任务定期向namesrv发送心跳信息更新topic配置。
usage: mqadmin updateTopic -b| -c[-h] [-n ] [-o ] [-p ] [-r ] [-s ] -t [-u ] [-w ] -b,--brokerAddrcreate topic to which broker -c,--clusterNamecreate topic to which cluster -h,--helpPrint help -n,--namesrvAddrName server address list, eg: 192.168.0.1:9876; 192.168.0.2:9876 -o,--orderset topic's order(true|false) -p,--permset topic's permission(2|4|6), intro[2:W 4:R; 6:RW] -r,--readQueueNumsset read queue nums -s,--hasUnitSubhas unit sub (true|false) -t,--topictopic name -u,--unitis unit topic (true|false) -w,--writeQueueNumsset write queue nums

  • 通过 --brokerAddr在指定的broker创建topic。
  • 通过 --clusterName在整个集群创建topic。
  • 通过 --namesrvAddr指定namesrv地址。
  • 通过 --topic来指定topic名称。
  • 通过 --perm来指定Topic的权限管理。
在rocketmq中添加新的Topic
sh mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderTopic

创建Topic时报错解决方案
Java HotSpot? 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0 Java HotSpot? 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0 org.apache.rocketmq.tools.command.SubCommandException: UpdateTopicSubCommand command failed at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:181) at org.apache.rocketmq.tools.command.MQAdminStartup.main0(MQAdminStartup.java:135) at org.apache.rocketmq.tools.command.MQAdminStartup.main(MQAdminStartup.java:86) Caused by: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=[10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not available at org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:84) at org.apache.rocketmq.acl.common.AclSigner.calSignature(AclSigner.java:73) at org.apache.rocketmq.acl.common.AclSigner.calSignature(AclSigner.java:68) at org.apache.rocketmq.acl.common.AclUtils.calSignature(AclUtils.java:58) at org.apache.rocketmq.acl.common.AclClientRPCHook.doBeforeRequest(AclClientRPCHook.java:44) at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.doBeforeRpcHooks(NettyRemotingAbstract.java:172) at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:370) at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1180) at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:275) at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:222) at org.apache.rocketmq.tools.command.CommandUtil.fetchMasterAddrByClusterName(CommandUtil.java:83) at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:154) … 2 more Caused by: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not available at org.apache.rocketmq.acl.common.AclSigner.sign(AclSigner.java:63) at org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:79) … 13 more Caused by: java.security.NoSuchAlgorithmException: Algorithm HmacSHA1 not available at javax.crypto.Mac.getInstance(Mac.java:181) at org.apache.rocketmq.acl.common.AclSigner.sign(AclSigner.java:57) … 14 more

解决办法是:
1.进入rocketmq的bin目录下:/var/www/rocketmq/rocketmq-all-4.4.0-bin-release/bin,
/var/www/rocketmq是我自己的安装路径。
2.用vim tools.sh打开tools.sh.在JAVA_OPT配置中,在-Djava.ext.dirs这一行的后面添加ext的路径,原配置如下
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m" JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

添加ext文件的绝对路径,添加后重新执行命令即可
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m" JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:/usr/java/jdk1.8.0_65/jre/lib/ext" JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

删除Topic
deleteTopic -n localhost:9876 -c DefaultCluster -t orderTopic

rocketmq查看命令
首先进入 RocketMQ 工程,进入/RocketMQ/bin在该目录下有个 mqadmin 脚本 .
查看帮助:在 mqadmin 下可以查看有哪些命令
a: 查看具体命令的使用 : sh mqadmin
b: sh mqadmin help 命令名称
例如,查看 updateTopic 的使用
sh mqadmin help updateTopic
2. 关闭nameserver和所有的broker:
进入到bin下:
sh mqshutdown namesrv sh mqshutdown broker

3. 查看所有消费组group:
sh mqadmin consumerProgress -n 192.168.1.23:9876

4. 查看指定消费组下的所有topic数据堆积情况:
sh mqadmin consumerProgress -n 192.168.1.23:9876 -g warning-group

5. 查看所有topic :
sh mqadmin topicList -n 192.168.1.23:9876

6. 查看topic信息列表详情统计
sh mqadmin topicstatus -n 192.168.1.23:9876 -t topicWarning

7.新增topic
sh mqadmin updateTopic –n 192.168.1.23:9876 –c DefaultCluster –t topicWarning

8. 删除topic
sh mqadmin deleteTopic –n 192.168.1.23:9876 –c DefaultCluster –t topicWarning

9、查询集群消息
sh mqadminclusterList -n 192.168.1.23:9876


Reference https://github.com/apache/rocketmq-client-python
https://www.oschina.net/p/rocketmq-python
https://www.cnblogs.com/qi-yuan-008/p/14022378.html
https://blog.csdn.net/shiyong1949/article/details/52643711
https://www.jianshu.com/p/b84190af20a8
【学习笔记|使用rocketmq,rocketmq-client-python的api开发rocketmq生产者和消费者】https://www.cnblogs.com/gmq-sh/p/6232633.html

    推荐阅读