Use Akka to simulate Spark’s Master and Worker communication

Contents

  • Use Akka to simulate Spark’s Master and Worker communication
    • Master and Worker communication process
    • Message class
    • Master implementation
    • Worker implementation
    • Summary

Spark is a big data processing framework built around in-memory computing. It offers a standalone deployment mode (Standalone), in which Spark applications run on Spark's own cluster. In this mode Spark has two roles: Master and Worker. The Master controls the cluster: it manages Worker registration, deregistration and status changes, and schedules Drivers and Executors. Each Worker runs Drivers and Executors on its machine and reports its status and resource information to the Master.

In this article, we will use the Akka framework to simulate the communication process between Spark’s Master and Worker. Akka is a concurrent and distributed programming framework based on the Actor model. It provides a simple and powerful abstraction, allowing us to use asynchronous messages to build high-performance, scalable, and fault-tolerant systems.

Master and Worker communication process

We will use Akka to simulate the following steps:

  1. Start the Master and Worker, and establish a connection.
  2. Worker registers its own information (memory, number of cores, etc.) with Master.
  3. After receiving the registration information of the Worker, the Master replies with a successful registration message.
  4. After the Worker receives the registration-success message, it starts a scheduled task that sends a heartbeat to the Master every 3 seconds.
  5. When the Master receives a heartbeat, it updates that Worker's status (its last heartbeat time).
  6. The Master runs its own scheduled task every 10 seconds that checks whether any Worker has gone more than 30 seconds without a heartbeat, and removes the information of every Worker that has timed out.
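The six steps above reduce to a small amount of state on the Master: a map from each worker's id to the time of its last heartbeat. The sketch below models only that bookkeeping in plain Scala, without Akka, assuming timestamps are passed in explicitly so the logic is deterministic; `WorkerInfo` and `WorkerRegistry` are hypothetical names, not part of Spark or Akka.

```scala
import scala.collection.mutable

// Hypothetical, Akka-free model of the Master's bookkeeping.
// Timestamps (in ms) are passed in explicitly so the logic is deterministic.
final case class WorkerInfo(id: String, cores: Int, memory: String, var lastHeartbeat: Long)

final class WorkerRegistry(timeoutMs: Long) {
  private val workers = mutable.Map.empty[String, WorkerInfo]

  // Steps 2-3: store the worker's details; the caller would then reply "registered"
  def register(id: String, cores: Int, memory: String, now: Long): Unit =
    workers(id) = WorkerInfo(id, cores, memory, now)

  // Step 5: refresh the heartbeat time; returns false if the worker is unknown
  def heartbeat(id: String, now: Long): Boolean = workers.get(id) match {
    case Some(info) =>
      info.lastHeartbeat = now
      true
    case None => false
  }

  // Step 6: remove workers silent for more than timeoutMs and return their ids, sorted
  def removeTimedOut(now: Long): List[String] = {
    val expired = workers.values.filter(w => now - w.lastHeartbeat > timeoutMs).map(_.id).toList.sorted
    expired.foreach(id => workers.remove(id))
    expired
  }

  def onlineCount: Int = workers.size
}
```

For example, with `new WorkerRegistry(timeoutMs = 30000)`, a worker registered at time 0 that never sends a heartbeat survives a check at 30000 ms (the comparison is strictly "greater than") and is removed by a check at 30001 ms. The Master class later in this article applies the same rule inline in its `CheckTimeOut` handler.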

Message class

First, we need to define some message types for passing data between Master and Worker:

// Case classes are used because they are serializable by default. Although this simulation runs on one machine,
// the Master and Worker run in separate ActorSystems (separate JVM processes) and talk over the network, so every message must be serializable
/**
 * Case class carrying the registration information of a slave node
 * @param slave_id Id of the slave node
 * @param lastUpdateTime Time of the slave node's last heartbeat (ms)
 * @param cores Number of cores of the slave node
 * @param memory Memory size of the slave node
 */
case class RegisterClass(val slave_id: String, var lastUpdateTime: Long, val cores: Int, val memory: String)

/**
 * Case object sent by the master to confirm a successful registration
 */
case object RegisterSuccess

/**
 * Case class representing a heartbeat
 * @param slave_id Id of the slave node sending the heartbeat
 */
case class HeartBeat(val slave_id: String)

/**
 * Case object that tells the master to check which slave nodes have stopped sending heartbeats
 */
case object CheckTimeOut
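The serialization comment above is the main reason these messages are case classes: Scala case classes automatically extend `Serializable`, so they can be written with plain Java serialization. Classic Akka remoting of the 2.4/2.5 era, which this article's `akka.remote.netty.tcp` configuration appears to target, uses Java serialization for such messages by default; newer Akka versions disable Java serialization by default, so treat that part as version-dependent. A minimal round trip using only `java.io`, assuming the same message shapes; `SerializationCheck.roundTrip` is a hypothetical helper that imitates what happens to a message on a remote hop:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Same shapes as the article's messages
case class RegisterClass(slave_id: String, var lastUpdateTime: Long, cores: Int, memory: String)
case class HeartBeat(slave_id: String)

object SerializationCheck {
  // Hypothetical helper: serialize to bytes and read back, as a remote hop would
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The copy that comes back is a different object with equal fields, which is exactly what the receiving ActorSystem works with.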

Master implementation

Next, we define a Master class that extends the Actor trait and implements the receive method to handle incoming messages. The Master also overrides preStart, which Akka calls when the actor starts, to schedule a recurring task that checks whether any Worker has timed out.
Packages that need to be imported

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.duration.Duration

class Master

class Master extends Actor {
  // Mutable map of registered workers, keyed by slave_id; starts empty
  var slaves: mutable.Map[String, RegisterClass] = mutable.Map[String, RegisterClass]()

  // Override the receive method to define the message types the Actor can handle
  override def receive: Receive = {
    // A worker is registering: bind the RegisterClass message to x
    case x: RegisterClass => {
      // Store (or overwrite) the worker's registration under its slave_id
      slaves.put(x.slave_id, x)
      // Print the number of currently registered workers
      println(s"worker:${x.slave_id} is registering.................................... current workers ${slaves.size}")
      // Reply with a RegisterSuccess message to the sender (the worker)
      sender() ! RegisterSuccess
    }
    // A heartbeat from a worker
    case x: HeartBeat => {
      // Look up the worker's registration in the map
      slaves.get(x.slave_id) match {
        // The worker is registered
        case Some(value) => {
          // Refresh its last heartbeat time
          value.lastUpdateTime = System.currentTimeMillis()
          // Store the updated information in the map
          slaves.put(x.slave_id, value)
          // Report that the worker is alive
          println(s"${x.slave_id} is responsive and functional. ")
          // Report the number of workers currently online
          println(s"Current online workers is ${slaves.size}")
        }
        // The worker is unknown (never registered, or already removed)
        case None => {
          println(s"${x.slave_id} does not exist !!!")
        }
      }
    }
    // Periodic timeout check, triggered by the task scheduled in preStart
    case CheckTimeOut => {
      if (slaves.nonEmpty) {
        // Keep only the workers whose last heartbeat is at most 30 seconds old
        slaves = slaves.filter(tuple => {
          if (System.currentTimeMillis() - tuple._2.lastUpdateTime > 30000) {
            // Timed out: log it and drop the entry by returning false
            println(s"${tuple._1} is timeout, removed from master !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            false
          } else true
        })
      }
    }
  }

  // Override preStart, which Akka calls once when the Actor starts
  override def preStart(): Unit = {
    // The scheduler needs an implicit ExecutionContext; use the actor's dispatcher
    import context.dispatcher
    // Schedule a repeating task on the actor system's scheduler
    // (in Akka 2.6+ this overload is deprecated in favor of scheduleWithFixedDelay / scheduleAtFixedRate)
    context.system.scheduler.schedule(
      // First check 10 seconds after the Master starts
      Duration(10, TimeUnit.SECONDS),
      // Then check every 10 seconds
      Duration(10, TimeUnit.SECONDS),
      // The receiver is self, i.e. this Master actor
      self,
      // The message that triggers the timeout check
      CheckTimeOut)
  }
}

Finally, we define a Master object as the entry point that starts the Master. Its main method describes the Master's remoting settings (actor provider, hostname and port) in a configuration string and parses it into a configuration object. It then creates an ActorSystem named Hadoop from that configuration and starts the Master actor in it under the name master.
object Master

object Master {
  def main(args: Array[String]): Unit = {
    // Configuration string: remote actor provider, hostname and port (classic Akka remoting keys)
    val conf =
      """
        |akka.actor.provider = akka.remote.RemoteActorRefProvider
        |akka.remote.netty.tcp.hostname=localhost
        |akka.remote.netty.tcp.port=8888
        |""".stripMargin
    // Parse the configuration string into a configuration object
    val config = ConfigFactory.parseString(conf)
    // Create an actor system named Hadoop from the configuration
    val actorSystem = ActorSystem("Hadoop", config)
    // Create the Master actor in this actor system, named master
    actorSystem.actorOf(Props(new Master), "master")
  }
}
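The Master's address ends up hard-coded in two places: the configuration string in `Master.main` and the selection path in the Worker's `preStart`. For a top-level actor created with `actorOf`, the classic remoting path has the form `akka.tcp://<system>@<host>:<port>/user/<actor name>`, so `Hadoop`, `localhost`, `8888` and `master` here must match what the Worker looks up. A small sketch, assuming both strings are built from the same values; `RemoteSettings`, `remoteConf` and `actorPath` are hypothetical helper names:

```scala
// Hypothetical helpers: derive the config string and the actor path from one set of values
object RemoteSettings {
  // Same keys as the article's hard-coded string (classic akka.remote.netty.tcp settings)
  def remoteConf(host: String, port: Int): String =
    s"""
       |akka.actor.provider = akka.remote.RemoteActorRefProvider
       |akka.remote.netty.tcp.hostname=$host
       |akka.remote.netty.tcp.port=$port
       |""".stripMargin

  // Path of a top-level actor created with system.actorOf(props, actorName)
  def actorPath(system: String, host: String, port: Int, actorName: String): String =
    s"akka.tcp://$system@$host:$port/user/$actorName"
}
```

`Master.main` could then call `ConfigFactory.parseString(RemoteSettings.remoteConf("localhost", 8888))`, and the Worker could call `context.actorSelection(RemoteSettings.actorPath("Hadoop", "localhost", 8888, "master"))`, so changing the port in one place cannot leave the other stale.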

Worker implementation

Similarly, we define the Worker (the class is named Slave in the code), which extends the Actor trait. Its receive method handles the RegisterSuccess reply from the Master, and a separate sendHeartBeat method starts the scheduled task that sends heartbeats. Its preStart method registers the Worker's information (id, cores, memory) with the Master; the heartbeat task starts only after the Master confirms the registration.
Packages that need to be imported

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

class Slave (the Worker)

class Slave extends Actor {
  // Id of this slave node; change it for each additional worker (slave2, slave3, ...)
  val slave_id = "slave1"
  // Number of cores this slave node reports
  val cores = 30
  // Memory size this slave node reports
  val memory = "64G"
  // Override the receive method, which returns a partial function of type Receive
  override def receive: Receive = {
    // The Master accepted the registration
    case RegisterSuccess =>
      // Report that this slave node registered successfully
      println(s"${slave_id} register successfully!!!!!!!!!")
      // Start sending heartbeats to the sender of RegisterSuccess (the Master)
      sendHeartBeat()
  }
  // Starts a repeating task that sends HeartBeat(slave_id) to the Master; no parameters, returns Unit
  def sendHeartBeat(): Unit = {
    // schedule(initialDelay, interval, receiver, message):
    //   initialDelay - delay before the first heartbeat
    //   interval     - time between heartbeats
    //   receiver     - the actor that receives each heartbeat
    //   message      - the message sent each time
    // The scheduler needs an implicit ExecutionContext; use the actor's dispatcher
    import context.dispatcher
    context.system.scheduler.schedule(
      // First heartbeat after 3 seconds
      Duration(3, TimeUnit.SECONDS),
      // Then one heartbeat every 3 seconds
      Duration(3, TimeUnit.SECONDS),
      // sender() is the Master, which sent RegisterSuccess; it is evaluated once, here,
      // so the task keeps sending to the Master
      sender(),
      // The heartbeat message carrying this slave node's id
      HeartBeat(slave_id))
  }
  // Override preStart, which Akka calls once when the Actor starts
  override def preStart(): Unit = {
    // Registration info: id, current time as the initial heartbeat time, cores and memory
    val register = RegisterClass(slave_id, System.currentTimeMillis(), cores, memory)
    // ActorSelection pointing at the remote master actor (system Hadoop, port 8888, actor master)
    val proxy = context.actorSelection("akka.tcp://Hadoop@localhost:8888/user/master")
    // Send the registration request to the master actor
    proxy ! register
  }
}
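Passing `sender()` to `schedule` works here only because of when it is evaluated. `sender()` returns the sender of the message currently being processed (dead letters when no message is being processed), so its value changes from message to message. The `receiver` parameter of `schedule` is an ordinary by-value `ActorRef`, so `sender()` is evaluated once, while handling `RegisterSuccess`, and the Master's reference is stored in the task. A closure that called `sender()` later would not have that guarantee. The Akka-free sketch below mimics the difference with a by-value and a by-name parameter; `FakeActor` and `SenderCapture` are hypothetical stand-ins, not Akka types.

```scala
// Hypothetical stand-in for an actor whose "current sender" changes per message
final class FakeActor {
  private var current: String = "deadLetters"
  def sender(): String = current

  // Simulates processing one message from `from`; `body` runs while it is current
  def handle[A](from: String)(body: => A): A = {
    current = from
    try body finally current = "deadLetters"
  }
}

object SenderCapture {
  // By-value, like schedule's receiver parameter: evaluated once at the call site
  def captureByValue(receiver: String): () => String = () => receiver
  // By-name, like a closure that calls sender() later: re-evaluated on each use
  def captureByName(receiver: => String): () => String = () => receiver
}
```

Captured by value, the receiver is still "master" after the message has been handled; captured by name, it has already reverted to "deadLetters".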

Finally, we define a Slave object as the Worker's entry point. Its main method configures the Worker's own hostname and port (6666), creates an ActorSystem named Hadoop2 from that configuration, and starts the Slave actor in it. The Worker's core count and memory are fields of the Slave class, and the Master's address is hard-coded in the actor path used in preStart.
object Slave (the Worker's entry point)

object Slave {
  def main(args: Array[String]): Unit = {
    // Configuration string: remote actor provider, hostname and port (change the port for each extra worker)
    val conf =
      """
        |akka.actor.provider = akka.remote.RemoteActorRefProvider
        |akka.remote.netty.tcp.hostname=localhost
        |akka.remote.netty.tcp.port=6666
        |""".stripMargin
    // Parse the configuration string into a configuration object
    val config = ConfigFactory.parseString(conf)
    // Create an actor system named Hadoop2 from the configuration
    val actorSystem = ActorSystem("Hadoop2", config)
    // Create the Slave actor in this actor system, named slave1
    actorSystem.actorOf(Props(new Slave), "slave1")
  }
}
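As written, running several workers means editing `slave_id` and the port in the source before each launch, which is what the run steps that follow do. A sketch of an alternative, assuming the id and port are passed as program arguments; `WorkerArgs` and its fallback values are hypothetical, not part of the original code:

```scala
// Hypothetical per-worker launch settings: the worker id and the port of its ActorSystem
final case class WorkerArgs(slaveId: String, port: Int)

object WorkerArgs {
  // Expects "<slaveId> <port>"; anything else falls back to the article's first worker.
  // A non-numeric port makes port.toInt throw NumberFormatException.
  def parse(args: Array[String]): WorkerArgs = args match {
    case Array(id, port) => WorkerArgs(id, port.toInt)
    case _               => WorkerArgs("slave1", 6666)
  }
}
```

`Slave.main` could interpolate `WorkerArgs.parse(args).port` into its configuration string. Passing the id into the actor would also require a constructor parameter such as `class Slave(slave_id: String)`, which the original class does not have.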

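Running the example

  1. Run the Master (the Master object).
  2. Run the first worker (the Slave object), then check the Master's console.
  3. To run a second worker from IntelliJ IDEA, click Edit Configurations, click Modify options, select Allow multiple instances, and click Apply. Then change slave_id to slave2 and the port to 6662, run it, and check the Master's console.
  4. Run a third worker, modified the same way as slave2 (slave_id slave3, with a port the other workers do not use), then check the Master's console.
  5. Stop slave3 and wait for the timeout check. Because the check runs every 10 seconds and requires more than 30 seconds of silence, the Master removes slave3 between 30 and 40 seconds after its last heartbeat. Then check the Master's console again.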
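How long the removal takes follows from the Master's two constants. Its `CheckTimeOut` task first fires 10 seconds after the Master starts and then every 10 seconds, and a worker is removed only when more than 30,000 ms have passed since its last heartbeat. A stopped worker therefore disappears at the first check strictly later than 30 seconds after its last heartbeat: from just over 30 seconds up to 40 seconds later. The arithmetic as a sketch with the article's constants, ignoring scheduler jitter and message delivery time; `DetectionDelay.removalTime` is a hypothetical name:

```scala
object DetectionDelay {
  // Time (ms since Master start) of the first CheckTimeOut run that removes a worker
  // whose last heartbeat arrived at lastHeartbeatMs. Checks run at firstCheckMs,
  // firstCheckMs + intervalMs, ...; removal needs now - lastHeartbeat > timeoutMs.
  def removalTime(lastHeartbeatMs: Long,
                  firstCheckMs: Long = 10000,
                  intervalMs: Long = 10000,
                  timeoutMs: Long = 30000): Long = {
    // First instant that satisfies the strict ">" comparison
    val earliest = lastHeartbeatMs + timeoutMs + 1
    if (earliest <= firstCheckMs) firstCheckMs
    else {
      // Round up to the next scheduled check
      val k = (earliest - firstCheckMs + intervalMs - 1) / intervalMs
      firstCheckMs + k * intervalMs
    }
  }
}
```

For a last heartbeat at 47 s the worker is removed by the check at 80 s (33 s later); for a last heartbeat at exactly 50 s, the check at 80 s sees exactly 30 s, which is not enough, so removal waits until 90 s (40 s later).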

Results
We can run the Master and Slave objects separately and observe their console output. The results are as follows:

After the Master starts and then slave1 starts:

Master's console prints “worker:slave1 is registering… current workers 1”

Slave1's console prints “slave1 register successfully!!!”

This shows that slave1 registered successfully. The Master's console then prints:

“slave1 is responsive and functional.
Current online workers is 1
slave1 is responsive and functional.
Current online workers is 1
slave1 is responsive and functional.
Current online workers is 1

…”

This shows that slave1 is sending heartbeats on schedule.

After slave2 starts:

Master's console prints “worker:slave2 is registering… current workers 2”

Slave2's console prints “slave2 register successfully!!!”

This shows that slave2 registered successfully. The Master's console then prints:

“slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2

…”

This shows that slave1 and slave2 are both sending heartbeats on schedule.

After slave3 starts:

Master's console prints out “worker:slave3 is registering… current workers 3”

Slave3's console prints “slave3 register successfully!!!”

This shows that slave3 registered successfully. The Master's console then prints:

“slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave3 is responsive and functional.
Current online workers is 3
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave3 is responsive and functional.
Current online workers is 3
…”

This shows that slave1, slave2 and slave3 are all sending heartbeats on schedule.

After slave3 is stopped:

Master's console prints out “slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
…”
Only slave1 and slave2 are sending heartbeats now, yet the count is still 3: the Master has not yet noticed that slave3 is gone, because it removes a worker only during a CheckTimeOut run and only after more than 30 seconds without a heartbeat.
Once a CheckTimeOut run finds that slave3's last heartbeat is more than 30 seconds old, the Master's console prints:
“slave3 is timeout, removed from master!!!
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
…”

This shows that slave1 and slave2 keep sending heartbeats on schedule, while slave3 timed out and was removed.

Summary

This article showed how to use Akka to simulate the communication between Spark’s Master and Worker. We first defined the message types, then implemented the Master and Worker (Slave) actors and used ActorSystem to create and manage them. Messages are sent through ActorRef and ActorSelection, and actors are created from Props. The simulation covers a Worker registering with the Master, the Master confirming the registration, the Worker sending heartbeats, the Master updating its status information, and the Master removing Workers that have timed out. Akka is a powerful framework that helps us build highly concurrent, distributed and fault-tolerant systems.