最近做项目 做了一个分发器 ,需要 根据请求携带的参数 把请求分发到 不同的服务器上面,最终我选择使用 一致性hash 环 来实现 ,本篇 就主要讲解一下 一致性hash环 它的基本原理
一致性hash算法 由于 均衡性 持久性的映射特点 被广泛应用于负载均衡领域,比如 nginx 、dubbo 、等等 内部都有一致性hash 的实现 ,比如 dubbo ,当你调用rpc 接口的时候,如果有2个提供者,那么你可以通过配置 让其调用通过 一致性hash 进行计算 然后分发到具体的 某个实例接口上 。
1.hash算法 在负载均衡中的问题
先来看看普通的hash算法的特点,普通的hash算法就是把一系列输入 打散成随机的数据,负载均衡就是利用这一点特性,对于大量请求调用,通过一定的 hash将它们均匀的散列,从而实现压力平均化
如果上面图中 key作为缓存的key ,node中寸入该key对应的 value,就是一个 简单的分布式缓存系统了。
问题:可以看出 当 N 节点数发生变化的时候 之前所有的 hash映射几乎全部失效,如果集群是无状态的服务 倒是没什么事情,但是如果是 分布式缓存这种,比如 映射的key1 原本是去 node1上查询 缓存的value1, 但是当N节点变化后 hash后的 key1 可能去了 node2 这样 就产生了致命问题。。
2.一致性 hash 算法
一致性hash算法就是来解决 上面的问题
2.1 特点 (重要)
下面说明 一致性hash算法的 2个 重要的特点
  • 平衡性
  • 单调性
2.2 原理 一致性哈希将整个哈希值空间组织成一个虚拟的圆环,如假设某哈希函数H的值空间为0-2^32-1(即哈希值是一个32位无符号整形),整个哈希空间环如下:
就是所有的输入值都被映射到 0-2^32-1 之间,组成一个圆环
一致性|一致性 hash 环

一致性|一致性 hash 环

**定位数据存储的方法: **将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针“行走”,第一台遇到的服务器就是其应该定位到的服务器。可以为这些服务器维护一条二分查找树,定位服务器的过程就是在二分查找树中找刚好比其大的节点。
例如我们有Object A、Object B、Object C、Object D四个数据对象,经过哈希计算后,在环空间上的位置如下:
一致性|一致性 hash 环

2.3 一致性hash 优点 如上图,假如当节点A宕机了,那么只会影响 objectA 它会被重新映射到 NodeB节点,其他的ObjectB ObjectC ObjectD 都不会受到影响,大大提高了容错性和稳定性
2.4 一致性hash存在的问题 2.3.1 数据分布不均匀 【一致性|一致性 hash 环】当节点Node很少的时候 比如2台机器,那么 必然造成大量数据集中在NodeA,少量在NodeB
2.3.2 雪崩 当某个节点宕机后,原本属于它的请求 都会被重新hash 映射到 下游节点,会突然造成下游节点压力过大 有可能也会造成 下游节点宕机,从而形成雪崩 这是致命的
为此 引入了 虚拟节点来解决上面两个问题
就是会在圆环上 根据Node 节点 生成很多的虚拟节点 分布在圆环上,这样当 某个 节点挂了后 原本属于它的请求,会被均衡的分布到 其他节点上 降低了产生雪崩的情况,也解决了 节点数少导致 请求分布不均的请求
一致性|一致性 hash 环

看上图,此时如果 group3节点挂了,那么请求会被均分到 group2 和 group1上面 ,到此 一致性hash的正确生产的使用方式讲解完了 下面来看看一个案例代码。
4. 代码测试
public class HashUtil { /** * 计算Hash值, 使用FNV1_32_HASH算法 * @param str * @return */ public static int getHash(String str) { final int p = 16777619; int hash = (int)2166136261L; for (int i = 0; i < str.length(); i++) { hash =( hash ^ str.charAt(i) ) * p; } hash += hash << 13; hash ^= hash >> 7; hash += hash << 3; hash ^= hash >> 17; hash += hash << 5; if (hash < 0) { hash = Math.abs(hash); } return hash; } }

package com.weareint.dispatchservice.hashloop; import org.springframework.stereotype.Component; import java.util.*; /** * * *
*一致性 hash 虚拟 环 *

* * @author johnny * @date 2021-08-26 9:22 上午 */ @Component public class HashVirtualNodeCircle {/** 真实集群列表 */ private static List instanceNodes; /** 虚拟节点映射关系 */ private static SortedMap virtualNodes = new TreeMap<>(); /** 虚拟节点数 */ private static final int VIRTUAL_NODE_NUM = 1000; /** 刷新 服务实例 */ public void refreshVirtualHashCircle(HashCircleInstanceNodeBuild build) { // 当集群变动时,刷新hash环,其余的集群在hash环上的位置不会发生变动 virtualNodes.clear(); // 获取 最新节点 instanceNodes = build.instanceNodes(); // 将虚拟节点映射到Hash环上 for (String realInstance : instanceNodes) { for (int i = 0; i < VIRTUAL_NODE_NUM; i++) { String virtualNodeName = getVirtualNodeName(realInstance, i); int hash = HashUtils.getHash(virtualNodeName); System.out.println("[" + virtualNodeName + "] launched @ " + hash); virtualNodes.put(hash, virtualNodeName); } } }private static String getVirtualNodeName(String realName, int num) { return realName + "&&VN" + num; }private static String getRealNodeName(String virtualName) { return virtualName.split("&&")[0]; }private static String getServer(String widgetKey) { int hash = HashUtils.getHash(widgetKey); // 只取出所有大于该hash值的部分而不必遍历整个Tree SortedMap subMap = virtualNodes.tailMap(hash); String virtualNodeName; if (subMap.isEmpty()) { // hash值在最尾部,应该映射到第一个group上 virtualNodeName = virtualNodes.get(virtualNodes.firstKey()); } else { virtualNodeName = subMap.get(subMap.firstKey()); } return getRealNodeName(virtualNodeName); }public static void main(String[] args) { HashVirtualNodeCircle hashVirtualNodeCircle = new HashVirtualNodeCircle(); hashVirtualNodeCircle.refreshVirtualHashCircle( new HashCircleInstanceNodeBuild() { @Override public List instanceNodes() { LinkedList nodes = new LinkedList<>(); nodes.add(""); nodes.add(""); nodes.add(""); return nodes; } }); // 生成随机数进行测试 Map resMap = new HashMap<>(); List plcList = new ArrayList<>(); for (int i = 0; i < 1000; i++) { String plchost = "192.168.0." + i + 1; for (int j = 0; j < 10; j++) { plcList.add(plchost + ":" + j + 100); } }for (int i = 0; i < plcList.size(); i++) { String plcwideget = plcList.get(i); String group = getServer(plcwideget); if (resMap.containsKey(group)) { resMap.put(group, resMap.get(group) + 1); } else { resMap.put(group, 1); } }resMap.forEach( (k, v) -> { System.out.println("group " + k + ": " + v + "(" + v / 100.0D + "%)"); }); System.out.println("========================================="); } }

可以看到 分布很均衡
一致性|一致性 hash 环

5.Dubbo 一致性Hash 实现
最近给公司做的分发器 使用dubbo 调用远程服务,调研了一下 dubbo 也有自己实现的 一致性hash 不过实际使用起来 发现有些bug ,目前通过SPI机制 自己扩展了一下,来看看 dubbo的 一致性hash实现吧
5.1 版本 dubbo2.7.12 我用的版本是 dubbo2.7.12 已经入了 2.7的坑 不少坑都是自己慢慢调试解决的。
5.2 org.apache.dubbo.rpc.cluster.loadbalance 负载均衡基本就这些 一致性hash,随机 ,轮训,。。 没啥特别的
一致性|一致性 hash 环

5.3 dubbo ConsistentHashLoadBalance源码
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements.See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License.You may obtain a copy of the License at * *http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dubbo.rpc.cluster.loadbalance; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.io.Bytes; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.support.RpcUtils; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; /** * ConsistentHashLoadBalance */ public class ConsistentHashLoadBalance extends AbstractLoadBalance { public static final String NAME = "consistenthash"; /** * Hash nodes name */ public static final String HASH_NODES = "hash.nodes"; /** * Hash arguments name */ public static final String HASH_ARGUMENTS = "hash.arguments"; private final ConcurrentMap> selectors = new ConcurrentHashMap>(); @SuppressWarnings("unchecked") @Override protected Invoker doSelect(List> invokers, URL url, Invocation invocation) { String methodName = RpcUtils.getMethodName(invocation); String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; // using the hashcode of list to compute the hash only pay attention to the elements in the list //有bug1.invokers 老是变化,导致 不断的 在创建 ConsistentHashSelector // int invokersHashCode = invokers.hashCode(); ConsistentHashSelector selector = (ConsistentHashSelector) selectors.get(key); if (selector == null || selector.identityHashCode != invokersHashCode) { selectors.put(key, new ConsistentHashSelector(invokers, methodName, invokersHashCode)); selector = (ConsistentHashSelector) selectors.get(key); } return selector.select(invocation); }private static final class ConsistentHashSelector {private final TreeMap> virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; ConsistentHashSelector(List> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); //默认虚拟机节点数 160 ,可以通过 /** dubbo: consumer: parameters: hash: nodes: 560 #指定虚拟分片结点数最终virtualInvokers = nodes * invokerCount arguments: 0指定的是 哪个参数作为 key **/ this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); //默认取 第 0 个参数 作为 hash的key String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker invoker : invokers) { String address = invoker.getUrl().getAddress(); for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = Bytes.getMD5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } }public Invoker select(Invocation invocation) { String key = toKey(invocation.getArguments()); byte[] digest = Bytes.getMD5(key); return selectForKey(hash(digest, 0)); }private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); }private Invoker selectForKey(long hash) { Map.Entry> entry = virtualInvokers.ceilingEntry(hash); if (entry == null) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); }private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; } }}

6.Dubbo 的 一致性 hash的 bug 和 一些配置参数
6.1 invokers 不断的变化 经过调试 发现 invokers 时不时变化导致 一致在 rehash,其实 很多时候只是 节点的顺序变化而已
解决办法: 我直接把 invokers 节点数 取出来进行排序后拼接 成一个字符串 进行 计算hashcode 就不会总变化了
String invokeKey = invokers.stream() .filter(Node::isAvailable) // 按照ip 和port 排序 .sorted(Comparator.comparing(invoke -> invoke.getUrl().getIp())) .sorted(Comparator.comparing(invoke -> invoke.getUrl().getPort())) .map(invoke -> invoke.getUrl().getIp() + ":" + invoke.getUrl().getPort()) .collect(Collectors.joining(","));

6.2 注意 isAvailable invokers中存在 节点不可用的,如果对于节点不可用的 直接过滤 需要注意 isAvailable
6.3 设置虚拟节点发片数
dubbo: consumer: parameters: hash: nodes: 560 #指定虚拟分片结点数最终virtualInvokers = nodes * invokerCount ,默认是160

6.4 设置方法的哪个参数作为 hash的key
dubbo: consumer: parameters: hash: arguments: 0#默认就是0

7.SPI 扩展Dubbo 一致性hash 算法 ExtendConsistentHashLoadBalance
7.1 官方文档 官方文档很详细了
一致性|一致性 hash 环

7.2 ExtendConsistentHashLoadBalance 扩展实现
package com.weareint.dispatchservice.extendconsistenthash; import com.atw.mdc.entity.protocol.webRequest.PlcReadSimpleRequest; import com.atw.mdc.entity.protocol.webRequest.PlcWriteSimpleRequest; import com.weareint.dispatchservice.event.ConnectionRouteEvent; import com.weareint.dispatchservice.event.NodeChangeEvent; import com.weareint.dispatchservice.event.NodeChangeService; import lombok.extern.slf4j.Slf4j; import org.apache.dubbo.common.Node; import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance; import org.apache.dubbo.rpc.support.RpcUtils; import org.springframework.context.ApplicationEventPublisher; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; /** * * *
* 扩展 一致性Hash 环主要是把 hash 的key 只通过 plc 的deviceCode 进行hash , 然后后续添加 Redis 路由表 进行 *

* * @author johnny * @date 2021-08-26 5:32 下午 */ @Slf4j public class ExtendConsistentHashLoadBalance extends AbstractLoadBalance {public static ApplicationEventPublisher publisher; public static NodeChangeService nodeChangeService; public static final String NAME = "consistenthash"; /** Hash nodes name */ public static final String HASH_NODES = "hash.nodes"; /** Hash arguments name */ public static final String HASH_ARGUMENTS = "hash.arguments"; private final ConcurrentMap> selectors = new ConcurrentHashMap< String, ExtendConsistentHashLoadBalance.ConsistentHashSelector>(); public ConcurrentMap> getSelectors() { return selectors; }@SuppressWarnings("unchecked") @Override protected Invoker doSelect(List> invokers, URL url, Invocation invocation) { String methodName = RpcUtils.getMethodName(invocation); // String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; // 只要是 servicekey 就好 String key = invokers.get(0).getUrl().getServiceKey(); // String key = invokers.get(0).getUrl().getParameter("remote.application"); if (log.isDebugEnabled()) { log.info("【remoteApplication:{}】", key); } // using the hashcode of list to compute the hash only pay attention to the elements in the String invokeKey = invokers.stream() .filter(Node::isAvailable) // 按照ip 和port 排序 .sorted(Comparator.comparing(invoke -> invoke.getUrl().getIp())) .sorted(Comparator.comparing(invoke -> invoke.getUrl().getPort())) .map(invoke -> invoke.getUrl().getIp() + ":" + invoke.getUrl().getPort()) .collect(Collectors.joining(",")); if (log.isDebugEnabled()) { log.info("【invokeKey : {}】", invokeKey); } int invokersHashCode = invokeKey.hashCode(); ExtendConsistentHashLoadBalance.ConsistentHashSelector selector = null; // 同时submit 可能会有问题 加个锁 可能会有第一次提交生成 虚拟节点 selector = (ExtendConsistentHashLoadBalance.ConsistentHashSelector) selectors.get(key); // 判断是否invokers 有变化selector.identityHashCode != invokersHashCode if (selector == null || selector.identityHashCode != invokersHashCode) { synchronized (ExtendConsistentHashLoadBalance.class) { selector = (ExtendConsistentHashLoadBalance.ConsistentHashSelector) selectors.get(key); if (selector == null || selector.identityHashCode != invokersHashCode) { // 这个 isAvailable 要存在 否则有bug List> availableInvoker = invokers.stream() .filter(Node::isAvailable) .collect(Collectors.toList()); ConsistentHashSelector tConsistentHashSelector = new ConsistentHashSelector<>( availableInvoker, methodName, invokersHashCode); selectors.put(key, tConsistentHashSelector); selector = (ExtendConsistentHashLoadBalance.ConsistentHashSelector) selectors.get(key); log.info( "【new selector by invokeKey : {} , availableInvoker:{}】", invokeKey, availableInvoker.stream() .map( invoke -> invoke.getUrl().getIp() + ":" + invoke.getUrl().getPort()) .collect(Collectors.joining(","))); NodeChangeEvent event = new NodeChangeEvent(this, tConsistentHashSelector); publisher.publishEvent(event); } } } String hashKey = selector.toKey(invocation.getArguments()); Invoker select = selector.select(hashKey); log.info( "【plcDeviceCode: {} dispatch toip : {}:{}", hashKey, select.getUrl().getIp(), select.getUrl().getPort()); // deviceCode route tableTo Redis ConnectionRouteEvent event = new ConnectionRouteEvent(this, hashKey, select, selector); publisher.publishEvent(event); return select; }public static final class ConsistentHashSelector {private final TreeMap> virtualInvokers; public final List> invokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; ConsistentHashSelector(List> invokers, String methodName, int identityHashCode) { this.invokers = invokers; this.virtualInvokers = new TreeMap>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); String[] index = COMMA_SPLIT_PATTERN.split( url.getMethodParameter(methodName, HASH_ARGUMENTS, "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker invoker : invokers) { String address = invoker.getUrl().getAddress(); for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } }public Invoker select(String key) { byte[] digest = md5(key); return selectForKey(hash(digest, 0)); }@SuppressWarnings("unchecked") public String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { // 只取 PlcReadSimpleRequest的 DeviceCode 作为 hash的key if (args[i] instanceof ArrayList) { ArrayList list = (ArrayList) args[i]; buf.append(list.get(0).getDeviceCode()); // 只取 PlcWriteSimpleRequest DeviceCode 作为 hash的key } else if (args[i] instanceof PlcWriteSimpleRequest) { PlcWriteSimpleRequest req = (PlcWriteSimpleRequest) args[i]; buf.append(req.getDeviceCode()); } else if (args[i] instanceof String) { // PlcConnectionRequest req = (PlcConnectionRequest) args[i]; // 关闭连接 String deviceCode = (String) args[i]; buf.append(deviceCode); } else { buf.append(args[i]); } } } return buf.toString(); }private Invoker selectForKey(long hash) { Map.Entry> entry = virtualInvokers.ceilingEntry(hash); if (entry == null) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); }private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; }private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes = value.getBytes(StandardCharsets.UTF_8); md5.update(bytes); return md5.digest(); }public int getIdentityHashCode() { return identityHashCode; } } }

7.3 配置spi扩展 一致性|一致性 hash 环


7.4 使用自定义的扩展 loadbalance
@DubboReference(loadbalance = "extendconsistenthash") private IDeviceWriteService deviceWriteService;

7.5 已知扩展 一致性|一致性 hash 环

本篇主要讲解了 什么是一致性 hash 它有哪些优点和存在的问题,以及 带虚拟节点的一致性hash,最后介绍了一些 dubbo 的一致性hash 实现,dubbo自带的有bug ,但是提供了 spi 扩展机制 你可以自己去实现 ,目前我就是这样去解决的 。
