SOFAJRaft Registration Center: Heartbeat Detection Implementation

Table of contents

    • 1. Preface
    • 2. Heartbeat flow chart
      • Overall process
      • Heartbeat renewal & heartbeat detection
    • 3. Implementation steps
      • 3.1 Client
      • 3.2 Server
        • 3.2.1 HeartBeatRpcProcessor
        • 3.2.2 HeartBeatRequestHandler
        • 3.2.3 ServiceDiscoveryRequestHandlerFactory adds onBeat method
        • 3.2.4 ServiceDiscoveryCheckBeatThread heartbeat detection thread
        • 3.2.5 ServiceDiscoveryRequestHandlerFactory adds checkBeat method
        • 3.2.6 ServiceDiscoveryServer starts the heartbeat detection thread when the server starts
    • 4. Test

1. Preface

  • In the previous article, Implementing a Registration Center Based on SOFAJRaft (The Romantic Blog, CSDN), we implemented a registration center on top of JRaft and got to know JRaft's programming model.
  • The approach there was simply to follow the steps and examples on the official website and improve the code step by step.
  • This article focuses on completing the heartbeat function of the registration center.

PS: Some code defects from the previous article have also been fixed.

The complete code for this article is available at: https://github.com/huajiexiewenfeng/eval-discovery

2. Heartbeat flow chart

Overall process

[Figure: overall heartbeat process]
The entire process can be divided into three sub-processes:

  • Service registration: the client sends a register request to the server
    • The request is processed by RegistrationRpcProcessor
      • This triggers the onApply method of the ServiceDiscoveryStateMachine state machine
        • Underneath, ServiceDiscoveryRequestHandlerFactory#storage stores the registration information
  • Heartbeat sending: after the client sends the register request, it starts a heartbeat thread
    • This thread sends a beat request to the server every 5 seconds
      • The request is processed by HeartBeatRpcProcessor
        • This triggers the onApply method of the ServiceDiscoveryStateMachine state machine
          • Underneath, ServiceDiscoveryRequestHandlerFactory#onBeat handles the heartbeat
  • Heartbeat detection: when the registration center server starts, it starts a heartbeat detection thread that checks the liveness of every instance every 5 seconds
    • Underneath, ServiceDiscoveryRequestHandlerFactory#checkBeat performs the check
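All three sub-processes end up in the same place. The state machine applies a committed operation and hands it to the handler for its kind.

The sketch below is a minimal illustration of that routing. It is not the JRaft `StateMachine` API. The `OpKind` enum, `ServiceOp` and `OpDispatcher` are hypothetical names. `BEAT` and `DEREGISTRATION` appear in this article's code, while `REGISTRATION` is an assumed name for the register operation.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical operation kinds; REGISTRATION is an assumed name.
enum OpKind { REGISTRATION, BEAT, DEREGISTRATION }

// Hypothetical committed operation: what happened, and to which instance.
final class ServiceOp {
    final OpKind kind;
    final String instanceId;

    ServiceOp(OpKind kind, String instanceId) {
        this.kind = kind;
        this.instanceId = instanceId;
    }
}

// Routes each committed operation to the handler registered for its kind,
// similar in spirit to onApply delegating to a ServiceDiscoveryRequestHandler.
final class OpDispatcher {
    private final Map<OpKind, Consumer<ServiceOp>> handlers = new EnumMap<>(OpKind.class);

    void register(OpKind kind, Consumer<ServiceOp> handler) {
        handlers.put(kind, handler);
    }

    // Called once per committed log entry, in log order.
    void apply(ServiceOp op) {
        Consumer<ServiceOp> handler = handlers.get(op.kind);
        if (handler == null) {
            throw new IllegalStateException("No handler for " + op.kind);
        }
        handler.accept(op);
    }
}
```

Keeping the dispatch in one place means that registration, renewal and deregistration are applied in the same order on every node.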

Heartbeat renewal & heartbeat detection

[Figure: heartbeat renewal and heartbeat detection]
Heartbeat renewal

  • Every heartbeat request updates the server-side map of heartbeat timestamps.
  • Data structure: Map<String, Map<String, Instant>>
    • serviceName
      • serviceId: current time (now)
  • The timestamp is refreshed to the current time on every renewal.
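A minimal sketch of the renewal map described above, assuming that service names and instance IDs are plain strings. `RenewalTable`, `renew` and `lastRenewal` are hypothetical names. Only the nested `serviceName -> (serviceId -> time)` shape comes from the article. The timestamp is passed in rather than read from the clock, which keeps the sketch deterministic.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// serviceName -> (serviceId -> time of the last heartbeat)
final class RenewalTable {
    private final Map<String, Map<String, Instant>> lastBeat = new ConcurrentHashMap<>();

    // Each heartbeat overwrites the previous timestamp of the instance.
    void renew(String serviceName, String serviceId, Instant now) {
        lastBeat.computeIfAbsent(serviceName, n -> new ConcurrentHashMap<>())
                .put(serviceId, now);
    }

    // Empty if the instance has never sent a heartbeat.
    Optional<Instant> lastRenewal(String serviceName, String serviceId) {
        Map<String, Instant> byId = lastBeat.get(serviceName);
        return byId == null ? Optional.empty() : Optional.ofNullable(byId.get(serviceId));
    }
}
```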

Heartbeat detection

  • Traverse the heartbeat timestamp map
  • Get each instance's last renewal time
  • Compare it with the current time: if the last renewal time plus the expiration timeout is earlier than now, the instance has expired
  • Delete the expired instance
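The detection steps above can be sketched as a pure function over the renewal map. It uses the same `lastRenewal + timeout isBefore now` comparison that `checkBeat` uses later in this article.

`ExpiryScan.findExpired` and the `"serviceName/serviceId"` result format are hypothetical. The sketch only collects expired instances and leaves the deletion itself to the caller.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ExpiryScan {
    private ExpiryScan() {}

    // Returns "serviceName/serviceId" for every instance whose last heartbeat
    // plus the timeout is strictly before now.
    static List<String> findExpired(Map<String, Map<String, Instant>> lastBeat,
                                    Instant now, Duration timeout) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Map<String, Instant>> service : lastBeat.entrySet()) {
            for (Map.Entry<String, Instant> instance : service.getValue().entrySet()) {
                if (instance.getValue().plus(timeout).isBefore(now)) {
                    expired.add(service.getKey() + "/" + instance.getKey());
                }
            }
        }
        return expired;
    }
}
```

With a 30-second timeout, an instance last renewed exactly 30 seconds ago is still alive, because the comparison is strict. It expires one second later.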

3. Implementation steps

3.1 Client

First, modify JRaftServiceDiscovery so that the register method starts a heartbeat thread once registration succeeds. The core code after the change is:

public class JRaftServiceDiscovery implements ServiceDiscovery {
  ...
  @Override
  public void register(ServiceInstance serviceInstance) {
    logger.info("register serviceInstance: {}", serviceInstance);
    // Send the registration request over RPC
    ServiceDiscoveryOuter.Registration registration = buildRegistration(serviceInstance, false);
    try {
      serviceDiscoveryClient.invoke(registration);
    } catch (Throwable e) {
      logger.error("Fail to register service instance: " + serviceInstance, e);
      // Registration failed: do not start sending heartbeats
      return;
    }
    // Registration succeeded: start the heartbeat thread as a daemon
    ServiceDiscoveryHeartBeatThread beatThread =
        new ServiceDiscoveryHeartBeatThread(serviceDiscoveryClient, serviceInstance);
    beatThread.setDaemon(true);
    beatThread.start();
  }
  ...
}

ServiceDiscoveryHeartBeatThread

  • The heartbeat logic is similar to register:
    • Build a HeartBeat message
    • Send it over RPC
public class ServiceDiscoveryHeartBeatThread extends Thread {

  private final ServiceDiscoveryClient serviceDiscoveryClient;
  private final ServiceInstance serviceInstance;

  private static final Logger logger = LoggerFactory
      .getLogger(ServiceDiscoveryHeartBeatThread.class);

  public ServiceDiscoveryHeartBeatThread(ServiceDiscoveryClient serviceDiscoveryClient,
      ServiceInstance serviceInstance) {
    super("client-service-instance-beat");
    this.serviceDiscoveryClient = serviceDiscoveryClient;
    this.serviceInstance = serviceInstance;
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      // Build the heartbeat message and send it over RPC
      ServiceDiscoveryOuter.HeartBeat heartBeat = buildHeartBeat(serviceInstance);
      try {
        serviceDiscoveryClient.invoke(heartBeat);
      } catch (Throwable e) {
        logger.error("Fail to send heartbeat for a service instance: " + serviceInstance, e);
      }
      try {
        // Sleep even after a failed send, so an unreachable server is not hit in a tight loop
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        // Restore the flag so the loop condition ends the thread
        Thread.currentThread().interrupt();
      }
    }
  }

  private HeartBeat buildHeartBeat(ServiceInstance serviceInstance) {
    return HeartBeat.newBuilder()
        .setHost(serviceInstance.getHost())
        .setId(serviceInstance.getId())
        .setPort(serviceInstance.getPort())
        .setServiceName(serviceInstance.getServiceName())
        .build();
  }
}
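The hand-written loop above can also be expressed with a `ScheduledExecutorService`, which keeps a fixed 5-second cadence without manual sleeping. This is an alternative design, not the code from the repository. `HeartbeatScheduler` is a hypothetical name. The `Runnable sendBeat` parameter stands in for building the `HeartBeat` message and calling `serviceDiscoveryClient.invoke`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class HeartbeatScheduler implements AutoCloseable {
    private final ScheduledExecutorService executor;

    HeartbeatScheduler(String threadName) {
        // A single daemon thread, matching beatThread.setDaemon(true) in the article.
        this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        });
    }

    // Runs sendBeat immediately and then every periodMillis. Failures go to onError
    // and are swallowed: an exception escaping a periodic task would suppress
    // every later execution of that task.
    ScheduledFuture<?> start(Runnable sendBeat, long periodMillis, Consumer<Throwable> onError) {
        return executor.scheduleAtFixedRate(() -> {
            try {
                sendBeat.run();
            } catch (Throwable e) {
                onError.accept(e);
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

A call such as `new HeartbeatScheduler("client-service-instance-beat").start(sendBeat, 5000, e -> logger.error("beat failed", e))` would roughly match the thread above.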

3.2 Server

3.2.1 HeartBeatRpcProcessor

First, complete HeartBeatRpcProcessor, the class that handles a heartbeat request once it reaches the server. It mirrors RegistrationRpcProcessor, and most of the code is the same.

public class HeartBeatRpcProcessor implements RpcProcessor<ServiceDiscoveryOuter.HeartBeat> {
  ...
  @Override
  public void handleRequest(RpcContext rpcContext, HeartBeat heartBeat) {
    ServiceInstance serviceInstance = convertServiceInstance(heartBeat);

    String serviceName = heartBeat.getServiceName();

    final Kind kind = Kind.BEAT;

    ServiceDiscoveryOperation op = new ServiceDiscoveryOperation(kind, serviceInstance);

    // Invoked after the BEAT operation has been committed and applied, or has failed
    final ServiceDiscoveryClosure closure = new ServiceDiscoveryClosure(op) {
      @Override
      public void run(Status status) {
        if (!status.isOk()) {
          logger.warn("Closure status is : {} at the {}", status, rpcProcessorService.getNode());
          return;
        }
        rpcContext.sendResponse(response(status));
        logger.info("'{}' has been handled ,serviceName : '{}' , result : {} , status : {}",
            kind, serviceName, getResult(), status);
      }
    };

    // Submit the operation to the Raft log
    this.rpcProcessorService.applyOperation(closure);
  }
  ...
}
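The processor does not answer the client directly. It submits the operation and replies from a callback once the operation's outcome is known.

The sketch below models that asynchronous reply with `CompletableFuture`. `AsyncBeatProcessor` and its `applyOperation` function are hypothetical stand-ins for the article's `rpcProcessorService.applyOperation(closure)`. One design choice differs from the code above, which returns without responding when the status is not OK. This sketch also replies on failure, so the client gets an answer instead of waiting for an RPC timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

final class AsyncBeatProcessor {
    // Hypothetical replication entry point: completes normally once the operation
    // has been committed and applied, or exceptionally if that failed.
    private final Function<String, CompletableFuture<Void>> applyOperation;

    AsyncBeatProcessor(Function<String, CompletableFuture<Void>> applyOperation) {
        this.applyOperation = applyOperation;
    }

    // Returns immediately; the response is sent from the completion callback.
    void handleRequest(String instanceId, Consumer<String> sendResponse) {
        applyOperation.apply(instanceId).whenComplete((ignored, failure) -> {
            if (failure == null) {
                sendResponse.accept("OK " + instanceId);
            } else {
                // Reply on failure too, so the client is not left waiting
                sendResponse.accept("FAILED " + failure.getMessage());
            }
        });
    }
}
```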
3.2.2 HeartBeatRequestHandler

This class handles BEAT operations inside ServiceDiscoveryStateMachine#onApply. Complete it as follows:

public class HeartBeatRequestHandler implements ServiceDiscoveryRequestHandler {

  private static final Logger logger = LoggerFactory.getLogger(HeartBeatRequestHandler.class);

  private ServiceDiscoveryRequestHandlerFactory factory;

  public HeartBeatRequestHandler(
      ServiceDiscoveryRequestHandlerFactory factory) {
    this.factory = factory;
  }

  @Override
  public void doHandle(ServiceDiscoveryClosure closure, ServiceInstance serviceInstance) {
    if (null == serviceInstance) {
      return;
    }
    factory.onBeat(serviceInstance);
    logger.info("{} has been renewed at the node", serviceInstance);
  }
}
3.2.3 ServiceDiscoveryRequestHandlerFactory adds onBeat method
public class ServiceDiscoveryRequestHandlerFactory {
  ...
  private final Object monitor = new Object();

  /**
   * Service name -> (service ID -> instance), i.e. serviceName:<serviceId:instance>.
   * Keying by serviceId deduplicates instances.
   */
  private final Map<String, Map<String, ServiceInstance>> serviceNameToInstancesStorage = new ConcurrentHashMap<>();

  /**
   * Service name -> (service ID -> last heartbeat time), i.e. serviceName:<serviceId:now>
   */
  private final Map<String, Map<String, Instant>> serviceNameToInstantsMap = new ConcurrentHashMap<>();
  ...
  public void onBeat(ServiceInstance serviceInstance) {
    String serviceName = serviceInstance.getServiceName();
    String id = serviceInstance.getId();
    synchronized (monitor) {
      Map<String, ServiceInstance> serviceInstancesMap = serviceNameToInstancesStorage.get(serviceName);
      if (serviceInstancesMap == null || !serviceInstancesMap.containsKey(id)) {
        // Invalid heartbeat: the instance is not registered
        logger.info("{} beat is invalid", serviceInstance);
        return;
      }
      Map<String, Instant> instantMap = serviceNameToInstantsMap
          .computeIfAbsent(serviceName, n -> new LinkedHashMap<>());
      // Renew: record the time of this heartbeat
      instantMap.put(id, Instant.now());
    }
  }
}
3.2.4 ServiceDiscoveryCheckBeatThread heartbeat detection thread
public class ServiceDiscoveryCheckBeatThread extends Thread {

  private final ServiceDiscoveryRequestHandlerFactory factory;
  private final ServiceDiscoveryStateMachine stateMachine;

  private static final Logger logger = LoggerFactory
      .getLogger(ServiceDiscoveryCheckBeatThread.class);

  public ServiceDiscoveryCheckBeatThread(ServiceDiscoveryRequestHandlerFactory factory,
      ServiceDiscoveryStateMachine stateMachine) {
    super("service-instance-beat-check");
    this.factory = factory;
    this.stateMachine = stateMachine;
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        // Only the leader checks heartbeats; followers apply the
        // replicated deregistration operations instead
        if (stateMachine.isLeader()) {
          factory.checkBeat();
        }
      } catch (Throwable e) {
        logger.error("error on check beat", e);
      }
      try {
        // Sleep even after a failed check, so errors do not cause a busy loop
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

}
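The article does not show how `stateMachine.isLeader()` is implemented. A common approach is to record the leader term in the state machine's `onLeaderStart(long term)` callback and reset it in `onLeaderStop(Status)`. This is the pattern used in the JRaft counter example.

The sketch below models only that bookkeeping, without a JRaft dependency. `LeaderTracker` and `runIfLeader` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the leadership bookkeeping inside a state machine.
final class LeaderTracker {
    // -1 means "not leader"; otherwise the term in which this node became leader.
    private final AtomicLong leaderTerm = new AtomicLong(-1);

    // Would be called from StateMachine#onLeaderStart(long term).
    void onLeaderStart(long term) {
        leaderTerm.set(term);
    }

    // Would be called from StateMachine#onLeaderStop(Status status).
    void onLeaderStop() {
        leaderTerm.set(-1);
    }

    boolean isLeader() {
        return leaderTerm.get() > 0;
    }

    // One tick of the detection loop: only the current leader runs the check.
    boolean runIfLeader(Runnable check) {
        if (!isLeader()) {
            return false;
        }
        check.run();
        return true;
    }
}
```

Because the flag is updated from the state machine's own callbacks, a node that loses leadership stops deleting instances on its next tick.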
3.2.5 ServiceDiscoveryRequestHandlerFactory adds checkBeat method
  public void checkBeat() {
    final Instant now = Instant.now();
    synchronized (monitor) {
      // Traverse every service and its registered instances
      for (Map.Entry<String, Map<String, ServiceInstance>> serviceInstanceMap : this.serviceNameToInstancesStorage
          .entrySet()) {
        String serviceName = serviceInstanceMap.getKey();
        // Renewal times recorded for the instances of this service
        Map<String, Instant> instantMap = this.serviceNameToInstantsMap.get(serviceName);
        if (CollectionUtils.isEmpty(instantMap)) {
          // No renewal record at all: remove every instance of this service
          serviceInstanceMap.getValue().clear();
          continue;
        }
        for (Map.Entry<String, Instant> instantService : instantMap.entrySet()) {
          // expired is the heartbeat timeout in seconds (30 in this article)
          if (instantService.getValue().plus(expired, ChronoUnit.SECONDS).isBefore(now)) {
            logger.info(
                "The current instance [{}] has not sent a heartbeat for more than {} seconds",
                instantService, expired);
            removeInstance(serviceName, instantService.getKey());
          }
        }
      }
    }
  }

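If no heartbeat has been received for 30 seconds, the instance is removed. The leader does this by submitting a DEREGISTRATION operation, so the removal is replicated to the followers as well: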

  private void removeInstance(String serviceName, String id) {
    Map<String, ServiceInstance> serviceInstancesMap = getServiceInstancesMap(serviceName);
    ServiceInstance serviceInstance = serviceInstancesMap.get(id);
    if (null != serviceInstance) {
      // Submit a deregistration so that it is replicated to the followers
      logger.info("send DeRegistration {}", serviceInstance);
      sendDeRegistrationRpc(serviceInstance);
    }
  }

  private void sendDeRegistrationRpc(ServiceInstance serviceInstance) {
    String serviceName = serviceInstance.getServiceName();

    final Kind kind = Kind.DEREGISTRATION;

    ServiceDiscoveryOperation op = new ServiceDiscoveryOperation(kind, serviceInstance);

    // Invoked after the DEREGISTRATION operation has been committed and applied, or has failed
    final ServiceDiscoveryClosure closure = new ServiceDiscoveryClosure(op) {
      @Override
      public void run(Status status) {
        if (!status.isOk()) {
          logger.warn("Closure status is : {} at the {}", status, rpcProcessorService.getNode());
          return;
        }
        logger.info("'{}' has been handled ,serviceName : '{}' , result : {} , status : {}",
            kind, serviceName, getResult(), status);
      }
    };

    this.rpcProcessorService.applyOperation(closure);
  }
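The leader never deletes the instance from its own map directly. It proposes a deregistration, and every node applies that same operation. The toy model below illustrates why this keeps all replicas consistent.

The model deliberately ignores elections, quorums, failures and networking, so it is not how JRaft works internally. `ToyReplicatedRegistry` and its nested types are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: no elections, no quorum, no failures, no networking.
final class ToyReplicatedRegistry {
    enum Kind { REGISTRATION, DEREGISTRATION }

    static final class Entry {
        final Kind kind;
        final String instanceId;

        Entry(Kind kind, String instanceId) {
            this.kind = kind;
            this.instanceId = instanceId;
        }
    }

    static final class Replica {
        final Set<String> instances = new HashSet<>();
        private int nextIndex = 0; // first log entry this replica has not applied yet

        // Apply every unseen entry, strictly in log order.
        void catchUp(List<Entry> log) {
            while (nextIndex < log.size()) {
                Entry e = log.get(nextIndex++);
                if (e.kind == Kind.REGISTRATION) {
                    instances.add(e.instanceId);
                } else {
                    instances.remove(e.instanceId);
                }
            }
        }
    }

    private final List<Entry> log = new ArrayList<>();
    final List<Replica> replicas = new ArrayList<>();

    ToyReplicatedRegistry(int replicaCount) {
        for (int i = 0; i < replicaCount; i++) {
            replicas.add(new Replica());
        }
    }

    // The leader appends to the log; every replica, the leader included,
    // applies the same entry instead of editing its state directly.
    void propose(Kind kind, String instanceId) {
        log.add(new Entry(kind, instanceId));
        for (Replica r : replicas) {
            r.catchUp(log);
        }
    }
}
```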
3.2.6 ServiceDiscoveryServer starts the heartbeat detection thread when the server starts
public class ServiceDiscoveryServer {

  private RaftGroupService raftGroupService;
  private Node node;
  private ServiceDiscoveryStateMachine fsm;

  public ServiceDiscoveryServer(final String dataPath, final String groupId, final PeerId serverId,
      final NodeOptions nodeOptions) throws IOException {
    ...
    this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
    // start raft node
    this.node = this.raftGroupService.start();
    ServiceDiscoveryRequestHandlerFactory instanceFactory = ServiceDiscoveryRequestHandlerFactory
        .getInstance();
    instanceFactory.init();
    instanceFactory.setRpcProcessorService(rpcProcessorService);
    // Start the heartbeat detection thread
    ServiceDiscoveryCheckBeatThread beatThread = new ServiceDiscoveryCheckBeatThread(
        instanceFactory, fsm);
    beatThread.setDaemon(true);
    beatThread.start();
  }
  ...
}

4. Test

As in the previous article, Implementing a Registration Center Based on SOFAJRaft, start each of the following separately:

  • ServiceDiscoveryServer1 – Registration center server 1
  • ServiceDiscoveryServer2 – Registration Center Server 2
  • ServiceDiscoveryServer3 – Registration Center Server 3
  • MyApplication – a Spring Web application

After startup completes, watch the logs of the three servers: a Beat heartbeat request is logged every 5 seconds.
[Screenshot: server logs showing a Beat request every 5 seconds]
Then shut down MyApplication to simulate a heartbeat timeout (the service going offline).
Leader node log:
[Screenshot: leader node log]
Follower node log:
[Screenshot: follower node log]