Use Akka’s Actor to simulate Spark’s Master and Worker working mechanisms


How Spark’s Master and Worker coordinate

In Apache Spark, the Master and the Workers communicate through a heartbeat mechanism, which also lets the Master keep track of which Workers are alive. The heartbeat workflow between the Master and a Worker is as follows:

  1. After the Worker starts, it sends a registration request to the pre-configured Master node.
  2. After receiving the registration request, the Master creates a unique identifier (Worker ID) for the Worker and saves its information in memory.
  3. The Master sends the Worker a registration response containing the Master URL, the Worker ID, and other information.
  4. After the Worker receives the registration response, it starts a timer and begins sending heartbeat messages to the Master periodically.
  5. The Worker’s heartbeat message contains information such as its current load and available resources.
  6. After receiving a heartbeat message, the Master updates the Worker’s latest heartbeat time and adjusts the cluster as needed, for example by assigning new tasks or removing failed Workers.
  7. If the Master does not receive a heartbeat from a Worker within a timeout period, it marks the Worker as invalid and releases the resources associated with it for subsequent scheduling.
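Steps 1 to 7 can be sketched without any networking as a plain Scala class that models only the Master’s bookkeeping. `MasterRegistry`, `WorkerRecord`, and `Registered` are hypothetical names for illustration, not Spark’s internal classes, and the Master URL is just an arbitrary string. The current time is passed in as a parameter so the timeout rule can be exercised deterministically.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical per-Worker record kept in the Master's memory (not Spark's class)
final class WorkerRecord(val id: String, val cores: Int, val memoryMb: Int, var lastHeartbeatMs: Long)

// Hypothetical registration response: the Master URL plus the assigned Worker ID
final case class Registered(masterUrl: String, workerId: String)

// Models only the Master's bookkeeping for steps 1-7; the current time is passed in
class MasterRegistry(masterUrl: String, timeoutMs: Long) {
  private val workers = mutable.Map[String, WorkerRecord]()

  // Steps 2-3: create a unique Worker ID, store the Worker, return the response
  def register(cores: Int, memoryMb: Int, nowMs: Long): Registered = {
    val id = UUID.randomUUID().toString
    workers(id) = new WorkerRecord(id, cores, memoryMb, nowMs)
    Registered(masterUrl, id)
  }

  // Step 6: record the latest heartbeat time; returns false for unknown Workers
  def heartbeat(workerId: String, nowMs: Long): Boolean =
    workers.get(workerId) match {
      case Some(worker) =>
        worker.lastHeartbeatMs = nowMs
        true
      case None =>
        false
    }

  // Step 7: remove Workers silent for longer than timeoutMs; returns their IDs
  def expire(nowMs: Long): List[String] = {
    val dead = workers.values.filter(w => nowMs - w.lastHeartbeatMs > timeoutMs).map(_.id).toList
    dead.foreach(id => workers.remove(id))
    dead
  }

  // IDs of the Workers currently considered alive
  def aliveIds: Set[String] = workers.keySet.toSet
}
```

For example, with a 6,000 ms timeout, a Worker that last reported at 5,000 ms survives a check at 7,000 ms, while one that has been silent since registering at 0 ms is expired.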

The specific principles are as follows:

  • The Worker sends heartbeat messages to the Master over the network, usually on a long-lived TCP connection. These heartbeats can carry information about Worker health, resource utilization, and more.
  • The Master uses an internal heartbeat management component to process incoming heartbeats and maintain the state of each Worker. It decides whether a Worker is running normally based on the frequency and timestamps of its heartbeats.
  • If the Master does not receive a heartbeat from a Worker within a predetermined timeout, it marks the Worker as invalid and triggers a series of failure-handling mechanisms, such as reassigning its tasks to other available Workers.
  • Workers send heartbeats at regular intervals so that the Master notices promptly when a network failure, Worker failure, or other problem occurs.

Through the heartbeat mechanism, the Master can monitor the status of the Worker in real time and perform dynamic management and resource scheduling of the cluster as needed to achieve high availability and fault tolerance.
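How quickly the Master notices a failure depends on three parameters: the heartbeat interval h, the timeout T, and the interval c at which the Master runs its check. The following discrete-time simulation makes the interaction visible. `HeartbeatTiming.removalTime` is a hypothetical helper; it assumes 1 ms resolution, no network delay, and that the Worker’s registration counts as its first heartbeat.

```scala
object HeartbeatTiming {
  // Simulates the protocol at 1 ms resolution with no network delay.
  // The Worker registers at t = 0 (counted as its first heartbeat), sends a heartbeat
  // every heartbeatMs while t < crashAtMs, and the Master checks every checkMs from t = 0,
  // removing the Worker once t - lastHeartbeat > timeoutMs.
  // Returns the removal time, or None if the Worker is still registered at untilMs.
  def removalTime(heartbeatMs: Long, timeoutMs: Long, checkMs: Long,
                  crashAtMs: Long, untilMs: Long): Option[Long] = {
    require(heartbeatMs > 0 && checkMs > 0, "intervals must be positive")
    var lastHeartbeat = 0L
    var t = 0L
    while (t <= untilMs) {
      // A heartbeat arriving at the same instant as a check is processed first
      if (t % heartbeatMs == 0 && t < crashAtMs) lastHeartbeat = t
      if (t % checkMs == 0 && t - lastHeartbeat > timeoutMs) return Some(t)
      t += 1
    }
    None
  }
}
```

With the values used by the Akka code in this article (h = 3 s, T = 6 s, c = 9 s), a Worker that crashes at 3,001 ms, just after its heartbeat at 3,000 ms, is only removed at 18,000 ms, that is T + c = 15 s after its last heartbeat. Conversely, if h exceeds T (for example h = 13 s), even a healthy Worker is removed at the first check after 6 s of silence, here at 9,000 ms.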

Use Akka’s Actor to simulate Spark’s Master and Worker working mechanisms

  1. The worker registers with the Master; the Master completes the registration and replies to the worker that the registration succeeded.
  2. The worker sends heartbeats at a regular interval, and the Master receives them.
  3. When the Master receives a worker’s heartbeat, it updates that worker’s last heartbeat time.
  4. The Master starts a scheduled task that periodically checks which registered workers have not updated their heartbeats and removes them from the HashMap.
  5. Deploy the Master and Workers in a distributed fashion on Linux: package the Maven project, then upload it to the Linux hosts.

  • Create a SparkMaster class that extends the Actor trait and implements the receive method, and define its companion object, which creates the SparkMaster actor reference and starts the actor by sending it a message. The Master monitors the workers’ heartbeats: if no heartbeat has been received from a worker for more than 6 seconds, it removes that Worker instance from the HashMap; otherwise, each heartbeat that arrives updates the worker’s heartbeat time. The check runs on a timer, which keeps the heartbeat mechanism going.

Code implementation:

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
import scala.concurrent.duration._

class SparkMaster extends Actor {
  // HashMap that manages all registered workers (worker id -> WorkerInfo)
  val workers = mutable.Map[String, WorkerInfo]()

  override def receive: Receive = {

    case "start" => {
      println("The master server has started...")
      // Start the periodic check for timed-out workers
      self ! StartTimeOutWorker
    }
    case RegisterWorkerInfo(id, cpu, ram) => {
      // Receive worker registration information
      if (!workers.contains(id)) {
        // Create a WorkerInfo object and add it to workers
        val workerInfo = new WorkerInfo(id, cpu, ram)
        workers += ((id, workerInfo))
        println("Server's workers=" + workers)
        // Reply that the registration succeeded
        sender() ! RegisteredWorkerInfo
      }
    }
    case HeartBeat(id) => {
      // Look up the worker's WorkerInfo and update its heartbeat time.
      // A worker that was already removed as timed out is ignored instead of
      // throwing NoSuchElementException.
      workers.get(id).foreach { workerInfo =>
        workerInfo.lastHeartBeat = System.currentTimeMillis()
        println("master updated " + id + " heartbeat time...")
      }
    }
    case StartTimeOutWorker => {
      println("Started the task of regularly detecting worker heartbeat")
      import context.dispatcher
      // 1. 0.millis: no initial delay, the timer fires immediately
      // 2. 9000.millis: the timer fires every 9 seconds
      // 3. self: the message is sent to this actor itself
      // 4. RemoveTimeOutWorker: the message that is sent
      // (schedule is deprecated since Akka 2.6 in favor of scheduleWithFixedDelay / scheduleAtFixedRate)
      context.system.scheduler.schedule(0.millis, 9000.millis, self, RemoveTimeOutWorker)
    }
    // Handle RemoveTimeOutWorker: find the workers whose heartbeat has timed out
    // (now - lastHeartBeat > 6000 ms) and delete them from the map
    case RemoveTimeOutWorker => {
      val nowTime = System.currentTimeMillis()
      // Collect the timed-out workers into a list first, then remove them from the map
      workers.values.filter(workerInfo => (nowTime - workerInfo.lastHeartBeat) > 6000)
        .toList
        .foreach(workerInfo => workers.remove(workerInfo.id))
      println("There are currently " + workers.size + " workers alive")
    }
  }
}

object SparkMaster {
  def main(args: Array[String]): Unit = {

    // Three arguments are expected: host, port, and the sparkMasterActor name
    if (args.length != 3) {
      println("Please enter the parameters: host port sparkMasterActorName")
      sys.exit()
    }

    val host = args(0)
    val port = args(1)
    val name = args(2)

    // Create the ActorSystem first, using classic (Netty TCP) remoting as in Akka 2.5
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=${host}
         |akka.remote.netty.tcp.port=${port}
            """.stripMargin)
    val sparkMasterSystem = ActorSystem("SparkMaster", config)
    // Create the SparkMaster actor
    val sparkMasterRef = sparkMasterSystem.actorOf(Props[SparkMaster], name)
    // Start SparkMaster
    sparkMasterRef ! "start"
  }
}

  • Define the SparkWorker class, which extends the Actor trait and implements the receive method. In receive, the worker sends its registration information to the Master; after the Master replies that the registration succeeded, the worker starts a scheduled task that sends heartbeat packets to the Master.

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

class SparkWorker(masterHost: String, masterPort: Int, masterName: String) extends Actor {
  // masterProxy is the proxy/reference to the Master actor
  var masterProxy: ActorSelection = _
  val id = java.util.UUID.randomUUID().toString

  override def preStart(): Unit = {
    println("preStart() call")
    // Initialize masterProxy
    masterProxy = context.actorSelection(s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}")
    println("masterProxy=" + masterProxy)
  }

  override def receive: Receive = {
    case "start" => {
      println("worker started")
      // Send a registration message (id, cpu, ram)
      masterProxy ! RegisterWorkerInfo(id, 16, 16 * 1024)
    }
    case RegisteredWorkerInfo => {
      println("workerid= " + id + " Registration successful~")
      // Once registered, start a timer that sends SendHeartBeat to this actor at a fixed interval
      import context.dispatcher
      // 1. 0.millis: no initial delay, the timer fires immediately
      // 2. 3000.millis: the timer fires every 3 seconds
      // 3. self: the message is sent to this actor itself
      // 4. SendHeartBeat: the message that is sent
      context.system.scheduler.schedule(0.millis, 3000.millis, self, SendHeartBeat)
    }
    case SendHeartBeat => {
      println("worker = " + id + " sends heartbeat to master")
      masterProxy ! HeartBeat(id)
    }
  }
}

object SparkWorker {
  def main(args: Array[String]): Unit = {

    if (args.length != 6) {
      println("Please enter the parameters workerHost workerPort workerName masterHost masterPort masterName")
      sys.exit()
    }

    val workerHost = args(0)
    val workerPort = args(1)
    val workerName = args(2)
    val masterHost = args(3)
    val masterPort = args(4)
    val masterName = args(5)
    // Classic (Netty TCP) remoting settings, matching the akka.tcp:// address used by the worker
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=${workerHost}
         |akka.remote.netty.tcp.port=${workerPort}
            """.stripMargin)

    // Create the ActorSystem
    val sparkWorkerSystem = ActorSystem("SparkWorker", config)

    // Create the SparkWorker actor and get its reference
    val sparkWorkerRef = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort.toInt, masterName)), workerName)

    // Start the actor
    sparkWorkerRef ! "start"
  }
}

  • Define the message protocol: the RegisterWorkerInfo case class that carries registration information; the WorkerInfo class the Master stores for each worker; the RegisteredWorkerInfo case object that confirms successful registration; the SendHeartBeat case object that triggers a heartbeat and the HeartBeat case class that carries it; the StartTimeOutWorker case object that starts the timeout check; and the RemoveTimeOutWorker case object that removes timed-out workers.

The code is as follows:

// MessageProtocol.scala

// Registration information sent by a worker
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)


// WorkerInfo is what the master stores in its HashMap of workers.
// It can be extended later; here it already tracks the worker's last heartbeat time.
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {
  var lastHeartBeat: Long = System.currentTimeMillis()
}

// Sent by the master to the worker when registration succeeds
case object RegisteredWorkerInfo

// Sent by the worker's timer to the worker itself at a fixed interval
case object SendHeartBeat
// Heartbeat message the worker sends to the master each time its timer fires
case class HeartBeat(id: String)

// Sent by the master to itself to start the timeout-checking task
case object StartTimeOutWorker
// Sent by the master to itself periodically to remove workers whose heartbeat has timed out
case object RemoveTimeOutWorker


Through this case, we gain a deeper understanding of the communication mechanism between Spark’s Master and Worker. To make later study of Spark’s underlying source code easier, the naming closely follows the source code (for example, in how the communication message classes are named). The example also deepens our understanding of the master-slave heartbeat detection mechanism (HeartBeat) and prepares us for secondary development of the Spark source code.

Spark’s WordCount diagram

  • The diagram shows how an RDD encapsulates data and computation logic, how RDD operations are chained, and how data flows through the chain.
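In Spark, the WordCount chain is typically written as `sc.textFile(path).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect()`. The same data flow can be mimicked on plain, single-machine Scala collections, which is handy for checking the logic without a cluster. `LocalWordCount` is a hypothetical helper, and `groupBy` plus a sum stands in for `reduceByKey`.

```scala
// Local, single-machine analogue of the WordCount chain:
// lines -> flatMap(split into words) -> map(word -> (word, 1)) -> reduce by key
object LocalWordCount {
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\\s+"))   // like rdd.flatMap(_.split(" "))
      .filter(_.nonEmpty)         // drop empty tokens produced by leading spaces
      .map(word => (word, 1))     // like rdd.map((_, 1))
      .groupBy(_._1)              // gather all (word, 1) pairs with the same word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // like reduceByKey(_ + _)
}
```

Unlike the RDD version, every step here runs eagerly on one machine; the point is only to show the shape of the data after each operation.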

Understanding RDDs

  • RDDs process data much like Java I/O streams: both use the decorator design pattern, wrapping one object in another to add functionality.
  • The business logic of an RDD is executed only when an action such as collect is called; every transformation before that merely wraps the previous RDD to extend its functionality.
  • An RDD does not store data, whereas an I/O stream may temporarily buffer part of the data.
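To make the decorator analogy and lazy evaluation concrete, here is a heavily simplified, hypothetical model; `MiniRDD`, `SourceMiniRDD`, `MappedMiniRDD`, and `FilteredMiniRDD` are illustrative names, not Spark classes. Each transformation wraps its parent in a new object, much like `new BufferedInputStream(new FileInputStream(...))`, and nothing is computed until the `collect()` action.

```scala
// Hypothetical, simplified model of how RDDs wrap one another (decorator pattern)
// and compute nothing until an action is called.
abstract class MiniRDD[T] {
  // Each subclass only describes how to produce its elements from its parent
  def compute(): Iterator[T]

  // Transformations: return a NEW wrapper; nothing is computed here
  def map[U](f: T => U): MiniRDD[U] = new MappedMiniRDD(this, f)
  def filter(p: T => Boolean): MiniRDD[T] = new FilteredMiniRDD(this, p)

  // Action: only now does the whole chain run
  def collect(): List[T] = compute().toList
}

// The innermost object: wraps the source data
class SourceMiniRDD[T](data: Seq[T]) extends MiniRDD[T] {
  def compute(): Iterator[T] = data.iterator
}

// Decorator that applies f to each element produced by its parent
class MappedMiniRDD[T, U](parent: MiniRDD[T], f: T => U) extends MiniRDD[U] {
  def compute(): Iterator[U] = parent.compute().map(f)
}

// Decorator that keeps only the parent's elements satisfying p
class FilteredMiniRDD[T](parent: MiniRDD[T], p: T => Boolean) extends MiniRDD[T] {
  def compute(): Iterator[T] = parent.compute().filter(p)
}
```

Because the wrappers hold only logic, calling `collect()` twice runs the whole chain twice, which reflects the point that an RDD does not store data; real Spark avoids the recomputation with `cache()` or `persist()`.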

RDD (Resilient Distributed Dataset) is the most basic data processing model in Spark. In code it is an abstract class that represents a resilient, immutable, partitionable collection whose elements can be computed in parallel.
  • Resilient
1) Storage resilience: data is switched automatically between memory and disk;
2) Fault resilience: lost data can be recovered automatically;
3) Computation resilience: failed computations are retried;
4) Partitioning resilience: data can be repartitioned as needed.

  • Distributed: data is stored on different nodes of the big data cluster
  • Dataset: an RDD encapsulates computation logic and does not store data
  • Data abstraction: RDD is an abstract class; concrete behavior is implemented by subclasses
  • Immutable: the computation logic an RDD encapsulates cannot be changed. To change it, you must create a new RDD that encapsulates the new logic.
  • Partitionable and computed in parallel
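The partitioning, parallelism, and fault-recovery properties above can be illustrated with a small, hypothetical `PartitionedComputation` class, which is not Spark’s API. It splits an immutable `Vector` into partitions, computes each partition as a separate `Future` (one "task" per partition), and can recompute any single partition from its lineage (the source slice plus the function) if that result is lost. The slicing rule, which gives partition i the elements from i * n / p up to (i + 1) * n / p, is an assumption chosen so that partition sizes differ by at most one.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical sketch (not Spark's API) of a partitioned, immutable dataset.
// It keeps only the source data and the function to apply (its lineage),
// never a mutable copy of the results.
final class PartitionedComputation[A, B](source: Vector[A], numPartitions: Int, f: A => B) {
  require(numPartitions > 0, "numPartitions must be positive")

  // Partition i holds elements [i * n / p, (i + 1) * n / p), so sizes differ by at most 1
  val partitions: Vector[Vector[A]] =
    (0 until numPartitions).map { i =>
      val start = (i.toLong * source.size / numPartitions).toInt
      val end = ((i + 1).toLong * source.size / numPartitions).toInt
      source.slice(start, end)
    }.toVector

  // Compute one partition from its lineage; also used to rebuild a lost result
  def computePartition(i: Int): Vector[B] = partitions(i).map(f)

  // Compute every partition in parallel, one Future ("task") per partition;
  // results come back in partition order
  def computeAll(): Vector[Vector[B]] = {
    val tasks = partitions.indices.toVector.map(i => Future(computePartition(i)))
    Await.result(Future.sequence(tasks), 10.seconds)
  }
}
```

For ten elements and four partitions the sizes are 2, 3, 2, 3; if the result of partition 2 were lost, `computePartition(2)` rebuilds it without touching the other partitions.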

Assume Yarn-Client mode, in which the Driver runs on the client host. The following illustrates the basic principle of how the Driver collaborates with the Executors on each computing node, assuming there is only one RDD.
In summary:
An RDD is a resilient dataset that encapsulates computation logic; it is an abstract data model for in-memory, multi-partition parallel computing.

Spark On Yarn-Client Mode
The Driver, which is responsible for monitoring and scheduling, is started on the client’s local host and runs there rather than inside YARN, so this mode is generally used for testing.

  • The Driver runs on the local machine from which the job is submitted.

  • After the Driver starts, it communicates with the ResourceManager to request that an ApplicationMaster be started.

  • The ResourceManager allocates a container and starts the ApplicationMaster on a suitable NodeManager; the ApplicationMaster is then responsible for requesting Executor memory from the ResourceManager.

  • After receiving the ApplicationMaster’s resource request, the ResourceManager allocates containers, and the ApplicationMaster starts Executor processes on the NodeManagers specified by the allocation.

  • Once started, each Executor process registers back with the Driver. After the Executors have registered, the Driver continues executing the application’s main function.

  • Later, when an action operator is executed, a Job is triggered and divided into stages at wide dependencies. Each stage produces a TaskSet whose number of tasks equals the number of partitions of the last RDD in that stage. The tasks are then distributed to the Executors for execution, and the results are finally output. Results of actions such as collect are returned to the Driver automatically; an accumulator is needed only when values that tasks update as a side effect (for example, counters) must be aggregated back on the Driver.
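The stage and task rule in the last step can be made concrete with a hypothetical, linear model of a lineage. `RddStep` and `StagePlanner` are illustrative names; Spark’s `DAGScheduler` works on a full DAG and tracks far more. A new stage starts at every wide dependency, and each stage gets one task per partition of its last RDD. The example lineage (a 2-partition `textFile` followed by `flatMap`, `map`, and a 3-partition `reduceByKey`) is made up for illustration.

```scala
// Hypothetical, linear model of an RDD lineage (Spark's DAGScheduler handles a full DAG).
// wideDependency = true means this RDD depends on its parent through a shuffle.
final case class RddStep(name: String, numPartitions: Int, wideDependency: Boolean)

object StagePlanner {
  // Start a new stage at the first RDD and at every wide (shuffle) dependency
  def stages(lineage: List[RddStep]): List[List[RddStep]] =
    lineage.foldLeft(List.empty[List[RddStep]]) { (acc, step) =>
      acc match {
        case current :: earlier if !step.wideDependency => (current :+ step) :: earlier
        case _                                          => List(step) :: acc
      }
    }.reverse

  // One TaskSet per stage; its size is the partition count of the stage's last RDD
  def taskCounts(lineage: List[RddStep]): List[Int] =
    stages(lineage).map(_.last.numPartitions)
}
```

For that lineage the sketch yields two stages, `textFile`/`flatMap`/`map` with 2 tasks and `reduceByKey` with 3 tasks, since the shuffle introduced by `reduceByKey` is the only wide dependency.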