RPC framework source code on Gitee (a star would be appreciated)
RPC framework source code on GitHub (a star would be appreciated)
Heartbeat-based dynamic online and offline handling
- 9. Heartbeat-based dynamic online and offline handling
- a. Take nodes offline dynamically via heartbeat
- b. Dynamically detect services coming online
- c. Dynamically detect services going offline
- d. Implement re-load balancing with reLoadBalance
9. Heartbeat-based dynamic online and offline handling
a. Take nodes offline dynamically via heartbeat
In the com.dcyrpc.loadbalancer.impl package of the core module, modify the run() method of the static inner class MyTimerTask in the MinResponseTimeLoadBalancer class:
- Add a retry counter for the heartbeat.
- Use a while loop so that a failed heartbeat is retried first. After 3 failed attempts, the invalid address is removed from the service list.
    @Override
    public void run() {
        // Clear the response-time map at the start of every run
        DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.clear();

        // Traverse all cached channels
        Map<InetSocketAddress, Channel> channelCache = DcyRpcBootstrap.CHANNEL_CACHE;
        for (Map.Entry<InetSocketAddress, Channel> entry : channelCache.entrySet()) {
            // Number of attempts allowed for this channel
            int tryTimes = 3;
            while (tryTimes > 0) {
                Channel channel = entry.getValue();
                long start = System.currentTimeMillis();

                // Construct the heartbeat request
                DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder()
                        .requestId(DcyRpcBootstrap.ID_GENERATOR.getId())
                        .compressType(CompressorFactory.getCompressor(DcyRpcBootstrap.COMPRESS_TYPE).getCode())
                        .serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode())
                        .requestType(RequestType.HEART_BEAT.getId())
                        .timeStamp(start)
                        .build();

                CompletableFuture<Object> completableFuture = new CompletableFuture<>();
                DcyRpcBootstrap.PENDING_REQUEST.put(dcyRpcRequest.getRequestId(), completableFuture);

                channel.writeAndFlush(dcyRpcRequest).addListener((ChannelFutureListener) promise -> {
                    if (!promise.isSuccess()) {
                        completableFuture.completeExceptionally(promise.cause());
                    }
                });

                long endTime = 0L;
                try {
                    completableFuture.get(1, TimeUnit.SECONDS);
                    endTime = System.currentTimeMillis();
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    // When a problem occurs, retry first
                    tryTimes--;
                    log.error("An exception occurred with the host at address [{}]. Retry attempt [{}]...", channel.remoteAddress(), 3 - tryTimes);

                    // After 3 failed attempts, remove the invalid address from the service list
                    if (tryTimes == 0) {
                        DcyRpcBootstrap.CHANNEL_CACHE.remove(entry.getKey());
                    }

                    // Wait a short random time before trying again
                    try {
                        Thread.sleep(10 * (new Random().nextInt(5)));
                    } catch (InterruptedException ex) {
                        throw new RuntimeException(ex);
                    }
                    continue;
                }

                Long time = endTime - start;
                // Cache the result in the TreeMap, keyed by response time
                DcyRpcBootstrap.ANSWER_TIME_CHANNEL_CACHE.put(time, channel);
                log.info("The response time of server [{}] is [{}]", entry.getKey(), time);
                break;
            }
        }
    }
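The retry-then-evict idea above can be tried out in isolation from Netty. The following is only a minimal sketch under stated assumptions: HeartbeatRetrySketch, evictUnreachable and the probe predicate are hypothetical names that stand in for "send a heartbeat and wait for the reply", and the cache is assumed to be a ConcurrentMap so that removing an entry while iterating is safe.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

/** Sketch: probe each cached address up to maxTries times and evict it if every attempt fails. */
final class HeartbeatRetrySketch {

    /**
     * Hypothetical helper, not part of the framework.
     * probe stands in for "send a heartbeat and wait for the reply"; it returns true on success.
     */
    static <K, V> void evictUnreachable(ConcurrentMap<K, V> cache, Predicate<K> probe, int maxTries) {
        for (K address : cache.keySet()) {
            int triesLeft = maxTries;
            while (triesLeft > 0) {
                if (probe.test(address)) {
                    break; // healthy: stop retrying this address
                }
                triesLeft--;
                if (triesLeft == 0) {
                    // Concurrent map iterators are weakly consistent,
                    // so removing during iteration does not throw.
                    cache.remove(address);
                }
            }
        }
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        cache.put("10.0.0.1:8088", "channel-a");
        cache.put("10.0.0.2:8088", "channel-b");
        // Assumption for the demo: only the first host answers heartbeats.
        evictUnreachable(cache, address -> address.startsWith("10.0.0.1"), 3);
        System.out.println(cache.keySet());
    }
}
```

The sketch leaves out the random back-off between attempts. In the real task, the short sleep keeps the retries from hitting a struggling host back to back.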
b. Dynamically detect services coming online
Dynamic online detection is implemented with ZooKeeper's Watcher mechanism:
- 1. When the caller pulls the service list, it registers a watcher on the service node to observe changes.
- 2. The watcher is triggered when a service provider goes online or offline (that is, when the child nodes change).
- 3. The caller is notified and performs the corresponding online or offline handling.
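The three steps above can be simulated without a ZooKeeper server. The sketch below is a hypothetical in-memory stand-in and not the ZooKeeper API: OneShotWatchSketch and the service name com.dcyrpc.DemoService are made up for illustration. It models one property of standard ZooKeeper watches that matters here: a watch fires once, so the caller has to register it again when it re-pulls the list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** In-memory stand-in for a registry with one-shot child watches (not the ZooKeeper API). */
final class OneShotWatchSketch {
    private final Map<String, List<String>> children = new HashMap<>();
    private final Map<String, List<Consumer<String>>> watchers = new HashMap<>();

    /** Step 1: return a snapshot of the children and register a one-shot watcher (may be null). */
    List<String> getChildren(String service, Consumer<String> watcher) {
        if (watcher != null) {
            watchers.computeIfAbsent(service, k -> new ArrayList<>()).add(watcher);
        }
        return new ArrayList<>(children.getOrDefault(service, new ArrayList<>()));
    }

    /** Step 2: a provider goes online. */
    void addChild(String service, String node) {
        children.computeIfAbsent(service, k -> new ArrayList<>()).add(node);
        fire(service);
    }

    /** Step 2: a provider goes offline. */
    void removeChild(String service, String node) {
        List<String> nodes = children.get(service);
        if (nodes != null && nodes.remove(node)) {
            fire(service);
        }
    }

    /** Step 3: notify the callers; watchers are detached first, so each fires only once. */
    private void fire(String service) {
        List<Consumer<String>> pending = watchers.remove(service);
        if (pending != null) {
            pending.forEach(w -> w.accept(service));
        }
    }

    public static void main(String[] args) {
        OneShotWatchSketch registry = new OneShotWatchSketch();
        String service = "com.dcyrpc.DemoService"; // hypothetical service name
        Consumer<String> caller = new Consumer<String>() {
            @Override
            public void accept(String changed) {
                // Re-pull the list and register again, otherwise later changes are missed.
                List<String> latest = registry.getChildren(changed, this);
                System.out.println(changed + " -> " + latest);
            }
        };
        registry.getChildren(service, caller);
        registry.addChild(service, "127.0.0.1:8088");
        registry.removeChild(service, "127.0.0.1:8088");
    }
}
```

Because the callback re-registers itself while re-pulling the list, both the online and the offline change are reported. A watcher that does not re-register would only see the first one.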
In the com.dcyrpc package of the core module, create the watcher package, and create the UpAndDownWatcher class in it:
- Implement the Watcher interface.
- Handle new nodes: if a node is not in the cache (which means it is new), establish a connection first and then cache it.
    /**
     * Watcher for nodes going online and offline dynamically
     */
    @Slf4j
    public class UpAndDownWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeChildrenChanged) {
                log.info("It is detected that the service [{}] has a node going online/offline, and the service list will be re-pulled..", event.getPath());

                String serviceName = getServiceName(event.getPath());

                // Re-fetch the service list
                Registry registry = DcyRpcBootstrap.getInstance().getRegistry();
                List<InetSocketAddress> addressList = registry.lookup(serviceName);

                // Process new nodes
                // A newly added node is in addressList but not in CHANNEL_CACHE
                for (InetSocketAddress address : addressList) {
                    if (!DcyRpcBootstrap.CHANNEL_CACHE.containsKey(address)) {
                        // Establish a connection based on the address and cache it
                        Channel channel = null;
                        try {
                            channel = NettyBootstrapInitializer.getBootstrap().connect(address).sync().channel();
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
                    }
                }
            }
        }

        private String getServiceName(String path) {
            String[] split = path.split("/");
            return split[split.length - 1];
        }
    }
In the com.dcyrpc.discovery.impl package of the core module, modify the lookup() method of the ZookeeperRegistry class:
- Add the Watcher: when the child nodes of the service change, the service's child nodes are re-pulled.
    // ... (omitted)

    // 2. Get its child nodes from zk
    List<String> children = ZookeeperUtils.getChildren(zooKeeper, serviceNode, new UpAndDownWatcher());

    // ... (omitted)
c. Dynamically detect services going offline
In the com.dcyrpc.watcher package of the core module, modify the process() method of the UpAndDownWatcher class by adding the following code:
- Handle offline nodes: an offline node is never in addressList, but it may still be in CHANNEL_CACHE.
    // ... (omitted)

    // Handle offline nodes
    // An offline node is never in addressList, but it may still be in CHANNEL_CACHE
    for (Map.Entry<InetSocketAddress, Channel> entry : DcyRpcBootstrap.CHANNEL_CACHE.entrySet()) {
        if (!addressList.contains(entry.getKey())) {
            DcyRpcBootstrap.CHANNEL_CACHE.remove(entry.getKey());
        }
    }
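The online and offline handling together amount to synchronizing the channel cache with the latest address list. Here is a minimal standalone sketch of that diff, assuming the cache is a ConcurrentMap (the type of CHANNEL_CACHE is not shown in this section; removing entries from a plain HashMap while iterating over it would throw ConcurrentModificationException). ChannelCacheSyncSketch, sync and the connect function are hypothetical names, and connect stands in for opening a Netty channel.

```java
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Sketch: bring a channel cache in line with the latest address list. */
final class ChannelCacheSyncSketch {

    /**
     * Hypothetical helper, not part of the framework.
     * connect stands in for "open a channel to this address".
     */
    static <C> void sync(ConcurrentMap<InetSocketAddress, C> cache,
                         List<InetSocketAddress> latest,
                         Function<InetSocketAddress, C> connect) {
        // A set makes the membership checks cheaper than List.contains
        Set<InetSocketAddress> live = new HashSet<>(latest);

        // Online nodes: in the latest list but not yet cached
        for (InetSocketAddress address : live) {
            if (!cache.containsKey(address)) {
                cache.put(address, connect.apply(address));
            }
        }

        // Offline nodes: still cached but no longer in the latest list
        cache.keySet().removeIf(address -> !live.contains(address));
    }

    public static void main(String[] args) {
        InetSocketAddress a = new InetSocketAddress("127.0.0.1", 8001);
        InetSocketAddress b = new InetSocketAddress("127.0.0.1", 8002);
        InetSocketAddress c = new InetSocketAddress("127.0.0.1", 8003);

        ConcurrentMap<InetSocketAddress, String> cache = new ConcurrentHashMap<>();
        cache.put(a, "channel-8001");
        cache.put(b, "channel-8002");

        // a went offline, c came online
        sync(cache, Arrays.asList(b, c), address -> "channel-" + address.getPort());
        System.out.println(cache);
    }
}
```

Channels that are already cached are left untouched, so existing connections are reused instead of being reopened on every change event.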
d. Implement re-load balancing with reLoadBalance
When a node goes online or offline, the load must be rebalanced across the current nodes.
In the com.dcyrpc.watcher package of the core module, modify the process() method of the UpAndDownWatcher class by adding the following code:
- Re-load balancing: obtain the load balancer and call reLoadBalance.
    // ... (omitted)

    // Obtain the load balancer and re-balance the load
    LoadBalancer loadBalancer = DcyRpcBootstrap.LOAD_BALANCER;
    loadBalancer.reLoadBalance(serviceName, addressList);
Remove all reBalance() methods originally defined in the LoadBalancer interface.
In the LoadBalancer interface, declare the reLoadBalance() method:
    /**
     * Re-balances the load when nodes are detected going online or offline.
     * @param serviceName service name
     * @param addressList the re-pulled address list of the service
     */
    void reLoadBalance(String serviceName, List<InetSocketAddress> addressList);
In the AbstractLoadBalancer abstract class, implement this method:
    @Override
    public synchronized void reLoadBalance(String serviceName, List<InetSocketAddress> addressList) {
        // Generate a new selector based on the new service list
        cache.put(serviceName, getSelector(addressList));
    }
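To see what replacing the cached selector does in practice, here is a small standalone sketch. It assumes a round-robin strategy purely for illustration. ReLoadBalanceSketch, RoundRobinSelector and select are hypothetical names and not the framework's classes; only the idea of a per-service selector cache that is swapped by reLoadBalance comes from the code above.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: a per-service selector cache whose entry is replaced on reLoadBalance. */
final class ReLoadBalanceSketch {

    /** Hypothetical round-robin selector over a fixed snapshot of addresses. */
    static final class RoundRobinSelector {
        private final List<InetSocketAddress> addresses;
        private final AtomicInteger index = new AtomicInteger();

        RoundRobinSelector(List<InetSocketAddress> addresses) {
            this.addresses = new ArrayList<>(addresses); // defensive copy of the snapshot
        }

        InetSocketAddress next() {
            if (addresses.isEmpty()) {
                throw new IllegalStateException("no available address");
            }
            // floorMod keeps the index valid even after the counter overflows
            return addresses.get(Math.floorMod(index.getAndIncrement(), addresses.size()));
        }
    }

    private final Map<String, RoundRobinSelector> cache = new ConcurrentHashMap<>();

    /** Replace the selector so later calls only see the new address list. */
    void reLoadBalance(String serviceName, List<InetSocketAddress> addressList) {
        cache.put(serviceName, new RoundRobinSelector(addressList));
    }

    InetSocketAddress select(String serviceName) {
        RoundRobinSelector selector = cache.get(serviceName);
        if (selector == null) {
            throw new IllegalStateException("unknown service: " + serviceName);
        }
        return selector.next();
    }

    public static void main(String[] args) {
        InetSocketAddress a = new InetSocketAddress("127.0.0.1", 8001);
        InetSocketAddress b = new InetSocketAddress("127.0.0.1", 8002);
        ReLoadBalanceSketch balancer = new ReLoadBalanceSketch();

        balancer.reLoadBalance("demo", Arrays.asList(a, b));
        System.out.println(balancer.select("demo"));
        System.out.println(balancer.select("demo"));

        // b goes offline: after re-balancing, only a is ever selected
        balancer.reLoadBalance("demo", Arrays.asList(a));
        System.out.println(balancer.select("demo"));
    }
}
```

Swapping the whole selector, rather than mutating its address list, means a selection that is already in progress keeps working on a consistent snapshot.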