周边生态|RoP|周边生态|RoP 重磅发布 0.2.0 版本( 架构全新升级,消息准确性达 100%)
作者简介 冉小龙-腾讯云中间件团队研发工程师,Apache Pulsar committer、RoP 作者及 Maintainer、Apache BookKeeper contributor,Apache Pulsar Go client、Apache Pulsar Go Functions、StreamNative/pulsarctl 作者。摘要 日前,腾讯云中间件团队联合 StreamNative 团队正式发布了 RoP 0.2.0 版本,该版本在架构上全新升级,用户在使用中可以完全避免消息丢失、消息重复消费、只能消费一部分 Partition 的数据等问题。
RoP 的定义 与 KoP、MoP 和 AoP 相似,RoP 是一种可插拔的协议处理插件。
将 RoP 协议处理插件添加到现有 Pulsar 集群后,用户无需修改代码,便能将现有的 RocketMQ 应用程序和服务迁移到 Pulsar,同时还能使用 Pulsar 的强大功能,例如:
- 计算与存储分离
- 多租户
- 跨地域复制
- 分层分片
- 轻量化计算框架 -- Pulsar Functions
- ......
今天,我们重磅发布 RoP 0.2.0,该版本在架构上全新升级,在功能和稳定性上得到了更大的提升。提供了 ACL 鉴权和验证的功能,可以更好的确保用户数据的安全性,同时允许用户对 Partitioned Topic 进行扩容,可以获得更好的并发写入能力, 并且完善了 RocketMQ 原生的管控端接口,可以更好的对服务进行处理和监控。
最新功能优化 在 0.2.0 版本中,腾讯云中间件团队在 0.1.0 的架构上进行全新设计,对 MessageID、消息路由模型进行重构,确保不同场景下 RoP 消息的准确性。
主要有以下三点优化内容:
1. 支持 RoP ACL 功能
ACL 机制是 RocketMQ 社区自带的一个能力,可以很好的对用户的数据进行鉴权和认证。RoP 0.2.0 版本复用了 RocketMQ 自身的 Hook 实现,利用 Pulsar 自身的鉴权机制,实现了对用户数据进行鉴权和认证的功能。
RoP ACL 的使用方式依旧延续了 RocketMQ 的使用方式,只需定义
ACL_ACCESS_KEY
和 ACL_SECRET_KEY
字段,然后利用 RocketMQ 的 ACLRPCHook 函数加载即可,这样可以确保用户尽可能少的改动客户端的业务代码逻辑。具体代码示例如下:
private static final String ACL_ACCESS_KEY = "eyJrZXlJZCI6InJvY2tldG1xLW13bmI3bWFwMjhqZSIsImFsZyI6IkhTMjU2In0."
+ "eyJzdWIiOiJyb2NrZXRtcS1td25iN21hcDI4amVfdGVzdCJ9.BDOjqqY25a6apnZTMZCqg0I0pxVFcqz7fvZbaTqkf5U";
// token
private static final String ACL_SECRET_KEY = "rop";
public static void producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("rocketmq-mwnb7map28je|nit", "ProducerGroupName",
getAclRPCHook());
...
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));
}
- ACL_ACCESS_KEY:即用户在 Namespace 级别下创建的 Token。
- ACL_SECRET_KEY:固定值,在 RoP 内部解析时,不会使用到这个字段。
RocketMQ 与 Kafka 类似,都是使用 64 位的 Offset 来唯一标识一条消息,但是在 Pulsar 中,使用 64 位的 LedgerID、64 位的 EntryID 来唯一标识一条消息。针对这个问题,在 RoP 0.1.0 中,我们使用如下的形式来构造 MessageID 对象:
文章图片
- PartitionID: 8 位,可以允许一个 Topic 最多创建 256 个 Partitions
- LedgerID: 32 位
- EntryID: 24 位
PIP-70 是使用插件的方式进行加载的,所以在服务启动时,我们需要做如下配置:
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
Note: Broker Entry Metadata 是在 Pulsar 2.8.0 的版本中才支持的,所以需要确保 Pulsar Broker 的版本在 2.8.0 及以上。需要说明的是,RocketMQ 和 Kafka 在 Offset 的使用方式上又有所不同。RocketMQ 中有两个 Offset,一个是 Queue Offset,用来表示消息在 MessageQueue 中的位置,MessageQueue 本质上是一个数组,一条消息进来数组的下标就会 +1。一个是 CommitLog Offset 用来表示消息存储在 CommitLog 中的位置,消息存储是由 ConsumeQueue 和 CommitLog 配合完成,ConsumeQueue 是逻辑队列,CommitLog 是真正存储消息文件的,ConsumeQueue 存储的是指向物理存储的地址。Topic 下的每个 MessageQueue 都有对应的 ConsumeQueue 文件,内容也会被持久化到磁盘。
所以,在 MessageID 重构的实现中,区别于 Kafka 中只有一个全局的 Offset 来标识消息的唯一性,在 RoP 中需要针对这两种 Offset 的情况分别进行处理,具体如下:
文章图片
- RESERVED_BITS: 1 字节的保留位,避免第一个字节出现负数等情况导致 Offset 计算有误。
- RETRY_TOPIC_TAG_BITS:1 字节的标记为,用来标识这个 Topic 是否为 Retry 类型的 Topic
- PULSAR_PARTITION_ID_BITS:10 字节的 Partition Num,用来记录一个 PartitionedTopic 下有多少个 Partitions,最大支持 1024 个 Partitions。
- OFFSET_BITS:52 字节用来标识消息的 Offset。
在 RoP 0.1.0 的版本中,在消息路由的实现上,RocketMQ 和 Pulsar 都是首先通过 Topic 查找 的操作找到对应的 Owner Broker 节点,然后将该 Broker 的地址返回。但是在这个动作中,忽略了一个重要的问题,即 RocketMQ 与 Kafka 和 Pulsar 都是不同的,它的 Queue 不是全局唯一的。
RocketMQ 路由协议主要包括两部分:
- Broker 服务的 IP 地址信息;
- 某个 Broker 上对应的 Topic 分区总数以及分区可读写信息。
因此 RocketMQ 协议中,客户端只需要获取到 Topic 对应 Broker 上分区总数,就能通过计算获得该 Broker 上分区的 ID;所有的请求都是基于【Broker-Tag】+【Broker-Topic-Seq】构建唯一路由查询原语来请求服务。简单来说:RocketMQ 的分区是有状态的,他绑定在特定的 Broker 之上;分区一旦分配在某个 Broker 上,终身与之相关且不能迁移。客户端解析分区路由信息是通过计算得到;比如:某个 TopicA 有 5 个分区,分别落在 BrokerA 和 BrokerB 上,BrokerA 有 3 个,BrokerB 有 2 个;那么协议记录为(BrokerA,3)(BrokerB,2),客户端通过计算就得到全部的分区数据:
- BrokerA-TopicA-0,BrokerA-TopicA-1,BrokerA-TopicA-2
- BrokerB-TopicA-0,BrokerB-TopicA-1
GET_ROUTEINTO_BY_TOPIC
这个协议请求去和 Pulsar 的 查找 协议去做映射。本质原因是像 Kafka/Pulsar 这种,它的 Partition 信息是全局唯一的,在执行 Topic 路由策略之后,能准确的返回某一个 Topic 的 Partition 所对应的 Owner Broker 是谁。但是 RocketMQ 的 Topic 路由返回的是两个字段,一个是 Broker Name,一个是 Queue 的数量。具体的 QueueID,是 Client 根据 Broker 返回的数量固定的从 0 开始递增计算。所以在 Topic 的路由映射中,RocketMQ 和 Pulsar 自身的路由协议没办法一一映射。为了解决这个问题,在 RoP 0.2.0 中,抽象了一层 Proxy 用来维护 Topic 与 Broker 之间的映射关系。为了达到这个目的,这里主要有以下几方面的事情需要考虑:- 这些映射关系存储在哪里?
- 如何分配路由关系?
- 当路由关系发生变化之后,如何处理?
针对第二个问题,我们是在 RoP 接口创建分区主题的同时,依次 查找 各个分区所在的 Broker 节点,依照初始主题所在节点信息为基准,将映射关系写入到 ZooKeeper 集群中。这样做的好处在于:
- 复用 Pulsar 自身分区分配机制计算的结果,实现简单。
- 初始分配后,虚拟节点和物理节点处于一个节点上,性能好。
- 如果配合路由关系重平衡能力,可以实现最优性能。
文章图片
所以,为了保证 RoP 集群能有较好的容错能力,在部署 RoP 集群中建议使用偶数台节点。可以通过如下参数配置决定当前 Master 节点有几个 Slave 节点作为其备份节点:
RoPBrokerReplicationNum=2
假设有 6 台 Broker 节点,RoPBrokerReplicationNum=2, 那么就说明此时只有三台 Master Broker 节点对外提供服务。但是对于 Pulsar 来说,Broker 节点之间是 对等 的,当创建 Topic 的时候,可能会分配到任意节点上,所以对于不在 Owner Broker 节点上的请求,在 RoP Proxy 层做了一层代理,会先对该 Topic 进行 查找 的操作,然后将请求转发到 Owner Broker 的节点上来返回。
未来规划 为了更好的践行开源协同和开源共建的理念,目前,上述功能均已贡献回社区。除此之外针对 RocketMQ 商业版本的任意延迟消息功能,腾讯云中间件团队也基于 Pulsar 原生的特性开发了相关的插件来进行支持。RoP 的延迟消息功能除了支持多级别的延迟消息之外还具备支持任意延迟消息的能力。
之后,腾讯云中间件团队将会在确保 RoP 项目稳定的同时,持续开发 RoP 相关的功能,诸如消息轨迹,消息查询和回溯以及监控等能力,进一步完善 RoP 的功能以及周边生态。
RoP 项目地址:https://github.com/streamnati...
特别鸣谢 感谢腾讯云中间件团队韩明泽和张勇华对本文提供的技术细节校验和支持。
关注公众号「Apache Pulsar」,获取干货与动态
加入 Apache Pulsar 中文交流群
文章图片
【周边生态|RoP|周边生态|RoP 重磅发布 0.2.0 版本( 架构全新升级,消息准确性达 100%)】点击链接 ,下载最新版本 RoP!
推荐阅读
- 湖州大学生实践团开展系列活动践行生态文明理念
- 你是比特币生态圈的哪种角色(沽空者、退出者还是留守者?)
- K8S|K8S 生态周报| Istio 即将发布重大安全更新,多个版本受影响
- 中途再整理,坚定生态心
- 运行报错Cannot|运行报错Cannot find module '@babel/compat-data/corejs3-shipped-proposals’
- 样板项目乐驿享生活圈-S2B2C2S生态盈利系统
- react学习笔录(二)
- 创建、执行存储过程
- 购物list(手机周边)
- iOS|iOS -Cannot synthesize weak property because the current deployment target does n