3.8 Typical cases of Spark RDD

1. Use RDD to calculate the total score and average score

(1) Preparation

1. Start the HDFS service

2. Start the Spark service

3. Create a score file locally

4. Upload the score file to HDFS
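Judging from the program later in this section, each line of the score file contains a student name followed by three scores, separated by single spaces, and the file ends up under /scoresumavg/input on HDFS. A minimal sketch of steps 3 and 4, with hypothetical sample data and commands:

scores.txt (hypothetical sample data):
zhangsan 89 92 78
lisi 95 81 86
wangwu 76 64 88

hdfs dfs -mkdir -p /scoresumavg/input
hdfs dfs -put scores.txt /scoresumavg/input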

(2) Complete the task
1. Complete tasks in Spark Shell
(1) Read the score file and generate RDD

(2) Define a list of 2-tuple grades

(3) Use the RDD to populate the list of 2-tuple grades

(4) Create an RDD based on the list of 2-tuple grades

(5) Reduce rdd by key to get rdd1 and calculate the total score

(6) Map rdd1 to rdd2, calculating the total score and average score
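The statements entered at the Spark Shell prompt mirror the IDEA program shown later in this section. A sketch of the session, assuming the score file has already been uploaded to the path below (sc is created by the shell automatically):

import scala.collection.mutable.ListBuffer

// Read the score file and generate an RDD
val lines = sc.textFile("hdfs://master:9000/scoresumavg/input/scores.txt")
// Define a list of 2-tuple grades
val scores = new ListBuffer[(String, Int)]()
// Use the RDD to populate the list of 2-tuple grades
lines.collect.foreach(line => {
  val fields = line.split(" ")
  scores.append((fields(0), fields(1).toInt))
  scores.append((fields(0), fields(2).toInt))
  scores.append((fields(0), fields(3).toInt))
})
// Create an RDD based on the list of 2-tuple grades
val rdd = sc.makeRDD(scores)
// Reduce rdd by key to get rdd1 and calculate the total score
val rdd1 = rdd.reduceByKey(_ + _)
// Map rdd1 to rdd2, calculating the total score and average score
val rdd2 = rdd1.map(score => (score._1, score._2, (score._2 / 3.0).formatted("%.2f")))
rdd2.collect.foreach(println)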

2. Complete the task in IntelliJ IDEA
(1) Open the RDD project and create an object for calculating the total score and average score

package net.cxf.rdd.day07

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer

/**
 * Function: Statistical total score and average score
 * Author: cxf
 * Date: May 11, 2023
 */
object CalculateSumAvg {
  def main(args: Array[String]): Unit = {
    // Create Spark configuration object
    val conf = new SparkConf()
      .setAppName("CalculateSumAvg ") // Set application name
      .setMaster("local[*]") // Set the location of the master node (local debugging)
    // Create a Spark container based on the Spark configuration object
    val sc = new SparkContext(conf)
    // Read the grade file and generate RDD
    val lines = sc.textFile("hdfs://master:9000/scoresumavg/input/scores.txt")
    // Define a list of 2-tuple grades
    val scores = new ListBuffer[(String, Int)]()
    // Use RDD to populate the list of 2-tuple grades
    lines.collect.foreach(line => {
      val fields = line.split(" ")
      scores.append((fields(0), fields(1).toInt))
      scores.append((fields(0), fields(2).toInt))
      scores.append((fields(0), fields(3).toInt))
    })
    // Create an RDD based on a list of 2-tuple grades
    val rdd = sc.makeRDD(scores)
    // Reduce rdd by key to get rdd1 and calculate the total score
    val rdd1 = rdd.reduceByKey(_ + _)
    // Map rdd1 to rdd2, calculate the total score and average score
    val rdd2 = rdd1.map(score => (score._1, score._2, (score._2 / 3.0).formatted("%.2f")))
    // Output the content of rdd2 on the console
    rdd2.collect.foreach(println)
    // Save the rdd2 content to the specified location in HDFS
    rdd2.saveAsTextFile("hdfs://master:9000/scoresumavg/output")
    // Shut down the Spark container
    sc.stop()
  }
}

Run the program to see the result

View HDFS result files
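saveAsTextFile writes one part-NNNNN file per partition under the output directory, so the result can be viewed with a command along these lines:

hdfs dfs -cat /scoresumavg/output/part-*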

2. Use RDD to count daily new users

(1) Preparation
1. Create user files locally

2. Upload user files to HDFS specified location
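Judging from the program later in this section, each line of the user file holds an access date and a user name separated by a comma, and the file is placed under /newusers/input on HDFS. A sketch with hypothetical sample data and commands:

users.txt (hypothetical sample data, format date,user):
2023-05-01,mike
2023-05-01,alice
2023-05-02,mike
2023-05-02,brown
2023-05-03,alice

hdfs dfs -mkdir -p /newusers/input
hdfs dfs -put users.txt /newusers/input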

(2) Complete the task
1. Complete tasks in Spark Shell
(1) Read the file and get RDD

(2) Invert the tuples, swapping the order of the two elements in each tuple of the RDD

(3) Group the inverted RDD by key

(4) Take the minimum date in each group and map it to a count of 1

(5) Count by key to get the number of new users per day

(6) Sort the output results by date in ascending order
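These steps perform the same transformations as the IDEA program shown below. A sketch of the shell session, assuming the user file is at the path below (sc is created by the shell automatically):

// Read the file and get an RDD
val rdd1 = sc.textFile("hdfs://master:9000/newusers/input/users.txt")
// Invert: split each line and swap the two fields, producing (user, date)
val rdd2 = rdd1.map(line => {
  val fields = line.split(",")
  (fields(1), fields(0))
})
// Group the inverted RDD by key (user)
val rdd3 = rdd2.groupByKey()
// Take the minimum date in each group and map it to a count of 1
val rdd4 = rdd3.map(line => (line._2.min, 1))
// Count by key to get the number of new users per day
val result = rdd4.countByKey()
// Print the results sorted by date in ascending order
result.keys.toList.sorted.foreach(key => println(key + " new users: " + result(key)))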

2. Complete the task in IntelliJ IDEA
(1) Open the RDD project and create a singleton object for counting new users

package net.cxf.rdd.day07
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Function: count new users
 * Author: cxf
 * Date: May 24, 2023
 */
object CountNewUsers {
  def main(args: Array[String]): Unit = {
    // Create Spark configuration object
    val conf = new SparkConf()
      .setAppName("CountNewUsers") // Set the application name
      .setMaster("local[*]") // Set the location of the master node (local debugging)
    // Create a Spark container based on the Spark configuration object
    val sc = new SparkContext(conf)
    // read the file and get RDD
    val rdd1 = sc.textFile("hdfs://master:9000/newusers/input/users.txt")
    // Invert: split each line and swap the two fields, producing (user, date)
    val rdd2 = rdd1.map(
      line => {
        val fields = line.split(",")
        (fields(1), fields(0))
      }
    )
    // Group the inverted RDD by key (user)
    val rdd3 = rdd2.groupByKey()
    // Take the minimum date in each group and map it to a count of 1
    val rdd4 = rdd3.map(line => (line._2.min, 1))
    // Count the keys to get the number of new users per day
    val result = rdd4.countByKey()
    // Sort the statistical results by date in ascending order
    val keys = result.keys.toList.sorted
    keys.foreach(key => println(key + " new users: " + result(key)))
    // Stop the Spark container
    sc.stop()
  }
}

Run the program to see the result

3. Use RDD to implement a group leaderboard

(1) Preparation
1. Create a score file locally

2. Upload the score file to the specified directory on HDFS
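Judging from the program later in this section, each line of the score file holds a name and a single score separated by a space (a student appears on several lines, one per score), and the file is placed under /topn/input on HDFS. A sketch with hypothetical sample data and commands:

grades.txt (hypothetical sample data, format name score):
zhangsan 94
zhangsan 78
zhangsan 85
zhangsan 62
lisi 88
lisi 95
lisi 71
lisi 83

hdfs dfs -mkdir -p /topn/input
hdfs dfs -put grades.txt /topn/input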

(2) Complete the task
1. Complete tasks in Spark Shell
(1) Read the score file to get RDD

(2) Use the map operator to generate an RDD of 2-tuples

(3) Group by key to get a new RDD of 2-tuples

(4) Sort by value and take the top three

(5) Output the result in the specified format
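The steps above correspond to a shell session like the following sketch (sc is created by the shell automatically; the path is the one used by the IDEA program below):

val top3 = sc.textFile("hdfs://master:9000/topn/input/grades.txt")
  .map(line => {
    val fields = line.split(" ")
    (fields(0), fields(1).toInt)
  }) // Map each line into a 2-tuple (name, grade)
  .groupByKey() // Group by key
  .map(item => (item._1, item._2.toList.sortWith(_ > _).take(3))) // Sort each group's grades in descending order and take the top three
top3.collect.foreach(line => println(line._1 + ": " + line._2.mkString(" ")))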

2. Complete the task in IntelliJ IDEA
(1) Open the RDD project and create a group leaderboard singleton object

package net.cxf.rdd.day07

import org.apache.spark.{SparkConf, SparkContext}
/**
 * Function: score group leaderboard
 * Author: cxf
 * Date: May 24, 2023
 */
object GradeTopN {
  def main(args: Array[String]): Unit = {
    // Create Spark configuration object
    val conf = new SparkConf()
      .setAppName("GradeTopN") // Set the application name
      .setMaster("local[*]") // Set the location of the master node (local debugging)
    // Create a Spark container based on the Spark configuration object
    val sc = new SparkContext(conf)
    // implement the group leaderboard
    val top3 = sc.textFile("hdfs://master:9000/topn/input/grades.txt")
      .map(line => {
        val fields = line.split(" ")
        (fields(0), fields(1).toInt)
      }) // Map each line of grades into a 2-tuple (name, grade), converting the grade to Int so values sort numerically
      .groupByKey() // group by key
      .map(item => {
        val name = item._1
        val top3 = item._2.toList.sortWith(_ > _).take(3)
        (name, top3)
      }) // Sort by value, take the first three
    // Output group leaderboard results
    top3.collect.foreach(line => {
      val name = line._1
      val scores = line._2.mkString(" ")
      println(name + ": " + scores)
    })
    // Stop the Spark container and end the task
    sc.stop()
  }
}

Run the program to see the result