Use akka to simulate Spark’s Master and Worker communication
Spark is a big data processing framework based on in-memory computing. It provides a standalone deployment mode (Standalone), which runs Spark applications on Spark's own cluster. In this mode, Spark has two roles: Master and Worker. The Master is the controller of the cluster. It manages the registration, deregistration, and status changes of Workers, and schedules Drivers and Executors. The Worker is the cluster's worker node. It runs Drivers and Executors and reports its status and resource information to the Master.
In this article, we will use the Akka framework to simulate the communication process between Spark’s Master and Worker. Akka is a concurrent and distributed programming framework based on the Actor model. It provides a simple and powerful abstraction, allowing us to use asynchronous messages to build high-performance, scalable, and fault-tolerant systems.
Master and Worker communication process
We will use Akka to simulate the following steps:
- Start the Master and the Worker, and establish a connection.
- The Worker registers its own information (memory, number of cores, etc.) with the Master.
- After receiving the Worker's registration information, the Master replies with a registration-success message.
- After the Worker receives the registration-success message, it starts a scheduled task that sends a heartbeat to the Master every 3 seconds.
- After the Master receives a heartbeat, it updates that Worker's status information.
- The Master runs a scheduled task every 10 seconds that checks whether any Worker has gone more than 30 seconds without a heartbeat, and deletes the information of every Worker that has timed out.
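To make the timing rules concrete, here is a small plain-Scala sketch (no Akka) that replays them on a simulated millisecond clock: a heartbeat every 3 s, a Master check every 10 s starting at 10 s, and removal once a Worker has been silent for more than 30 s. The object name `TimeoutTimeline` and its helpers are hypothetical; only the constants come from this article, and the Worker is assumed to register at time 0.

```scala
// Hypothetical plain-Scala sketch (not Akka): replays the article's timing rules
// on a simulated clock to see when a Worker that stops sending heartbeats is removed.
object TimeoutTimeline {
  val HeartbeatIntervalMs = 3000L  // Worker sends a heartbeat every 3 s (first one at 3 s)
  val CheckIntervalMs     = 10000L // Master checks every 10 s (first check at 10 s)
  val TimeoutMs           = 30000L // Worker is dropped after more than 30 s of silence

  /** Time of the last heartbeat sent strictly before the Worker stops (0 = registration time). */
  def lastHeartbeat(stopAtMs: Long): Long =
    ((stopAtMs - 1) / HeartbeatIntervalMs) * HeartbeatIntervalMs

  /** First Master check at which `now - lastSeen > TimeoutMs`, i.e. when the Worker is removed. */
  def removalTime(lastSeenMs: Long): Long = {
    var check = CheckIntervalMs
    while (check - lastSeenMs <= TimeoutMs) check += CheckIntervalMs
    check
  }

  def main(args: Array[String]): Unit = {
    val stop    = 20000L // the Worker is shut down 20 s after registering
    val last    = lastHeartbeat(stop)
    val removed = removalTime(last)
    println(s"last heartbeat at $last ms, removed at $removed ms (${removed - last} ms of silence)")
  }
}
```

With a Worker that stops at 20 s, its last heartbeat is at 18 s and it is removed at the 50 s check, 32 s later. In general the gap falls in the range (30 s, 40 s], because removal waits for the next 10-second check after the 30-second threshold has passed.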
Message class
First, we need to define some message types for passing data between Master and Worker:
```scala
// Case classes and case objects are serializable by default, which is why they are
// used as messages. Even though this simulation runs on one machine, the Master and
// the Worker live in separate ActorSystems that talk over the network, so every
// message must be serializable.

/**
 * Case class carrying a slave (Worker) node's registration information
 * @param slave_id       ID of the slave node
 * @param lastUpdateTime time of the last heartbeat, in milliseconds
 * @param cores          number of CPU cores of the slave node
 * @param memory         memory size of the slave node
 */
case class RegisterClass(slave_id: String, var lastUpdateTime: Long, cores: Int, memory: String)

/**
 * Case object telling the Worker that its registration succeeded
 */
case object RegisterSuccess

/**
 * Case class representing a heartbeat
 * @param slave_id ID of the slave node sending the heartbeat
 */
case class HeartBeat(slave_id: String)

/**
 * Case object the Master sends to itself to check whether any slave node's heartbeat has timed out
 */
case object CheckTimeOut
```
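Because the messages cross ActorSystem boundaries, one quick way to check the serialization claim is a round trip through plain Java serialization (`java.io.ObjectOutputStream` and `ObjectInputStream`). This is a standalone sketch: it redeclares the message types so it compiles on its own, and `SerializationCheck.roundTrip` is a hypothetical helper. Which serializer Akka actually uses for remote messages depends on the Akka version and its serialization settings.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Same message types as above, redeclared so this sketch compiles on its own.
case class RegisterClass(slave_id: String, var lastUpdateTime: Long, cores: Int, memory: String)
case object RegisterSuccess
case class HeartBeat(slave_id: String)

object SerializationCheck {
  /** Hypothetical helper: serialize a value with Java serialization and read it back. */
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out   = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with this code's class loader (helps under build tools with layered loaders)
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val reg = RegisterClass("slave1", 0L, 30, "64G")
    println(roundTrip(reg) == reg)                         // case classes compare by value
    println(roundTrip(RegisterSuccess) == RegisterSuccess) // case objects deserialize to the same singleton
    println(roundTrip(HeartBeat("slave1")))
  }
}
```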
Master implementation
Next, we define a Master class that extends the Actor trait and implements the receive method to handle incoming messages. The Master class also overrides preStart, which Akka calls once when the actor starts, to perform initialization such as starting a scheduled task that checks whether any Worker has timed out.
Required imports
```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.duration.Duration
```
class Master
```scala
class Master extends Actor {
  // Registered workers, keyed by slave_id; starts out empty
  var slaves: mutable.Map[String, RegisterClass] = mutable.Map[String, RegisterClass]()

  // receive defines the message types this actor can handle
  override def receive: Receive = {
    // A Worker is registering
    case x: RegisterClass =>
      // Store the registration under the worker's slave_id
      slaves.put(x.slave_id, x)
      // Print the number of currently registered workers
      println(s"worker:${x.slave_id} is registering... current workers ${slaves.size}")
      // Reply to the sender that the registration succeeded
      sender() ! RegisterSuccess

    // A Worker sent a heartbeat
    case x: HeartBeat =>
      slaves.get(x.slave_id) match {
        // Known worker: refresh its last heartbeat time
        case Some(value) =>
          value.lastUpdateTime = System.currentTimeMillis()
          slaves.put(x.slave_id, value)
          println(s"${x.slave_id} is responsive and functional.")
          println(s"Current online workers is ${slaves.size}")
        // Unknown worker: it never registered or was already removed
        case None =>
          println(s"${x.slave_id} does not exist !!!")
      }

    // Periodic self-message: remove workers that have gone silent
    case CheckTimeOut =>
      if (slaves.nonEmpty) {
        // Keep only workers whose last heartbeat is at most 30 seconds old
        slaves = slaves.filter { case (id, info) =>
          if (System.currentTimeMillis() - info.lastUpdateTime > 30000) {
            println(s"$id is timeout, removed from master !!!")
            false
          } else true
        }
      }
  }

  // preStart is called once when the actor starts
  override def preStart(): Unit = {
    // The scheduler needs an implicit ExecutionContext; use the actor's dispatcher
    import context.dispatcher
    // Send CheckTimeOut to self every 10 seconds, starting after 10 seconds
    // (schedule is deprecated in Akka 2.6 in favour of scheduleWithFixedDelay)
    context.system.scheduler.schedule(
      Duration(10, TimeUnit.SECONDS), // initial delay
      Duration(10, TimeUnit.SECONDS), // interval
      self,                           // receiver: this actor
      CheckTimeOut                    // message used to check for timeouts
    )
  }
}
```
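The receive logic is easiest to check when the current time is a parameter instead of `System.currentTimeMillis()`. The sketch below is a hypothetical plain-Scala `WorkerRegistry` (not part of the article's code and not an Akka API) that applies the same three rules: registering stores the worker, a heartbeat refreshes its last update time, and a check removes workers silent for more than 30 seconds.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala stand-in for the Master's receive logic, with the
// clock passed in explicitly so the rules can be checked without Akka or waiting.
final case class WorkerInfo(id: String, lastUpdateTime: Long, cores: Int, memory: String)

final class WorkerRegistry(timeoutMs: Long = 30000L) {
  private val workers = mutable.Map.empty[String, WorkerInfo]

  def size: Int = workers.size
  def ids: Set[String] = workers.keySet.toSet

  /** RegisterClass: store (or overwrite) the worker's entry. */
  def register(info: WorkerInfo): Unit = workers.put(info.id, info)

  /** HeartBeat: refresh the timestamp; returns false for an unknown worker. */
  def heartbeat(id: String, now: Long): Boolean = workers.get(id) match {
    case Some(info) =>
      workers.put(id, info.copy(lastUpdateTime = now))
      true
    case None => false
  }

  /** CheckTimeOut: remove workers silent for more than timeoutMs and return their ids. */
  def removeTimedOut(now: Long): Set[String] = {
    val expired = workers.collect { case (id, info) if now - info.lastUpdateTime > timeoutMs => id }.toSet
    expired.foreach(id => workers.remove(id))
    expired
  }
}
```

Unlike the `filter` in `Master`, `removeTimedOut` returns the removed ids instead of printing inside the predicate, so the caller decides how to log them.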
Finally, we define a Master object whose main method starts the Master. It puts the remoting settings (actor provider, host name, and port) into a configuration string, parses that string into a configuration object, creates an ActorSystem from it, and then creates the master actor in that ActorSystem.
object Master
```scala
object Master {
  def main(args: Array[String]): Unit = {
    // Configuration string: actor provider, host name and port number
    val conf =
      """
        |akka.actor.provider = akka.remote.RemoteActorRefProvider
        |akka.remote.netty.tcp.hostname = localhost
        |akka.remote.netty.tcp.port = 8888
        |""".stripMargin
    // Parse the configuration string into a configuration object
    val config = ConfigFactory.parseString(conf)
    // Create an actor system named "Hadoop" from the configuration
    val actorSystem = ActorSystem("Hadoop", config)
    // Create the master actor in the actor system, named "master"
    actorSystem.actorOf(Props(new Master), "master")
  }
}
```
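The Worker finds the Master through an actor path assembled from the Master's system name (`Hadoop`), host, port, and actor name (`master`). The hypothetical helpers below are not Akka APIs; they only build the same strings this article hard-codes, which shows which setting in `object Master` each part of `akka.tcp://Hadoop@localhost:8888/user/master` comes from.

```scala
// Hypothetical helpers (not part of Akka): build the classic-remoting config and
// the remote actor path that the example otherwise hard-codes.
object RemoteAddressing {
  /** Remoting settings as used in object Master / object Slave. */
  def remotingConfig(host: String, port: Int): String =
    s"""akka.actor.provider = akka.remote.RemoteActorRefProvider
       |akka.remote.netty.tcp.hostname = $host
       |akka.remote.netty.tcp.port = $port
       |""".stripMargin

  /** Path of a top-level actor created with actorOf(..., actorName), which lives under /user. */
  def actorPath(system: String, host: String, port: Int, actorName: String): String =
    s"akka.tcp://$system@$host:$port/user/$actorName"

  def main(args: Array[String]): Unit = {
    println(remotingConfig("localhost", 8888))
    println(actorPath("Hadoop", "localhost", 8888, "master"))
  }
}
```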
Worker implementation
Similarly, we define a Worker class (named Slave in the code) that extends the Actor trait. Its receive method handles the registration-success reply, and a separate sendHeartBeat method starts the scheduled task that sends heartbeats. The Worker class also overrides preStart to register its own information with the Master when it starts; the heartbeat task is started only after the Master confirms the registration.
Required imports
```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
```
class Worker (Slave)
```scala
class Slave extends Actor {
  // ID of this slave node
  val slave_id = "slave1"
  // Number of CPU cores of this slave node
  val cores = 30
  // Memory size of this slave node
  val memory = "64G"

  override def receive: Receive = {
    // The Master confirmed the registration
    case RegisterSuccess =>
      println(s"$slave_id register successfully!!!")
      // sender() is the Master here; start sending it heartbeats
      sendHeartBeat()
  }

  // Starts a scheduled task that sends HeartBeat(slave_id) to the Master.
  // schedule(initialDelay, interval, receiver, message) needs an implicit ExecutionContext.
  def sendHeartBeat(): Unit = {
    import context.dispatcher
    // Capture the Master's ActorRef now, while sender() still refers to it
    val master = sender()
    context.system.scheduler.schedule(
      Duration(3, TimeUnit.SECONDS), // first heartbeat after 3 seconds
      Duration(3, TimeUnit.SECONDS), // then one every 3 seconds
      master,                        // receiver: the Master
      HeartBeat(slave_id)            // heartbeat message carrying this node's id
    )
  }

  // preStart is called once when the actor starts: register with the Master
  override def preStart(): Unit = {
    // Registration message: id, current time as the initial heartbeat time, cores, memory
    val register = RegisterClass(slave_id, System.currentTimeMillis(), cores, memory)
    // Select the remote master actor by its path
    val proxy = context.actorSelection("akka.tcp://Hadoop@localhost:8888/user/master")
    // Send the registration request to the Master
    proxy ! register
  }
}
```
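Each `RegisterSuccess` makes `sendHeartBeat` call `schedule` again, so a duplicate reply would start a second timer and double the heartbeat rate. The plain-Scala model below (a hypothetical `WorkerModel`, no Akka) sketches a guard that starts heartbeats only once, and lists the send times that a 3 s initial delay and a 3 s interval produce.

```scala
// Hypothetical plain-Scala model of the Worker side: heartbeats start only after
// RegisterSuccess, and a repeated RegisterSuccess does not start a second timer.
final class WorkerModel(val id: String, initialDelayMs: Long = 3000L, intervalMs: Long = 3000L) {
  // Time at which the heartbeat timer was started, if it has been started
  private var heartbeatStartedAt: Option[Long] = None

  /** Handle RegisterSuccess received at `now`; returns true only if a timer was started. */
  def onRegisterSuccess(now: Long): Boolean =
    if (heartbeatStartedAt.isDefined) false
    else { heartbeatStartedAt = Some(now); true }

  /** Times at which HeartBeat(id) would be sent, up to and including `until`. */
  def heartbeatTimes(until: Long): Seq[Long] = heartbeatStartedAt match {
    case None        => Seq.empty
    case Some(start) => (start + initialDelayMs) to until by intervalMs
  }
}
```

In the Akka version, the same idea could be a `var` flag checked in the `RegisterSuccess` case before calling `sendHeartBeat()`.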
Finally, we define a Slave object (the Worker's entry point) that starts the Worker. It configures the Worker's own remoting host and port, creates a configuration object from these settings, uses it to create an ActorSystem, and then creates the Worker actor in that ActorSystem. The Master's address is not part of this configuration: it is hard-coded in the actor path passed to actorSelection in preStart, and the number of cores and the memory size are fields of the Slave class.
object Worker (Slave)
```scala
object Slave {
  def main(args: Array[String]): Unit = {
    // Configuration string: actor provider, host name and port number
    val conf =
      """
        |akka.actor.provider = akka.remote.RemoteActorRefProvider
        |akka.remote.netty.tcp.hostname = localhost
        |akka.remote.netty.tcp.port = 6666
        |""".stripMargin
    // Parse the configuration string into a configuration object
    val config = ConfigFactory.parseString(conf)
    // Create an actor system named "Hadoop2" from the configuration
    val actorSystem = ActorSystem("Hadoop2", config)
    // Create the slave actor in the actor system, named "slave1"
    actorSystem.actorOf(Props(new Slave), "slave1")
  }
}
```
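Because `slave_id` and the port are hard-coded, each additional Worker requires editing the source before it is run. A hypothetical alternative is to read them from the program arguments of the run configuration; this sketch parses `<slaveId> <port>` and falls back to `slave1` and `6666`. Using it would also require passing the id into the actor, for example `Props(new Slave(settings.slaveId))` with a constructor parameter that the original class does not have.

```scala
import scala.util.Try

// Hypothetical helper: lets one Slave program be started several times
// (slave1 on 6666, slave2 on 6662, ...) by passing program arguments.
object WorkerArgs {
  final case class Settings(slaveId: String, port: Int)

  /** Expects optional args `<slaveId> <port>`; falls back to slave1 / 6666. */
  def parse(args: Array[String]): Settings = {
    val id   = args.headOption.getOrElse("slave1")
    // An unparsable port also falls back to the default
    val port = if (args.length > 1) Try(args(1).toInt).getOrElse(6666) else 6666
    Settings(id, port)
  }

  def main(args: Array[String]): Unit =
    println(parse(args))
}
```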
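Running the example
- Run the Master object.
- Run the first Slave, then check the Master's console.
- To run a second Slave from the same code, click Edit Configurations, click Modify options, select Allow multiple instances, and click Apply.
- Change slave_id to slave2 and the port to 6662, run it, then check the Master's console.
- Run a third Slave, modified the same way as slave2, then check the Master's console.
- Stop slave3 and wait at least 30 seconds before checking the results. Because the timeout check runs every 10 seconds, removal happens between 30 and 40 seconds after slave3's last heartbeat.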
Operation result
Run the Master and Slave objects separately and watch the console output. The results are as follows:
Master and slave1 start
After the Master starts and slave1 starts, the Master's console prints "worker:slave1 is registering… current workers 1" and slave1's console prints "slave1 register successfully!!!", showing that slave1 registered successfully. The Master's console then prints:
```
slave1 is responsive and functional.
Current online workers is 1
slave1 is responsive and functional.
Current online workers is 1
slave1 is responsive and functional.
Current online workers is 1
…
```
This shows that slave1 is sending heartbeats on schedule.
slave2 starts
The Master's console prints "worker:slave2 is registering… current workers 2" and slave2's console prints "slave2 register successfully!!!", showing that slave2 registered successfully. The Master's console then prints:
```
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
…
```
This shows that slave1 and slave2 are both sending heartbeats on schedule.
slave3 starts
The Master's console prints "worker:slave3 is registering… current workers 3" and slave3's console prints "slave3 register successfully!!!", showing that slave3 registered successfully. The Master's console then prints:
```
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave3 is responsive and functional.
Current online workers is 3
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave3 is responsive and functional.
Current online workers is 3
…
```
This shows that slave1, slave2, and slave3 are all sending heartbeats on schedule.
slave3 stops
The Master's console prints:
```
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
slave1 is responsive and functional.
Current online workers is 3
slave2 is responsive and functional.
Current online workers is 3
…
```
Only slave1 and slave2 are sending heartbeats now, yet the count is still 3, because the Master has not yet detected that slave3 is gone.

Once the Master's periodic timeout check detects it, the Master's console prints:
```
slave3 is timeout, removed from master!!!
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
slave1 is responsive and functional.
Current online workers is 2
slave2 is responsive and functional.
Current online workers is 2
…
```
This shows that slave1 and slave2 are still sending heartbeats on schedule, while slave3 timed out and was removed.
Summary
This article showed how to use Akka to simulate the communication between Spark's Master and Worker. We first defined the message types, then implemented the Master and Worker actors and used ActorSystem to create and manage them, with ActorRef, ActorSelection, and Props used to create actors and to send and receive messages. We simulated the Worker registering with the Master, the Master confirming the registration, the Worker sending heartbeats, the Master updating the Worker's status, and the Master checking whether a Worker has timed out. Akka is a powerful framework for building highly concurrent, distributed, and fault-tolerant systems.