Big data development: concurrent programming from Scala to Akka

1. Recursive implementation of the bottle and bottle-cap exchange algorithm

1.1 Requirement Description

Each bottle of beer costs 2 yuan, and 3 empty bottles or 5 bottle caps can be exchanged for 1 bottle of beer. How many bottles of beer can you drink with 100 yuan? (Borrowing beer is not allowed.)
Idea: spend all the money on beer up front, then recursively count the additional beers that the resulting empty bottles and bottle caps can be exchanged for.

1.2 Code Implementation

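  // Number of extra beers obtainable from `bot` empty bottles and `cap` bottle caps
  def extralPeer(bot: Int, cap: Int): Int = {
    // Beers that can be exchanged in this round
    val count = bot / 3 + cap / 5
    if (count <= 0) 0
    else {
      // Leftover bottles and caps carry over to the next round
      val modBot = bot % 3
      val modCap = cap % 5
      // Each exchanged beer yields one new bottle and one new cap
      count + extralPeer(count + modBot, count + modCap)
    }
  }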
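The function above only counts the exchanged beers, so the beers bought directly still have to be added. A minimal sketch of the full calculation follows. The wrapper object `BeerExchange` and the helper `totalBeer` are hypothetical names introduced here, and the recursion is repeated so the block runs on its own.

```scala
object BeerExchange {
  // Same recursion as above: beers obtained by exchanging bottles and caps.
  def extralPeer(bot: Int, cap: Int): Int = {
    val count = bot / 3 + cap / 5
    if (count <= 0) 0
    else count + extralPeer(count + bot % 3, count + cap % 5)
  }

  // Hypothetical helper: spend all the money first, then exchange recursively.
  def totalBeer(money: Int, price: Int = 2): Int = {
    val bought = money / price // each bought beer leaves one bottle and one cap
    bought + extralPeer(bought, bought)
  }

  def main(args: Array[String]): Unit =
    println(totalBeer(100))
}
```

With 100 yuan, 50 beers are bought and 54 more are obtained by exchange, for 104 in total.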

2. Rock-paper-scissors game in Scala

2.1 Requirements

  1. Select an opponent character
  2. Start the round: the user shows a gesture, it is compared with the opponent's, and a win or loss message is displayed
  3. Points are calculated at the end of each round: one point for a draw, two points for a win, and no points for a loss
  4. Loop the rounds. When "n" is entered, the game ends and the results are displayed
  5. Show the scores after the game is over

2.2 Implementation logic and main code

  1. Create a user class User and define its attributes (name, score) and its method (showFist())
  2. Create a computer class Computer and define its attributes (name, score) and its method (showFist())
  3. Have the computer choose its gesture at random
  4. Create a game class Game and define its attributes (player A, player B, number of rounds)
  5. Write the initialization method and the game start method
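The scoring rule and the random computer gesture from the steps above can be sketched as follows. This is only an illustration, not the code in the linked repository: the object name `FistGame`, the integer encoding of the gestures, and the function names are all assumptions made here.

```scala
import scala.util.Random

object FistGame {
  // Assumed encoding of the gestures: 1 = rock, 2 = scissors, 3 = paper.
  val names: Map[Int, String] = Map(1 -> "rock", 2 -> "scissors", 3 -> "paper")

  // True when gesture `a` beats gesture `b`.
  def beats(a: Int, b: Int): Boolean =
    (a == 1 && b == 2) || (a == 2 && b == 3) || (a == 3 && b == 1)

  // Points for the user in one round: 2 for a win, 1 for a draw, 0 for a loss.
  def score(user: Int, computer: Int): Int =
    if (user == computer) 1
    else if (beats(user, computer)) 2
    else 0

  // The computer picks one of the three gestures uniformly at random.
  def computerFist(rnd: Random = new Random()): Int = rnd.nextInt(3) + 1
}
```

Keeping `score` free of console input and output makes the rule easy to check separately from the game loop.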

Main code: https://github.com/hulichao/bigdata-code/tree/master/src/main/scala/com/hoult/scala/job/game

2.3 Effect Demonstration

3. User location duration statistics with SQL

3.1 Requirements Description

The following data needs to be processed. Fields: user ID, location ID, start time, stay duration (minutes).
Four lines of sample data:
UserA,LocationA,8,60
UserA,LocationA,9,60
UserB,LocationB,10,60
UserB,LocationB,11,80
The first sample row means: user UserA stayed at LocationA for 60 minutes starting at 8 o'clock.
Processing requirements:
1. Merge multiple consecutive records of the same user at the same location.
2. Merge rule: take the earliest start time and the cumulative sum of the stay durations.

3.2 SQL implementation

select user_id, location_id, min(start_time) as start_time, sum(stay_time) as stay_time
from t1
group by user_id, location_id
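Note that grouping by user and location merges every record of a pair, which gives the required result when all records of a pair are consecutive, as in the sample data. If a user could leave a location and come back later, only adjacent records should be merged. The following is a minimal sketch of that stricter rule in Scala. The names `StayMerge`, `Stay`, and `mergeConsecutive` are hypothetical, and "consecutive" is assumed to mean adjacent rows once the input is sorted by user and start time.

```scala
object StayMerge {
  // One input row: user ID, location ID, start hour, stay duration in minutes.
  case class Stay(user: String, location: String, start: Int, minutes: Int)

  // Merge runs of adjacent rows that share user and location.
  // Assumes `rows` is already sorted by user and start time.
  def mergeConsecutive(rows: List[Stay]): List[Stay] = {
    val reversed = rows.foldLeft(List.empty[Stay]) {
      case (last :: done, r) if last.user == r.user && last.location == r.location =>
        // Extend the current run: earliest start, summed duration.
        last.copy(start = math.min(last.start, r.start),
                  minutes = last.minutes + r.minutes) :: done
      case (acc, r) =>
        r :: acc // a new run begins
    }
    reversed.reverse
  }
}
```

On the four sample rows this yields one row per user: UserA at LocationA from 8 for 120 minutes, and UserB at LocationB from 10 for 140 minutes.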

4. Communication between Actors

Write two Actors, namely AActor and BActor. The two Actors can send messages to each other.

4.1 Main Implementation

First, a start command starts AActor. AActor then sends itself a message to begin the subsequent go step, after which A and B send messages to each other in a loop.

Main code: https://github.com/hulichao/bigdata-code/tree/master/src/main/scala/com/hoult/scala/job/actor

import akka.actor.{ActorSystem, Props}
import scala.io.StdIn

object Demo {
  private val MyFactory = ActorSystem("myFactory")
  // Create the actors through the MyFactory.actorOf method.
  // Messages are only sent to A; the two actors then carry on the conversation themselves.
  private val bActorRef = MyFactory.actorOf(Props[BActor], "bAcator")
  private val aActorRef = MyFactory.actorOf(Props(new AActor(bActorRef)), "aAcator")
  def main(args: Array[String]): Unit = {
    var flag = true
    while (flag) {
      val consoleLine: String = StdIn.readLine()
      // Use ! to send the message
      aActorRef ! consoleLine
      if (consoleLine == "Bye") {
        flag = false
        println("The program is about to end!")
      }
      // Sleep for 100 milliseconds
      Thread.sleep(100)
    }
    // Shut down the actor system so the JVM can exit
    MyFactory.terminate()
  }
}
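The Demo object refers to AActor and BActor, which live in the linked repository and are not shown here. As a rough illustration of the start, self-message, and loop flow described in 4.1, the sketch below uses Akka classic actors. It is not the repository's implementation: the message types `Go`, `Ping`, and `Pong`, the `maxRounds` parameter, and the `PingPongDemo` object are assumptions added so that the exchange stops on its own.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Hypothetical message types for this sketch.
case object Go
case class Ping(round: Int)
case class Pong(round: Int)

class BActor extends Actor {
  def receive: Receive = {
    case Ping(n) =>
      println(s"B received ping $n")
      sender() ! Pong(n) // reply to whoever sent the ping
  }
}

class AActor(bActorRef: ActorRef, maxRounds: Int) extends Actor {
  def receive: Receive = {
    case "start" =>
      println("A started")
      self ! Go // A messages itself to begin the exchange
    case Go =>
      bActorRef ! Ping(1)
    case Pong(n) if n < maxRounds =>
      println(s"A received pong $n")
      bActorRef ! Ping(n + 1)
    case Pong(_) =>
      context.system.terminate() // stop after the last round
  }
}

object PingPongDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("pingPong")
    val b = system.actorOf(Props(new BActor), "bActor")
    val a = system.actorOf(Props(new AActor(b, 3)), "aActor")
    a ! "start"
  }
}
```

Passing B's ActorRef into A's constructor mirrors the `Props(new AActor(bActorRef))` call in the Demo object; B needs no reference to A because it can reply through `sender()`.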

5. Simulate communication between Master and Worker processes in Spark

https://www.jianshu.com/p/43cf21b424ec

In order to deepen the understanding of the master-slave service heartbeat detection mechanism (HeartBeat), the communication between master and slave is simulated.

  1. The Worker registers with the Master; the Master completes the registration and replies to the Worker that registration succeeded (registration function)
  2. The Worker sends heartbeats periodically, and the Master receives them
  3. After receiving a Worker's heartbeat, the Master updates the time of that Worker's last heartbeat
  4. The Master starts a scheduled task that periodically detects which registered Workers have not updated their heartbeat and deletes them from the hashmap
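The bookkeeping behind steps 1 to 4 can be sketched without any actors. The following is a simplified illustration, not the Master's actual code: the object `HeartbeatRegistry` and its method names are hypothetical, and timestamps are passed in explicitly so the behavior is easy to check.

```scala
import scala.collection.mutable

object HeartbeatRegistry {
  // Worker ID -> timestamp (ms) of the last heartbeat received.
  val lastSeen: mutable.Map[String, Long] = mutable.Map[String, Long]()

  // Step 1: register a worker and record the registration time.
  def register(id: String, now: Long): Unit =
    lastSeen.update(id, now)

  // Steps 2 and 3: refresh the timestamp of a known worker.
  def heartbeat(id: String, now: Long): Unit =
    if (lastSeen.contains(id)) lastSeen.update(id, now) // ignore unknown workers

  // Step 4: remove every worker whose last heartbeat is older than timeoutMs.
  // Returns the IDs that were removed.
  def removeTimedOut(now: Long, timeoutMs: Long): Set[String] = {
    val dead = lastSeen.collect { case (id, t) if now - t > timeoutMs => id }.toSet
    lastSeen --= dead
    dead
  }
}
```

For example, if two workers register at time 0 and only one sends a heartbeat at 5000 ms, a check at 7000 ms with a 6000 ms timeout removes the silent one and keeps the other.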

5.1 Code Implementation

Key points: both Worker and Master are started by sending them a start message. The protocol is implemented with case classes, so several of them are added. In the processing logic, pay attention to which message types the client should send and which message types the server should accept; the two are used together. Code: https://github.com/hulichao/bigdata-code/tree/master/src/main/scala/com/hoult/scala/job/spark
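The message names used in the handlers below suggest a protocol along the following lines. This is a sketch under assumptions, not the definitions from the repository: the field types are inferred from how the messages are used (a String ID, Int values for CPU and RAM, a mutable Long timestamp), the initial timestamp value is a guess, and the enclosing object `Protocol` and the helper `direction` are hypothetical additions.

```scala
object Protocol {
  // Worker -> Master: registration request carrying the worker's resources.
  case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)
  // Master -> Worker: registration succeeded.
  case object RegisteredWorkerInfo
  // Worker -> itself: timer tick telling the worker to send a heartbeat.
  case object SendHeartBeat
  // Worker -> Master: the heartbeat itself.
  case class HeartBeat(id: String)
  // Master -> itself: start the periodic timeout check.
  case object StartTimeOutWorker
  // Master -> itself: timer tick telling the master to drop timed-out workers.
  case object RemoveTimeOutWorker

  // State the master keeps per worker; the timestamp is mutable so it can be refreshed.
  class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {
    var lastHeartBeatTime: Long = System.currentTimeMillis()
  }

  // Hypothetical helper that makes the direction of each message explicit.
  def direction(msg: Any): String = msg match {
    case _: RegisterWorkerInfo | _: HeartBeat     => "worker -> master"
    case RegisteredWorkerInfo                     => "master -> worker"
    case SendHeartBeat                            => "worker -> itself"
    case StartTimeOutWorker | RemoveTimeOutWorker => "master -> itself"
    case _                                        => "unknown"
  }
}
```

Case objects suit messages that carry no data, while case classes carry the fields that the receiver extracts by pattern matching.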

Main implementation:

//master message processing
import scala.collection.mutable
import scala.concurrent.duration._

val workers = mutable.Map[String, WorkerInfo]()
  override def receive: Receive = {
    case "start" => {
      println("master running....")
      // Start the timeout check
      self ! StartTimeOutWorker
    }

    case StartTimeOutWorker => {
      println("start check timeout worker...")
      // Define a timer that periodically checks whether any worker heartbeat has timed out
      import context.dispatcher
      context.system.scheduler.schedule(0.millis, 9000.millis, self, RemoveTimeOutWorker)
    }
    case RemoveTimeOutWorker => {
      // Get all workerInfo in workers
      val workerInfos = workers.values
      // Get the current time
      val currentTime = System.currentTimeMillis()
      // Remove the workers whose last heartbeat is more than 6 seconds old
      workerInfos.filter(info => (currentTime - info.lastHeartBeatTime) > 6000)
        .foreach(workers -= _.id)
      println(s"===> workers.size = ${workers.size}")
    }

    case RegisterWorkerInfo(id, cpu, ram) => {
      // Register only if this worker has not been registered yet
      if (!workers.contains(id)) {
        // Add the worker to the worker list
        workers += (id -> new WorkerInfo(id, cpu, ram))
        println("workers => registration:" + workers)
        // Reply that the registration succeeded
        sender() ! RegisteredWorkerInfo
      }
    }
    case HeartBeat(id) => {
      // Update the last heartbeat time of the matching workerInfo.
      // Workers that are unknown or already removed are ignored instead of throwing.
      workers.get(id).foreach { workerInfo =>
        workerInfo.lastHeartBeatTime = System.currentTimeMillis()
        println(s"master updated worker ${id}'s heartbeat")
      }
    }
  }

//worker message processing
override def receive: Receive = {

    case "start" => {
      println("worker running...")
      // Send registration information to the master
      masterProxy ! RegisterWorkerInfo(id, 16, 16 * 1024)
    }
    case RegisteredWorkerInfo => {
      println(s"worker ${id} registered!")

      // Define a timer that tells this worker to send a heartbeat at a fixed interval
      import context.dispatcher
      import scala.concurrent.duration._
      // 0.millis: execute immediately
      // 3000.millis: then repeat every 3 seconds
      // self: the receiver, i.e. this worker
      // SendHeartBeat: the message to send
      context.system.scheduler.schedule(0.millis, 3000.millis, self, SendHeartBeat)
    }
    // Send heartbeat
    case SendHeartBeat => {
      println(s"worker ${id} send heartbeat to master")
      masterProxy ! HeartBeat(id)
    }
}

5.3 Demo