• 一致性哈希出现的背景

先来看这样一个场景问题:有三个 redis 实例分别为 node0、node1、node2,有 3000 万个缓存数据需要存储在这三个实例组成的集群中,希望可以将这些数据均匀地缓存到三个实例上,如何实现呢?

一个比较直观的方案是:对缓存数据的 key 进行 hash 计算后对 N 取模,其中 N 是节点的个数,即取模算法 hash (key) % N 。将计算后的结果映射到对应的集群中的节点。具体如下图所示:

上图所示,首先对 key 进行 hash 计算后的结果对 3 取模,得到的结果一定是 0、1 或者 2,对应实例 node0、node1、node2,然后找到对应的服务器实例存取数据即可。

取模算法可以将每个数据请求均匀地分散到三个不同的服务器节点上,从效果上看很完美。但是在分布式集群系统的负载均衡实现上,这种模型在集群扩容和缩容时有一定的局限性:在生产环境中根据业务量的大小,调整服务器数量是常有的事,而服务器数量 N 发生变化后 hash (key) % N 计算的结果也会随之变化。导致整个集群的缓存数据必须重新计算调整,导致大量缓存在同一时间失效,造成缓存的雪崩,最终导致整个缓存系统不可用,这时不能接受的。为了解决这个问题,一致性哈希算法应运而生。

  • 一致性哈希主要解决的问题以及使用场景

一致性哈希可以解决哈希取模算法在分布式集群中存在的动态扩缩容问题,降低节点上下线的过程中带来的数据迁移成本。一致性哈希算法作为分布式系统中的重要算法,主要应用于负载均衡、缓存数据分区等场景。具体来讲,RPC 框架 Dubbo 用来选择服务提供者;分布式关系数据库分库分表实现数据与节点的映射关系;LVS 负载均衡调度器。

  • 一致性哈希的原理

一致性哈希算法本质上也是一种取模算法,只不过一般的取模算法是对服务器数量进行取模,而一致性哈希算法是对固定值 232 取模,这就使得一致性算法具备良好的单调性:不管集群是否发生添加或删除节点,只要 key 固定,那所请求的服务器节点也同样固定。具体算法工作原理如下:

  1. 一致性哈希算法将整个哈希值空间映射成一个虚拟圆环,整个哈希空间的取值范围是 0 ~ 232 – 1;
  2. 计算各服务器节点的哈希值 hash(key) % (232 – 1),并映射到哈希环上;
  3. 将服务发来的数据请求使用哈希算法 hash(key) % (232 – 1) 算出对应的哈希值;
  4. 将计算的哈希值映射到哈希环上,同时沿圆环顺时针方向查找,遇到的第一台服务器就是该请求对应的请求处理服务器;
  5. 当增加或删除一台服务器时,受影响的数据仅仅是新增或删除的服务器到其环空间中前一台的服务器(也就是顺着逆时针方向遇到的第一台服务器)之间的数据,其它数据都不会受到影响。

这样看来,一致性哈希算法对于节点的新增和删除都只需要重定位环空间中的一小部分数据,具有良好的容错性和可扩展性。

结合下图将会详细讲述哈希环、服务器实例映射到哈希环、请求对象 key 映射到服务器实例的实现过程。

哈希环所属的整个哈希空间取值范围为 0 ~ 232 – 1,按顺时针方向开始从 0 ~ 232 – 1排列,最后的节点 232 – 1 在 0 开始位置重合,形成一个虚拟的圆环。

服务器节点经由一致性哈希算法计算后映射到哈希环上对应的位置。具体可以用服务器 IP 地址作为 key 进行哈希计算,结果对 232 取模,结果一定是一个 0 到 232 – 1 之间的整数。如图所示的 node0、node1、node2 三个服务器节点已映射在哈希环上。

当服务器接收到数据请求时,首先计算数据请求 key 的哈希值,将结果映射到哈希环上的具体位置。然后,从该位置沿着哈希环顺时针查找,遇到的第一个 node 节点就是对应处理该请求 key 的服务器节点。最后,执行对应的数据操作即可。如图所示,通过哈希计算,key-01 顺时针找到 node0,key-02 顺时针找到 node1,key-03 顺时针找到 node2。最终,所有请求都找到了对应的服务器节点执行具体的业务操作。

  • 服务器扩容与缩容
  1. 服务器缩容

服务器缩容就是减少集群中服务器节点的数量或是集群中某个节点故障。假设,集群中的某个节点故障,原本映射到该节点的请求,会找到哈希环中的下一个节点,数据也同样被重新分配至下一个节点,其它节点的数据和请求不受任何影响。这样就确保节点发生故障时,集群能保持正常稳定。如下图所示:

如图所示,节点 node2 发生故障时,数据 key-01 和 key-02 不会受到影响,只有 key-03 的请求被重定位到 node0。在一致性哈希算法中,如何某个节点宕机不可用了,那么受影响的数据仅仅是会寻址到此节点和前一节点之间的数据。其它哈希环上的数据不会受到影响。

2. 服务器扩容

    服务器扩容就是集群中需要增加一个新的服务器节点。假设,由于需要缓存的数据量太大,必须对集群进行扩容增加一个新的数据节点。此时,只需要计算新节点的哈希值并将新的节点加入到哈希环中,然后将哈希环中从上一个节点到新节点的数据映射到新的数据节点即可。其它节点数据不受影响。如下图所示:

    如上图所示,加入新的 node3 节点后,key-01、key-02、key-03 不受影响,只有 key-04 的寻址被重定位到新节点 node3,受影响的数据仅仅是会寻址到新节点和前一节点之间的数据。

    • 数据倾斜与虚拟节点

    由于哈希计算的随机性,导致一致性哈希算法存在一个严重的问题:数据倾斜。即出现大量的访问请求集中在少数几个服务器节点上。如果节点太少,容易因为节点分布不均匀造成数据访问的冷热不均。也就失去了集群和负载均衡的意义。如图所示:

    如图所示,key-01、key-02、key-03 可能被映射到同一个节点 node0 上。导致 node0 负载过大,而 node1 和 node2 却很空闲的情况。这有可能导致个别服务器数据和请求压力过大和崩溃,引起集群的崩溃。

    为了解决数据倾斜的问题,一致性哈希算法引入了虚拟节点机制,即对每一个物理服务服务节点映射多个虚拟节点,将这些虚拟节点计算哈希值并映射到哈希环上,当请求找到某个虚拟节点后,将被重新映射到具体的物理节点。虚拟节点越多,哈希环上的节点就越多,数据分布就越均匀,从而避免了数据倾斜的问题。

    • 一致性哈希算法实现

    一致性哈希算法原理、动态扩缩容、数据倾斜问题介绍完,下面使用 JAVA 实现一个简易的一致性哈希算法。

    1. 数据节点

    类 Node 实现服务器节点初始化、扩容、缩容等功能,具体代码如下:

    public class Node {
        // 表示每个服务器物理节点都有 10 个虚拟节点
        private static final int VIRTUAL_NODE_NO_PER_NODE = 10;
        private final String ip;
        // 保存所有的虚拟节点
        private final List<Integer> virtualNodeHashes = new ArrayList<>(VIRTUAL_NODE_NO_PER_NODE );
        private final Map<Object, Object> cacheMap = new HashMap<>();
    
        public Node(string ip) {
            // 该方法可以判断传入参数 ip 是否为空,如果为空直接抛异常
            Objects.requireNonNull(ip);
            // 初始化 ip 和 虚拟节点 key
            this.ip = ip;
            initVirtualNodes();
        }
    
        privates void initVirtualNodes() {
            String virtualNodeKey;
            for (int i = 1; i <= VIRTUAL_NODE_NO_PER_NODE; i++) {
                virtualNodeKey = ip + "#" + i;
                virtualNodeHashes.add(HashUtils.hashcode(virtualNodeKey));
            }
        }
    
        public void addCacheItem(Object key, Object value) {
            cacheMap.put(key, value);
        }
    
    
        public Object getCacheItem(Object key) {
            return cacheMap.get(key);
        }
    
    
        public void removeCacheItem(Object key) {
            cacheMap.remove(key);
        }
    
    
        public List<Integer> getVirtualNodeHashes() {
            return virtualNodeHashes;
        }
    
        public String getIp() {
            return ip;
        }
    }

    2. 实现一致性哈希算法

    核心算法:通过 Java 中的 TreeMap 类实现哈希环和哈希查找的功能,具体代码如下:

    public class ConsistentHash {
        private final TreeMap<Integer, Node> hashRing = new TreeMap<>();
    
        public List<Node> nodeList = new ArrayList<>();
    
        /**
         * 增加节点
         * 每增加一个节点,就会在闭环上增加给定虚拟节点
         * 在本例中虚拟节点是 VIRTUAL_NODE_NO_PER_NODE = 10,每调用该方法一次,就会增加 10 个虚拟节点,这 10 个节点指向同一 Node
         * @param ip
         */
        public void addNode(String ip) {
            Objects.requireNonNull(ip);
            Node node = new Node(ip);
            nodeList.add(node);
            for (Integer virtualNodeHash : node.getVirtualNodeHashes()) {
                hashRing.put(virtualNodeHash, node);
                System.out.println("虚拟节点[" + node + "] hash:" + virtualNodeHash + ",被添加");
            }
        }
    
        /**
         * 移除节点
         * @param node
         */
        public void removeNode(Node node){
            nodeList.remove(node);
        }
    
        /**
         * 获取缓存数据
         * 先找到对应的虚拟节点,然后映射到物理节点
         * @param key
         * @return
         */
        public Object get(Object key) {
            Node node = findMatchNode(key);
            System.out.println("获取到节点:" + node.getIp());
            return node.getCacheItem(key);
        }
    
        /**
         * 添加缓存
         * 先找到hash环上的节点,然后在对应的节点上添加数据缓存
         * @param key
         * @param value
         */
        public void put(Object key, Object value) {
            Node node = findMatchNode(key);
    
            node.addCacheItem(key, value);
        }
    
        /**
         * 删除缓存数据
         */
        public void evict(Object key) {
            findMatchNode(key).removeCacheItem(key);
        }
    
        /**
         *  获得一个最近的顺时针节点
         * @param key 为给定键取Hash,取得顺时针方向上最近的一个虚拟节点对应的实际节点
         * @return 节点对象
         */
        private Node findMatchNode(Object key) {
            Map.Entry<Integer, Node> entry = hashRing.ceilingEntry(HashUtils.hashcode(key));
            if (entry == null) {
                entry = hashRing.firstEntry();
            }
            return entry.getValue();
        }
    }

    通过 TreeMap 的 ceilingEntry() 方法,实现顺时针查找下一个服务器节点的功能。TreeMap 的 ceilingEntry() 方法用于返回大于等于 给定 key 的最小 key 关联的键值映射,如果没有这样的 key,则返回 null。

    3. 哈希计算

    public class HashUtils {
    
        /**
         * FNV1_32_HASH
         * @param object obj     
         * @return hashcode
         */
        public static int hashcode(Object obj) {
            final int p = 16777619;
            int hash = (int) 2166136261L;
            String str = obj.toString();
            for (int i = 0; i < str.length(); i++)
                hash = (hash ^ str.charAt(i)) * p;
            hash += hash << 13;
            hash ^= hash >> 7;
            hash += hash << 3;
            hash ^= hash >> 17;
            hash += hash << 5;
    
            if (hash < 0)
                hash = Math.abs(hash);
            //System.out.println("hash computer:" + hash);
            return hash;
        }
    }

    4. 运行测试

    public class Main {
        public static final int STRING_COUNT = 100 * 100;
        private static ConsistentHash consistentHash = new ConsistentHash();
        private static List<String> sList = new ArrayList<>();
    
        public static void main(String[] args) {
            // 新增三个数据节点,一共生成 30 个虚拟节点
            consistentHash.addNode("10.2.1.110");
            consistentHash.addNode("10.2.1.220");
            consistentHash.addNode("10.2.1.330");
    
            // 生成需要缓存的数据
            for (int i = 0; i < STRING_COUNT; i++) {
                sList.add(RandomStringUtils.randomAlphanumeric(10));
            }
    
            // 将数据放入到缓存中
            for (String s : sList) {
                consistentHash.put(s, s);
            }
    
            for(int i = 0 ; i < 10 ; i ++) {
                int index = RandomUtils.nextInt(0, STRING_COUNT);
                String key = sList.get(index);
                String cache = (String) consistentHash.get(key);
                System.out.println("Random:"+index+",key:" + key + ",consistentHash get value:" + cache +",value is:" + key.equals(cache));
            }
    
            // 输出节点及数据分布情况
            for (Node node : consistentHash.nodeList){
                System.out.println(node);
            }
        }
    }

    运行程序,输出结果如下:

    Categories:

    Tags:

    No responses yet

    发表回复

    您的邮箱地址不会被公开。 必填项已用 * 标注