mq从零开始实现 mq-07-负载均衡 load balance

于今腐草无萤火,终古垂杨有暮鸦。这篇文章主要讲述mq从零开始实现 mq-07-负载均衡 load balance相关的知识,希望能为你提供帮助。
前景回顾【mq】从零开始实现 mq-01-生产者、消费者启动
【mq】从零开始实现 mq-02-如何实现生产者调用消费者?
【mq】从零开始实现 mq-03-引入 broker 中间人
【mq】从零开始实现 mq-04-启动检测与实现优化
【mq】从零开始实现 mq-05-实现优雅停机
【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat
【mq】从零开始实现 mq-07-负载均衡 load balance
为什么需要负载均衡大家好,我是老马。
这一节让我们看一下如何实现 MQ 的负载均衡。
为什么需要负载均衡呢?
作用负载均衡最核心的作用:
(1)可以避免单点故障
(2)可以让请求均分的分散到每一个节点
实现思路负载均衡实现的方式比较多,最简单的就是随机选择一个。
拓展阅读:

mq从零开始实现 mq-07-负载均衡 load balance

文章图片

MQ 中用到负载均衡的地方 生产者发送生产者发送消息时,可以发送给任一 broker。
broker 推送给消费者broker 接收到消息以后,在推送给消费者时,也可以任一选择一个。
消费者的消费 ACK消费者消费完,状态回执给 broker,可以选择任一一个。
消息黏连有些消息比较特殊,比如需要保证消费的有序性,可以通过 shardingKey 的方式,在负载的时候固定到指定的片区。
代码实现 生产者发送统一调整获取 channel 的方法。
@Override public Channel getChannel(String key) // 等待启动完成 while (!statusManager.status()) log.debug("等待初始化完成..."); DateUtil.sleep(100); RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(this.loadBalance, channelFutureList, key); return rpcChannelFuture.getChannelFuture().channel();

工具类实现为核心实现:
/** * 负载均衡 * * @param list 列表 * @param key 分片键 * @return 结果 * @since 0.0.7 */ public static < T extends IServer> T loadBalance(final ILoadBalance< T> loadBalance, final List< T> list, String key) if(CollectionUtil.isEmpty(list)) return null; if(StringUtil.isEmpty(key)) LoadBalanceContext< T> loadBalanceContext = LoadBalanceContext.< T> newInstance() .servers(list); return loadBalance.select(loadBalanceContext); // 获取 code int hashCode = Objects.hash(key); int index = hashCode % list.size(); return list.get(index);

如果指定了 shardingKey,那么根据 shadringKey 进行 hash 判断。
如果没有,则进行默认的负载均衡策略。
Broker 消息推送给消费者消费者订阅列表的获取:
@Override public List< Channel> getSubscribeList(MqMessage mqMessage) final String topicName = mqMessage.getTopic(); Set< ConsumerSubscribeBo> set = subscribeMap.get(topicName); if(CollectionUtil.isEmpty(set)) return Collections.emptyList(); //2. 获取匹配的 tag 列表 final List< String> tagNameList = mqMessage.getTags(); Map< String, List< ConsumerSubscribeBo> > groupMap = new HashMap< > (); for(ConsumerSubscribeBo bo : set) String tagRegex = bo.getTagRegex(); if(hasMatch(tagNameList, tagRegex)) //TODO: 这种设置模式,统一添加处理 haven String groupName = bo.getGroupName(); List< ConsumerSubscribeBo> list = groupMap.get(groupName); if(list == null) list = new ArrayList< > (); list.add(bo); groupMap.put(groupName, list); //3. 按照 groupName 分组之后,每一组只随机返回一个。最好应该调整为以 shardingkey 选择 final String shardingKey = mqMessage.getShardingKey(); List< Channel> channelList = new ArrayList< > (); for(Map.Entry< String, List< ConsumerSubscribeBo> > entry : groupMap.entrySet()) List< ConsumerSubscribeBo> list = entry.getValue(); ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance, list, shardingKey); final String channelId = bo.getChannelId(); BrokerServiceEntryChannel entryChannel = registerMap.get(channelId); if(entryChannel == null) log.warn("channelId:对应的通道信息为空", channelId); continue; channelList.add(entryChannel.getChannel()); return channelList;

核心逻辑:RandomUtils.loadBalance(loadBalance, list, shardingKey); 获取,其他的保持不变。
消费者 ACK消费者也是类似的,获取 channel 的方式调整如下:
public Channel getChannel(String key) // 等待启动完成 while (!statusManager.status()) log.debug("等待初始化完成..."); DateUtil.sleep(100); RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(loadBalance, channelFutureList, key); return rpcChannelFuture.getChannelFuture().channel();

小结负载均衡在分布式服务中,是必备的特性之一。实现的原理并不算复杂。
希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。
我是老马,期待与你的下次重逢。
开源地址拓展阅读【mq从零开始实现 mq-07-负载均衡 load balance】rpc-从零开始实现 rpc https://github.com/houbb/rpc

    推荐阅读