RocketMQ学习二-Producer临时

Namespace,顾名思义,命名空间,为消息发送者、消息消费者编入到一个命名空间中。在笔者的理解中,Namespace 的引入,有点类似 RocketMQ 支持多环境、多标签、全链路压测场景。
一言以蔽之,Namespace 主要为消息发送者、消息消费者进行分组,底层的逻辑是改变 Topic 的名称。
消息轨迹:支持跟踪消息发送、消息消费的全过程,即能跟踪消息的发送 IP、存储服务器,什么时候被哪个消费者消费。
ACL:访问权限控制,即可以 Topic 消息发送、订阅进行授权,只有授权用户才能发送消息到指定 Topic。
msgId的生成:
https://www.cnblogs.com/allen...
https://www.cnblogs.com/linli...
消息发送方式
RocketMQ 支持同步、异步、Oneway 三种发送方式。
同步:客户端发起一次消息发送后会同步等待服务器的响应结果。
异步:客户端发起一下消息发起请求后不等待服务器响应结果而是立即返回,这样不会阻塞客户端子线程,当客户端收到服务端(Broker)的响应结果后会自动调用回调函数。
Oneway:客户端发起消息发送请求后并不会等待服务器的响应结果,也不会调用回调函数,即不关心消息的最终发送结果。
队列选择机制
RocketMQ 支持队列级别的顺序消费,故我们只需要在消息发送的时候如果将同一个订单号的不同的消息发送到同一个队列,这样在消费的时候,我们就能按照顺序进行处理。
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)

throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

指定selector
RocketMQ key的使用场景,可以根据key查询消息(控制台,接口)。还有根据消息偏移量、消息全局唯一 msgId查询,这都是kafka所没有的
RocketMQ 可以为 Topic 设置 Tag(标签),这样消费端可以对 Topic 中的消息基于 Tag 进行过滤,即选择性的对 Topic 中的消息进行处理。发送消息时可以根据事件不同流程为消息设置不同的Tag,而消费端进行消费时可以只消费符合自己Tag的消息,API如下:
void subscribe(final String topic, final String subExpression) throws MQClientException;
RocketMQ msgId与offsetMsgId生成规则
消息发送高可用设计与故障规避机制(第5章)
主要参数是sendLatencyFaultEnable
第6章,消息发送错误及常用解决方案
消息发送超时分析,很可能是网络超时
快速失败导致的错误为 system_busy,并不会触发重试
可以将超时等待时间设置短一点,这样容易触发,进而重试
【RocketMQ学习二-Producer临时】System busy、Broker busy原因及应对策略
  1. PageCache 压力较大-使用堆外内存
  2. 发送线程池积压的拒绝策略-扩容
  3. Broker 端快速失败-使用者进行处理,比如重试

    推荐阅读