Handwritten RPC framework–9. Heartbeat dynamic online and offline

RPC framework: Gitee code (please star the repository to support it)
RPC framework: GitHub code (please star the repository to support it)

Heartbeat dynamic online and offline

  • 9. Heartbeat dynamic online and offline
    • a. Take unresponsive nodes offline through heartbeat detection
    • b. Dynamically detect services coming online
    • c. Dynamically detect services going offline
    • d. Implement reLoadBalance

9. Heartbeat dynamic online and offline

a. Take unresponsive nodes offline through heartbeat detection

In the com.dcyrpc.loadbalancer.impl package of the core module:

Modify the run() method of MyTimerTask, the static inner class of the MinResponseTimeLoadBalancer class

  • Add a counter for the number of connection attempts
  • Inside a while loop, retry the heartbeat whenever it fails. After three failed attempts, remove the invalid address from the service list.
@Override
public void run() {

    // Clear the response-time map at the start of every run.
    DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.clear();

    // Traverse all cached channels
    Map<InetSocketAddress, Channel> channelCache = DcyRpcBootstrap.CHANNEL_CACHE;
    for (Map.Entry<InetSocketAddress, Channel> entry : channelCache.entrySet()) {
        // Number of attempts remaining for this channel
        int tryTimes = 3;
        while (tryTimes > 0) {

            Channel channel = entry.getValue();

            long start = System.currentTimeMillis();
            // Construct the heartbeat request
            DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
                    .requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
                    .compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
                    .serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
                    .requestType(RequestType.HEART_BEAT.getId())
                    .timeStamp(start)
                    .build();

            // Register a future that is completed when the heartbeat response arrives
            CompletableFuture<Object> completableFuture = new CompletableFuture<>();
            DcyRpcBootstrap.PENDING_REQUEST.put(dcyRpcRequest.getRequestId(), completableFuture);
            channel.writeAndFlush(dcyRpcRequest).addListener((ChannelFutureListener) promise -> {
                if (!promise.isSuccess()) {
                    completableFuture.completeExceptionally(promise.cause());
                }
            });

            long endTime = 0L;
            try {
                // Wait at most 1 second for the response
                completableFuture.get(1, TimeUnit.SECONDS);
                endTime = System.currentTimeMillis();
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                // The heartbeat failed: discard the pending future so it does not leak, then retry
                DcyRpcBootstrap.PENDING_REQUEST.remove(dcyRpcRequest.getRequestId());
                tryTimes--;
                log.error("Heartbeat to host [{}] failed (attempt [{}] of 3).", channel.remoteAddress(), 3 - tryTimes);

                // After 3 failed attempts, remove the invalid address from the service list.
                // Removing during iteration is only safe if CHANNEL_CACHE is a ConcurrentHashMap.
                if (tryTimes == 0) {
                    DcyRpcBootstrap.CHANNEL_CACHE.remove(entry.getKey());
                    break;
                }

                // Wait a short random time (0 to 40 ms) before the next attempt
                try {
                    Thread.sleep(10 * (new Random().nextInt(5)));
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ex);
                }
                continue;
            }

            Long time = endTime - start;

            // Cache the channel in a TreeMap keyed by response time
            DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.put(time, channel);

            log.info("The response time of server [{}] is [{}] ms", entry.getKey(), time);
            break;
        }
    }

}
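
The retry-then-evict idea above can be tried in isolation, without Netty or the rest of the framework. The following is only a minimal sketch under stated assumptions: HeartbeatRetrySketch, evictDeadNodes and the probe function are hypothetical names that do not exist in the project, the probe stands in for "send a heartbeat over the channel and wait for the response", and the node map is assumed to be a ConcurrentHashMap so that entries can be removed while iterating.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class HeartbeatRetrySketch {

    /**
     * Probes every address up to maxAttempts times and evicts the ones that never answer.
     * The probe function is a hypothetical stand-in for sending a heartbeat over a channel.
     */
    static <A> void evictDeadNodes(Map<A, String> nodes,
                                   Function<A, CompletableFuture<Void>> probe,
                                   int maxAttempts,
                                   long timeoutMillis) {
        for (A address : nodes.keySet()) {
            boolean alive = false;
            for (int attempt = 1; attempt <= maxAttempts && !alive; attempt++) {
                try {
                    probe.apply(address).get(timeoutMillis, TimeUnit.MILLISECONDS);
                    alive = true;
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and stop probing
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // Timeout or failed probe: fall through to the next attempt
                }
            }
            if (!alive) {
                // Assumes a ConcurrentHashMap, which tolerates removal during iteration
                nodes.remove(address);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> nodes = new ConcurrentHashMap<>();
        nodes.put("10.0.0.1:8090", "channel-1");
        nodes.put("10.0.0.2:8090", "channel-2");

        // Pretend that only 10.0.0.1 answers its heartbeat
        Function<String, CompletableFuture<Void>> probe = address -> {
            if (address.startsWith("10.0.0.1")) {
                return CompletableFuture.completedFuture(null);
            }
            return new CompletableFuture<>(); // never completes, so get() times out
        };

        evictDeadNodes(nodes, probe, 3, 50);
        System.out.println(nodes.keySet()); // prints [10.0.0.1:8090]
    }
}
```

Bounding the number of attempts keeps one transient timeout from evicting a healthy node, while still guaranteeing that a dead node leaves the list within a predictable time.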

b. Dynamically detect services coming online

Dynamic online and offline detection relies on ZooKeeper's Watcher mechanism:

  • 1. When the caller pulls the service list, it registers a watcher on the service node to observe changes to it.
  • 2. The watcher is triggered when a service provider goes online or offline (that is, when the child nodes change).
  • 3. The caller is notified and performs the corresponding online or offline operation.

In the com.dcyrpc package of the core module, create the watcher package

Create the UpAndDownWatcher class in this package:

  • Implement the Watcher interface
  • Handle new nodes: if an address is not in the cache, it is a new node, so establish a connection first and then cache the channel
package com.dcyrpc.watcher;

import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.net.InetSocketAddress;
import java.util.List;
// Imports of the project's own classes (DcyRpcBootstrap, NettyBootstrapInitializer, Registry) are omitted here

/**
 * Watcher that reacts to service nodes going online and offline
 */
@Slf4j
public class UpAndDownWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
            log.info("A node of service [{}] went online or offline; the service list will be pulled again.", event.getPath());

            String serviceName = getServiceName(event.getPath());
            // Pull the service list again
            Registry registry = DcyRpcBootstrap.getInstance().getRegistry();
            List<InetSocketAddress> addressList = registry.lookup(serviceName);

            // Handle new nodes
            // A newly added node is in addressList but not yet in CHANNEL_CACHE
            for (InetSocketAddress address : addressList) {
                if (!DcyRpcBootstrap.CHANNEL_CACHE.containsKey(address)) {
                    // Establish a connection to the address and cache the channel
                    Channel channel = null;
                    try {
                        channel = NettyBootstrapInitializer.getBootstrap().connect(address).sync().channel();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                    DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
                }
            }
        }
    }

    // The service name is the last segment of the node path
    private String getServiceName(String path) {
        String[] split = path.split("/");
        return split[split.length - 1];
    }
}

In the com.dcyrpc.discovery.impl package of the core module, modify the lookup() method of the ZookeeperRegistry class

  • Register the watcher: when the child nodes of the service change, the child nodes are pulled again
// Omitted...
// 2. Get the child nodes of the service from ZooKeeper and register the watcher
List<String> children = ZookeeperUtils.getChildren(zooKeeper, serviceNode, new UpAndDownWatcher());
// Omitted...
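
A standard ZooKeeper watch set through getChildren is a one-time trigger: it fires once and must be registered again to see later changes. Because the watcher above calls lookup(), and lookup() passes a fresh UpAndDownWatcher, the watch is re-armed on every notification. The sketch below imitates that loop in plain Java without a ZooKeeper server. FakeServiceNode and Caller are hypothetical stand-ins written for this illustration; they are not ZooKeeper or project classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class OneShotWatchSketch {

    /** Hypothetical in-memory stand-in for one service node and its children. */
    static class FakeServiceNode {
        private final List<String> children = new ArrayList<>();
        private Consumer<String> watcher; // at most one pending watch in this sketch

        /** Returns a copy of the children and arms a one-time watch. */
        List<String> getChildren(Consumer<String> oneShotWatcher) {
            this.watcher = oneShotWatcher;
            return new ArrayList<>(children);
        }

        /** Adds a provider, then fires the pending watch once and clears it. */
        void addChild(String child) {
            children.add(child);
            Consumer<String> pending = watcher;
            watcher = null;
            if (pending != null) {
                pending.accept(child);
            }
        }
    }

    /** Caller-side view that pulls the list again, and so re-arms the watch, on each event. */
    static class Caller {
        final List<String> known = new ArrayList<>();
        int notifications = 0;

        void lookup(FakeServiceNode node) {
            List<String> latest = node.getChildren(changed -> {
                notifications++;
                lookup(node); // pulling again registers a fresh watch
            });
            known.clear();
            known.addAll(latest);
        }
    }

    public static void main(String[] args) {
        FakeServiceNode node = new FakeServiceNode();
        Caller caller = new Caller();
        caller.lookup(node);
        node.addChild("10.0.0.1:8090");
        node.addChild("10.0.0.2:8090");
        // prints [10.0.0.1:8090, 10.0.0.2:8090] after 2 notifications
        System.out.println(caller.known + " after " + caller.notifications + " notifications");
    }
}
```

If lookup() did not register a new watch, the second addChild call in this sketch would go unnoticed and the caller would keep a stale list.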

c. Dynamically detect services going offline

In the com.dcyrpc.watcher package of the core module, modify the process() method of the UpAndDownWatcher class by adding the following code:

  • Handle offline nodes: an offline node is no longer in addressList but may still be in CHANNEL_CACHE
// Omitted...
// Handle offline nodes
// An offline node is no longer in addressList but may still be in CHANNEL_CACHE
// Removing during iteration is only safe if CHANNEL_CACHE is a ConcurrentHashMap
for (Map.Entry<InetSocketAddress, Channel> entry : DcyRpcBootstrap.CHANNEL_CACHE.entrySet()) {
    if (!addressList.contains(entry.getKey())) {
        DcyRpcBootstrap.CHANNEL_CACHE.remove(entry.getKey());
    }
}
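
Taken together, the online and offline handling is a set difference between the cached addresses and the freshly pulled list. The following sketch shows that comparison on its own. AddressDiffSketch and its two methods are hypothetical names introduced for this illustration, and plain sets stand in for CHANNEL_CACHE.

```java
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AddressDiffSketch {

    /** Addresses in the latest registry list that are not cached yet: nodes that came online. */
    static Set<InetSocketAddress> added(Set<InetSocketAddress> cached, List<InetSocketAddress> latest) {
        Set<InetSocketAddress> result = new HashSet<>(latest);
        result.removeAll(cached);
        return result;
    }

    /** Cached addresses that are missing from the latest registry list: nodes that went offline. */
    static Set<InetSocketAddress> removed(Set<InetSocketAddress> cached, List<InetSocketAddress> latest) {
        Set<InetSocketAddress> result = new HashSet<>(cached);
        result.removeAll(latest);
        return result;
    }

    public static void main(String[] args) {
        InetSocketAddress a = new InetSocketAddress("10.0.0.1", 8090);
        InetSocketAddress b = new InetSocketAddress("10.0.0.2", 8090);
        InetSocketAddress c = new InetSocketAddress("10.0.0.3", 8090);

        Set<InetSocketAddress> cached = new HashSet<>(Arrays.asList(a, b));
        List<InetSocketAddress> latest = Arrays.asList(b, c);

        System.out.println("online:  " + added(cached, latest));   // contains only 10.0.0.3
        System.out.println("offline: " + removed(cached, latest)); // contains only 10.0.0.1
    }
}
```

Both methods return new sets and leave their inputs untouched, so the caller decides when to connect to new nodes and when to drop old channels.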

d. Implement reLoadBalance

When a node goes online or offline, the load must be rebalanced across the current nodes.

  • In the com.dcyrpc.watcher package of the core module, modify the process() method of the UpAndDownWatcher class by adding the following code:

    • Rebalance: obtain the load balancer and call reLoadBalance
// Omitted...
// Obtain the load balancer and re-loadBalance
LoadBalancer loadBalancer = DcyRpcBootstrap.LOAD_BALANCER;
loadBalancer.reLoadBalance(serviceName, addressList);

Remove the reBalance() methods previously defined in the LoadBalancer interface

In the LoadBalancer interface, declare the reLoadBalance() method

/**
 * Rebalances the load when a node is detected going online or offline.
 * @param serviceName service name
 * @param addressList the latest list of available addresses for the service
 */
void reLoadBalance(String serviceName, List<InetSocketAddress> addressList);

In the AbstractLoadBalancer abstract class, implement this method:

@Override
public synchronized void reLoadBalance(String serviceName, List<InetSocketAddress> addressList) {
    // Generate a new selector based on the new service list
    cache.put(serviceName, getSelector(addressList));
}
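
To see why replacing the cached selector is enough, here is a self-contained sketch of a per-service selector cache. It is not the project's AbstractLoadBalancer: ReLoadBalanceSketch, RoundRobinSelector and select() are hypothetical names, addresses are plain strings, and round robin is assumed as the selection strategy purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ReLoadBalanceSketch {

    /** Hypothetical round-robin selector over a snapshot of the address list. */
    static class RoundRobinSelector {
        private final List<String> addresses;
        private final AtomicInteger index = new AtomicInteger();

        RoundRobinSelector(List<String> addresses) {
            this.addresses = new ArrayList<>(addresses); // defensive copy of the snapshot
        }

        String next() {
            if (addresses.isEmpty()) {
                throw new IllegalStateException("no available nodes");
            }
            // floorMod keeps the index valid even after the counter overflows
            return addresses.get(Math.floorMod(index.getAndIncrement(), addresses.size()));
        }
    }

    // One selector per service name, as in the cache used by reLoadBalance above
    private final Map<String, RoundRobinSelector> cache = new ConcurrentHashMap<>();

    /** Replaces the selector of a service with one built from the new address list. */
    void reLoadBalance(String serviceName, List<String> addressList) {
        cache.put(serviceName, new RoundRobinSelector(addressList));
    }

    String select(String serviceName) {
        RoundRobinSelector selector = cache.get(serviceName);
        if (selector == null) {
            throw new IllegalStateException("unknown service: " + serviceName);
        }
        return selector.next();
    }

    public static void main(String[] args) {
        ReLoadBalanceSketch balancer = new ReLoadBalanceSketch();
        balancer.reLoadBalance("hello", Arrays.asList("10.0.0.1:8090", "10.0.0.2:8090"));
        System.out.println(balancer.select("hello")); // prints 10.0.0.1:8090
        System.out.println(balancer.select("hello")); // prints 10.0.0.2:8090

        // 10.0.0.1 goes offline: rebuild the selector from the new list
        balancer.reLoadBalance("hello", Arrays.asList("10.0.0.2:8090"));
        System.out.println(balancer.select("hello")); // prints 10.0.0.2:8090
    }
}
```

Each selector works on its own snapshot of the list, so swapping in a new selector never changes a list that another thread is in the middle of reading.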