Consistent Hash Algorithm of Load Balancing Algorithm in Dubbo


Hash algorithm

Consider the following scenario: we provision a single server to cache one million entries, and in this single-machine setup every request hits that server. Later the business grows, the data grows from one million to three million entries, and a single machine can no longer hold it all, so we decide to scale out to three servers. But this raises a question: how do we distribute the three million entries evenly across the three servers? Suppose we distribute them at random. Then to look up a cache entry we would first search the first machine and return the result if it is found; if not, we would search the second machine, and then the third. Lookup performance degrades badly. Is there a way to locate the correct machine directly from the cache entry's key?

An effective approach is to locate the machine with a hash algorithm. For the same key, the computed hash value must be the same every time. Suppose we have three machines. We can hash the key and take the result modulo 3; the result can only be 0, 1, or 2, and we use these three values as the numbers of the three machines. Every cache lookup then follows these steps:

  1. Hash the key, then take the result modulo 3 to get a machine number.
  2. Query the machine with that number. If the cache entry is found, return it directly; if not, the entry does not exist. (A minimal sketch of this routing follows the list.)
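
To make these two steps concrete, here is a minimal sketch of hash-modulo routing. It is not Dubbo code; the server names and the use of String#hashCode are made-up placeholders.

import java.util.List;

// A minimal sketch of hash-modulo routing; server names are illustrative.
public class ModuloRouter {
    private final List<String> servers;

    public ModuloRouter(List<String> servers) {
        this.servers = servers;
    }

    // Route a cache key to one of n servers via hash(key) % n.
    // Math.floorMod keeps the index non-negative even for negative hash codes.
    public String route(String key) {
        int index = Math.floorMod(key.hashCode(), servers.size());
        return servers.get(index);
    }

    public static void main(String[] args) {
        ModuloRouter router = new ModuloRouter(List.of("server-0", "server-1", "server-2"));
        // The same key always maps to the same server as long as n is fixed
        System.out.println(router.route("user:42"));
    }
}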

The scheme above seems to solve the problem, but it has serious limitations in a distributed setting, because the following situations inevitably occur in a production environment:

  1. A server crashes, can no longer provide service, and goes offline.
  2. Capacity must be scaled out flexibly during peak business periods.
  3. Capacity must be scaled in during business downturns.

All of these scenarios change the number n of backend servers, so the value computed by our formula hash(key) % n changes whenever n changes. The result is that a large number of cache entries become unreachable at once, causing a cache avalanche. If the same scheme is used for load balancing, a large number of requests are routed to the wrong server, the corresponding service cannot be found, and the calls fail. The consistent hash algorithm exists to solve this problem: it ensures that when a backend server is removed or added, the mapping between existing requests and the servers that handle them changes as little as possible.
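
To see how fragile plain modulo routing is, the following small experiment (hypothetical, not from Dubbo) counts how many keys land on a different server when the cluster grows from 3 to 4 nodes. With hash(key) % n, a key keeps its server only when hash % 3 == hash % 4, which happens for just 3 of every 12 hash values, so roughly 75% of the keys move.

// A small experiment: count how many of 100,000 keys map to a different
// server when the cluster grows from 3 to 4 nodes under hash(key) % n.
public class RemapDemo {
    public static void main(String[] args) {
        int keys = 100_000;
        int moved = 0;
        for (int i = 0; i < keys; i++) {
            String key = "key-" + i;
            int before = Math.floorMod(key.hashCode(), 3);
            int after = Math.floorMod(key.hashCode(), 4);
            if (before != after) {
                moved++;
            }
        }
        // Typically around 75% of keys move, i.e. most of the cache is invalidated
        System.out.printf("moved %d of %d keys (%.1f%%)%n", moved, keys, 100.0 * moved / keys);
    }
}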

Consistent Hash Algorithm

Basic introduction

The consistent hashing algorithm was first proposed by David Karger et al. in the 1997 paper Consistent Hashing and Random Trees: Distributed Caching Protocols for Relieving Hot Spots on the World Wide Web. The algorithm's design goal is to solve the hot-spot problem on the Internet: distributing data across nodes as evenly as possible so that no single node bears excessive load.

Its main idea is to organize the entire hash value space into a virtual ring, with each server node occupying a position on the ring. When a new request arrives, it is mapped to a position on the ring by the hash algorithm, and the nearest server clockwise from that position handles it. This way, when a server node joins or leaves, only a small portion of nearby data is affected, and the rest of the system is undisturbed. The general steps are:

  1. As shown in the figure below, we picture a hash ring with 2^32 positions, numbered 0 to 2^32 - 1. For each server we compute the hash of its address (ip + port) and take the result modulo 2^32 to get its position on the ring. Suppose we have three server nodes S1, S2, and S3; their mapping is as shown in the figure.
  2. When a new service request arrives, the request's identifier is hashed the same way to obtain its position on the ring.
  3. Starting from that position, walk clockwise to the nearest node and assign the request to it. For example, requests R1 and R2 are assigned to server S1. (A minimal code sketch of the ring follows the list.)
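
The ring can be simulated with a sorted map. Below is a minimal sketch, not Dubbo's code: the node addresses and the hash function are placeholders, but the clockwise lookup mirrors the steps above.

import java.util.Map;
import java.util.TreeMap;

// A minimal consistent-hash ring sketch (no virtual nodes yet).
public class HashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    public void addNode(String address) {
        ring.put(hash(address), address);
    }

    public void removeNode(String address) {
        ring.remove(hash(address));
    }

    // Walk clockwise: the first node at or after the request's position,
    // wrapping around to the first entry if we run off the end of the ring.
    public String route(String requestKey) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(requestKey));
        if (entry == null) {
            entry = ring.firstEntry();
        }
        return entry.getValue();
    }

    // Placeholder hash; Dubbo derives ring positions from MD5 digests instead.
    private long hash(String key) {
        return Integer.toUnsignedLong(key.hashCode());
    }
}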

How adding and removing server nodes is handled

If we add a node to the hash ring, say S4, the requests routed to S1 and the requests routed to S2 are unaffected and still route correctly; only some of the requests that previously routed to S3 now route to the new node S4. This achieves what we stated earlier: the mapping between existing requests and the servers that handle them changes as little as possible.

Removing a node works the same way. In the figure below, if we take node S3 offline, the requests routed to S1 and the requests routed to S2 are unaffected and still route correctly; only the requests that would have been routed to S3 are affected, and they are now routed to S1 because S3 is offline.
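
Using the HashRing sketch from above, the following hypothetical snippet walks through both cases; only requests falling in the arc touched by the added or removed node get a new destination.

// Exercising the HashRing sketch: node changes only affect one arc of the ring.
public class RingChangeDemo {
    public static void main(String[] args) {
        HashRing ring = new HashRing();
        ring.addNode("S1");
        ring.addNode("S2");
        ring.addNode("S3");
        System.out.println(ring.route("R1"));

        // Scale out: only keys between S4's predecessor and S4 move to S4
        ring.addNode("S4");
        System.out.println(ring.route("R1"));

        // Scale in: only keys that mapped to S3 move to its clockwise successor
        ring.removeNode("S3");
        System.out.println(ring.route("R1"));
    }
}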

What is a virtual node

In the earlier illustration, the three nodes S1, S2, and S3 were evenly spread around the hash ring. In practice, however, the hash algorithm rarely produces such an ideal layout; we may instead get a skewed distribution like the one shown below, which routes a disproportionate share of requests to the S3 node. The concept of virtual nodes was proposed to fix this.

Virtual nodes are not real physical nodes but replicas of a physical node; each carries the same IP and port as its corresponding real node, hence the name. As shown in the figure below, we have three physical nodes S1, S2, and S3, and each is given two virtual nodes distributed around the ring. Suppose a request's hash falls in the arc between S3-2 and S2-2: it is routed to S2-2, which is a virtual node of S2, so it is actually the physical node S2 that handles the request. The figure shows intuitively that virtual nodes further balance the distribution of requests, and the more virtual nodes are introduced, the better the balance.
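
Virtual nodes drop straight into the earlier ring sketch: insert each physical address at several ring positions, all mapping back to the same physical node. In this hedged illustration the "address#i" naming is made up; Dubbo derives replica positions from MD5 digests, as shown later.

import java.util.Map;
import java.util.TreeMap;

// Extending the ring sketch with virtual nodes: each physical node is inserted
// at several positions, which smooths out an uneven hash distribution.
public class VirtualNodeRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int replicas;

    public VirtualNodeRing(int replicas) {
        this.replicas = replicas;
    }

    public void addNode(String address) {
        for (int i = 0; i < replicas; i++) {
            // "address#i" is an illustrative virtual-node key; every replica
            // maps back to the same physical address stored as the value.
            ring.put(hash(address + "#" + i), address);
        }
    }

    public String route(String requestKey) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(requestKey));
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }

    // Placeholder hash, as in the earlier sketch.
    private long hash(String key) {
        return Integer.toUnsignedLong(key.hashCode());
    }
}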

Implementation of Dubbo consistent hash load balancing

Dubbo's consistent hash load balancing strategy is implemented in the ConsistentHashLoadBalance class, which extends the AbstractLoadBalance abstract class and implements the doSelect() method. This method selects one of several service providers to call. It involves two important types, Invoker and Invocation. An Invoker can be understood as a wrapper around a service provider; in Dubbo every service provider is converted into an Invoker, so List<Invoker<T>> invokers is the list of all service providers available for this request. An Invocation can be understood as a wrapper around the current request session, including the request's method, parameters, and related metadata.

For the concepts of Invoker and Invocation, please refer to: Code Architecture | Apache Dubbo, Implementation Details | Apache Dubbo

/**
 * A thread-safe map that caches all selectors. A selector picks one of many
 * service providers, which is what achieves the load balancing.
 * key: signature of the service (service key + method name)
 * value: the ConsistentHashSelector for that service method
 */
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

/**
 * Select one invoker from the invokers to call
 */
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    String methodName = RpcUtils.getMethodName(invocation);

    // 1. Look up the selector in the map mentioned above, keyed by the service signature
    String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
    int invokersHashCode = invokers.hashCode();
    ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);

    // 1.1 Not found, or the invoker list has changed? Then initialize a new
    // selector; this is a lazy-loading process
    if (selector == null || selector.identityHashCode != invokersHashCode) {
        selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
        selector = (ConsistentHashSelector<T>) selectors.get(key);
    }

    // 1.2 Use the selector to pick a specific service provider to handle the request
    return selector.select(invocation);
}

As mentioned earlier, if the corresponding selector is not found (or the identityHashCode check shows the invoker list has changed), we need to initialize one. The following is the logic for initializing the consistent hash selector.

private static final class ConsistentHashSelector<T> {

    // A TreeMap simulates the hash ring: keys are ring positions, values are invokers
    private final TreeMap<Long, Invoker<T>> virtualInvokers;

    // Number of virtual nodes (replicas) per physical node
    private final int replicaNumber;

    // Hash code of the invoker list, used to detect changes in the provider list
    private final int identityHashCode;

    // Indexes of the invocation arguments that participate in the hash
    private final int[] argumentIndex;

    // Initialize the selector: build the hash ring
    ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
        this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
        this.identityHashCode = identityHashCode;
        URL url = invokers.get(0).getUrl();
        // Number of virtual-node replicas per physical node, default 160
        this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
        String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
        argumentIndex = new int[index.length];
        for (int i = 0; i < index.length; i++) {
            argumentIndex[i] = Integer.parseInt(index[i]);
        }

        // Initialize the hash ring, distributing every invoker around it
        for (Invoker<T> invoker : invokers) {
            String address = invoker.getUrl().getAddress();
            // Create virtual nodes: each physical node gets replicaNumber positions
            // on the ring. The loops are nested because each 16-byte MD5 digest
            // yields four 32-bit hash values (see hash() below):
            // replicaNumber / 4 digests x 4 slices = replicaNumber ring positions.
            for (int i = 0; i < replicaNumber / 4; i++) {
                byte[] digest = Bytes.getMD5(address + i);
                for (int h = 0; h < 4; h++) {
                    long m = hash(digest, h);
                    virtualInvokers.put(m, invoker);
                }
            }
        }
    }

    // Selection logic: compute an MD5 digest for the request and derive a hash from it
    public Invoker<T> select(Invocation invocation) {
        byte[] digest = Bytes.getMD5(RpcUtils.getMethodName(invocation));
        return selectForKey(hash(digest, 0));
    }

    // ceilingEntry simulates walking clockwise to the nearest node on the ring;
    // if there is no node at or after the hash, wrap around to the first entry
    private Invoker<T> selectForKey(long hash) {
        Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
        if (entry == null) {
            entry = virtualInvokers.firstEntry();
        }
        return entry.getValue();
    }

    // Read the number-th 4-byte slice of the digest as an unsigned 32-bit value
    private long hash(byte[] digest, int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[number * 4] & 0xFF))
                & 0xFFFFFFFFL;
    }
}
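
Why does the constructor loop replicaNumber / 4 times? An MD5 digest is 16 bytes, and hash(digest, h) reads the h-th 4-byte slice as an unsigned 32-bit value, so one digest supplies four ring positions. The standalone sketch below reproduces the slicing; the address is made up, and it uses the JDK's MessageDigest rather than Dubbo's Bytes utility.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Illustration of slicing one 16-byte MD5 digest into four unsigned 32-bit
// ring positions, mirroring hash(digest, h) above.
public class Md5Slices {
    public static void main(String[] args) throws Exception {
        // Corresponds to Bytes.getMD5(address + i) with a made-up address and i = 0
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest("10.0.0.1:208800".getBytes(StandardCharsets.UTF_8));
        for (int h = 0; h < 4; h++) {
            long m = (((long) (digest[3 + h * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + h * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + h * 4] & 0xFF) << 8)
                    | (digest[h * 4] & 0xFF))
                    & 0xFFFFFFFFL;
            System.out.println("virtual node position " + h + ": " + m);
        }
    }
}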