Spark RDD Programming (4): Comprehensive Cases

Case 1-Top N values

Input data:

1,1768,50,155
2,1218,600,211
3,2239,788,242
4,3101,28,599
5,4899,290,129
6,3110,54,1201
7,4436,259,877
8,2369,7890,27

Processing code:

def main(args: Array[String]): Unit = {
    //Create the SparkContext object
    val conf: SparkConf = new SparkConf()
    conf.setAppName("test1").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    var index: Int = 0
    //Create an RDD by loading data from the local file system
    val rdd: RDD[String] = sc.textFile("data/file1.txt")
    rdd.filter(line => line.split(",").length == 4)
      .map(line => line.split(",")(2))
      .map(word => (word.toInt, 1))
      .sortByKey(false)
      .map(kv => kv._1).take(5) // take(5) returns a local Array, so the foreach below (and the counter) runs on the driver
      .foreach(key => {
        index += 1
        println(index + s"\t$key")
      })

    //Close the SparkContext object
    sc.stop()
  }

Code analysis:

  • sc.textFile("data/file1.txt"): creates an RDD by loading the local file
  • rdd.filter(line=>line.split(",").length == 4): keeps only well-formed rows (exactly four comma-separated fields), ensuring data integrity
  • map(line=>line.split(",")(2)): splits each line on commas into an Array and takes the third field (index 2)
  • map(word=>(word.toInt,1)): sortByKey operates on key-value pairs, so the extracted value is wrapped as the key of a (value, 1) pair
  • sortByKey(false): passing false sorts in descending order
  • map(kv=>kv._1).take(5): drops the dummy value and takes the top five keys

Output:

1 7890
2 788
3 600
4 290
5 259
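
As an aside, the same top-five values can be obtained without building (value, 1) pairs at all: sortBy takes a key-extractor function and works on a plain RDD. A minimal sketch, assuming the same data/file1.txt as above (it prints only the values, without the rank column):

// sortBy extracts a sort key per element, so no key-value pairs are needed
rdd.filter(line => line.split(",").length == 4)
  .map(line => line.split(",")(2).toInt)
  .sortBy(identity, ascending = false)
  .take(5)
  .foreach(println)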

Case 2-File sorting

Requirement: read three input files (one number per line) and produce a single sorted output whose lines have the format (rank, value).

Processing code:

var index = 0
rdd.map(num => (num.toInt, 1))
      .partitionBy(new HashPartitioner(1)) // merge all data into a single partition
      .sortByKey()
      .map(t => {
        index += 1
        (index, t._1)
      })
      .foreach(println) // only calling an action triggers the actual computation from start to finish

Note that if we only chain transformations and never call an action such as foreach, nothing is printed: transformations are lazy, so an action is required to trigger the job.

We must also merge the partitions: in a distributed environment, the overall result is ordered only if all the data ends up in a single partition. (Even though this test runs locally against files in one directory, the distributed case still has to be considered.)

Output:

(1,1)
(2,4)
(3,5)
(4,12)
(5,16)
(6,25)
(7,33)
(8,37)
(9,39)
(10,40)
(11,45)
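
Incidentally, the mutable index counter in the closure above behaves correctly only because all the data sits in a single partition; in a genuinely distributed job each executor would increment its own copy of the variable. A closure-safe sketch, assuming the same input RDD, uses zipWithIndex instead of a shared counter:

// zipWithIndex assigns a globally consistent 0-based index across partitions,
// so no mutable counter is needed in the closure
rdd.map(_.toInt)
  .sortBy(identity)                          // globally sorted values
  .zipWithIndex()                            // (value, 0-based index)
  .map { case (value, i) => (i + 1, value) } // (1-based rank, value)
  .foreach(println)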

Case 3-Secondary Sorting

Requirement: sort lines of the form (value1 value2), two numbers per line separated by a space; when the first values are equal, compare the second values.

Processing code:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

class SecondarySortKey(val first:Int,val second:Int) extends Ordered[SecondarySortKey] with Serializable {
  override def compare(other: SecondarySortKey): Int = {
    if (this.first - other.first != 0) {
      this.first - other.first
    }else{
      this.second-other.second
    }
  }
}
object SecondarySortKey{
  def main(args: Array[String]): Unit = {
    val conf:SparkConf = new SparkConf()
    conf.setAppName("test3").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[String] = sc.textFile("data/sort/test03.txt")
    val rdd2: RDD[(SecondarySortKey, String)] = rdd.map(line => {
      val fields = line.split(" ")
      (new SecondarySortKey(fields(0).toInt, fields(1).toInt), line)
    })
    rdd2.sortByKey(false).map(t=>t._2).foreach(println)

    sc.stop()
  }
}

Here we define a custom key class that mixes in two traits, Ordered and Serializable, in order to implement a custom sort order. Mixing in Ordered requires overriding its compare method with our comparison rule, while Serializable lets instances of the class be serialized so they can be shipped across the network.

Output:

8 3
5 6
5 3
4 9
4 7
3 2
1 6
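
Scala already provides a lexicographic Ordering for tuples, so for a simple two-field sort like this one the custom key class can be avoided entirely. A minimal sketch, assuming the same input file:

// Tuples of Ints have a built-in lexicographic Ordering, so sortBy can
// perform the secondary sort directly, without a custom key class
rdd.sortBy(line => {
    val fields = line.split(" ")
    (fields(0).toInt, fields(1).toInt)
  }, ascending = false)
  .foreach(println)

The custom-class approach still pays off when the comparison logic is more involved than plain lexicographic order.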

Case 4-Average Grade

Requirement: three files are given, one per subject; compute the average score of each student.

Processing code:

    //Read the data: each line is "name score"
    val rdd: RDD[String] = sc.textFile("data/rdd/test3")
    rdd.map(line => (line.split(" ")(0), line.split(" ")(1).toInt)) // (name, score)
      .map(t => (t._1, (t._2, 1)))                                  // (name, (score, 1))
      .reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))      // (name, (total, count))
      .mapValues(t => t._1 / t._2.toFloat)                          // (name, average)
      .foreach(println)

Output:

(Xiaoxin,88.333336)
(Xiaoli,88.666664)
(Xiaoming,89.666664)
(Xiaohong,83.666664)

Comprehensive case

Input data format: (name, course name, grade)

Aaron,OperatingSystem,100
Aaron,Python,50
Aaron,ComputerNetwork,30
Aaron,Software,94
Abbott,DataBase,18
Abbott,Python,82
Abbott,ComputerNetwork,76
Abel,Algorithm,30
Abel,DataStructure,38
Abel,OperatingSystem,38
...
Processing code:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDPractice {
  def main(args: Array[String]): Unit = {
    val conf:SparkConf = new SparkConf()
    conf.setAppName("test-last").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[String] = sc.textFile("data/chapter5-data1.txt")
    //(1)How many students are there in this department?
    val nums: Long = rdd.map(line => line.split(",")(0)).distinct().count()
    println("This department has a total of " + nums + " students")
    //(2)How many courses does the department offer?
    val course_nums: Long = rdd.map(line => line.split(",")(1)).distinct().count()
    println("This department has a total of " + course_nums + " courses")
    //(3)What are the total score and average score of student Tom?
    val score: Double = rdd.filter(line => line.contains("Tom")).map(line => line.split(",")(2).toInt).sum()
    val avg: Double = score/rdd.filter(line => line.contains("Tom")).map(line=>line.split(",")(1)).count()
    println("Tom's total score is " + score + ", and his average score is " + avg)
    //(4) Find the number of elective courses taken by each student
    rdd.map(line => (line.split(",")(0), line.split(",")(1))) // (name, course)
      .mapValues(v => (v, 1))                                 // (name, (course, 1))
      .reduceByKey((a, b) => ("", a._2 + b._2))               // (name, ("", count)): accumulate the course count
      .mapValues(x => x._2)                                   // (name, total number of courses)
      .foreach(println)

    //(5)How many students are taking the DataBase course in this department?
    val l = rdd.filter(line => line.split(",")(1) == "DataBase").count()
    println("The number of people taking the DataBase course is " + l)

    //(6)What is the average score of each course?
    //(student, course name, grade) => total course grade/number of students in the course
    val res: RDD[(String, Float)] = rdd.map(line => (line.split(",")(1), line.split(",")(2).toInt)) // (course, score)
      .combineByKey(
        score => (score, 1),                                      // first score for a key in a partition -> (score, 1)
        (acc: (Int, Int), score) => (acc._1 + score, acc._2 + 1), // fold another score into (total, count)
        (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // merge partition accumulators
      ).map({
        case (key, value) => (key, value._1 / value._2.toFloat)   // (course, total / count = average)
      })

    res.saveAsTextFile("data/rdd/practice")
    

    sc.stop()
  }
}

Output:

This department has a total of 265 students
This department has a total of 8 courses
Tom's total score is 154.0, and his average score is 30.8
(Ford,3)
(Lionel,4)
(Verne,3)
(Lennon,4)
(Joshua,4)
(Marvin,3)
(Marsh,4)
(Bartholomew,5)
(Conrad,2)
(Armand,3)
(Jonathan,4)
(Broderick,3)
(Brady,5)
(Derrick,6)
(Rod,4)
(Willie,4)
(Walter,4)
(Boyce,2)
(Duncann,5)
(Elvis,2)
(Elmer,4)
(Bennett,6)
(Elton,5)
(Jo,5)
(Jim,4)
(Adonis,5)
(Abel,4)
(Peter,4)
(Alvis,6)
(Joseph,3)
(Raymondt,6)
(Kerwin,3)
(Wright,4)
(Adam,3)
(Borg,4)
(Sandy,1)
(Ben,4)
(Miles,6)
(Clyde,7)
(Francis,4)
(Dempsey,4)
(Ellis,4)
(Edward,4)
(Mick,4)
(Cleveland,4)
(Luthers,5)
(Virgil,5)
(Ivan,4)
(Alvin,5)
(Dick,3)
(Bevis,4)
(Leo,5)
(Saxon,7)
(Armstrong,2)
(Hogan,4)
(Sid,3)
(Blair,4)
(Colbert,4)
(Lucien,5)
(Kerr,4)
(Montague,3)
(Giles,7)
(Kevin,4)
(Uriah,1)
(Jeffrey,4)
(Simon,2)
(Elijah,4)
(Greg,4)
(Colin,5)
(Arlen,4)
(Maxwell,4)
(Payne,6)
(Kennedy,4)
(Spencer,5)
(Kent,4)
(Griffith,4)
(Jeremy,6)
(Alan,5)
(Andrew,4)
(Jerry,3)
(Donahue,5)
(Gilbert,3)
(Bishop,2)
(Bernard,2)
(Egbert,4)
(George,4)
(Noah,4)
(Bruce,3)
(Mike,3)
(Frank,3)
(Boris,6)
(Tony,3)
(Christ,2)
(Ken,3)
(Milo,2)
(Victor,2)
(Clare,4)
(Nigel,3)
(Christopher,4)
(Robin,4)
(Chad,6)
(Alfred,2)
(Woodrow,3)
(Rory,4)
(Dennis,4)
(Ward,4)
(Chester,6)
(Emmanuel,3)
(Stan,3)
(Jerome,3)
(Corey,4)
(Harvey,7)
(Herbert,3)
(Maurice,2)
(Merle,3)
(Les,6)
(Bing,6)
(Charles,3)
(Clement,5)
(Leopold,7)
(Brian,6)
(Horace,5)
(Sebastian,6)
(Bernie,3)
(Basil,4)
(Michael,5)
(Ernest,5)
(Tom,5)
(Vic,3)
(Eli,5)
(Duke,4)
(Alva,5)
(Lester,4)
(Hayden,3)
(Bertram,3)
(Bart,5)
(Adair,3)
(Sidney,5)
(Bowen,5)
(Roderick,4)
(Colby,4)
(Jay,6)
(Meredith,4)
(Harold,4)
(Max,3)
(Scott,3)
(Barton,1)
(Elliot,3)
(Matthew,2)
(Alexander,4)
(Todd,3)
(Wordsworth,4)
(Geoffrey,4)
(Devin,4)
(Donald,4)
(Roy,6)
(Harry,4)
(Abbott,3)
(Baron,6)
(Mark,7)
(Lewis,4)
(Rock,6)
(Eugene,1)
(Aries,2)
(Samuel,4)
(Glenn,6)
(Will,3)
(Gerald,4)
(Henry,2)
(Jesse,7)
(Bradley,2)
(Merlin,5)
(Monroe,3)
(Hobart,4)
(Ron,6)
(Archer,5)
(Nick,5)
(Louis,6)
(Len,5)
(Randolph,3)
(Benson,4)
(John,6)
(Abraham,3)
(Benedict,6)
(Marico,6)
(Berg,4)
(Aldrich,3)
(Lou,2)
(Brook,4)
(Ronald,3)
(Pete,3)
(Nicholas,5)
(Bill,2)
(Harlan,6)
(Tracy,3)
(Gordon,4)
(Alston,4)
(Andy,3)
(Bruno,5)
(Beck,4)
(Phil,3)
(Barry,5)
(Nelson,5)
(Antony,5)
(Rodney,3)
(Truman,3)
(Marlon,4)
(Don,2)
(Philip,2)
(Sean,6)
(Webb,7)
(Solomon,5)
(Aaron,4)
(Blake,4)
(Amos,5)
(Chapman,4)
(Jonas,4)
(Valentine,8)
(Angelo,2)
(Boyd,3)
(Benjamin,4)
(Winston,4)
(Allen,4)
(Evan,3)
(Albert,3)
(Newman,2)
(Jason,4)
(Hilary,4)
(William,6)
(Dean,7)
(Claude,2)
(Booth,6)
(Channing,4)
(Jeff,4)
(Webster,2)
(Marshall,4)
(Cliff,5)
(Dominic,4)
(Upton,5)
(Herman,3)
(Levi,2)
(Clark,6)
(Hiram,6)
(Drew,5)
(Bert,3)
(Alger,5)
(Brandon,5)
(Antonio,3)
(Elroy,5)
(Leonard,2)
(Adolph,4)
(Blithe,3)
(Kenneth,3)
(Perry,5)
(Matt,4)
(Eric,4)
(Archibald,5)
(Martin,3)
(Kim,4)
(Clarence,7)
(Vincent,5)
(Winfred,3)
(Christian,2)
(Bob,3)
(Enoch,3)
The number of people taking the DataBase course is 126

Average score of each course (written to the output file):

(CLanguage,50.609375)
(Software,50.909092)
(Python,57.82353)
(Algorithm,48.833332)
(DataStructure,47.572517)
(DataBase,50.539684)
(ComputerNetwork,51.90141)
(OperatingSystem,54.9403)

Analysis

(1) How many students are there in this department

First use the map transformation to extract every student name, then deduplicate with the distinct transformation, and finally count the names with the count action.

 //(1)How many students are there in this department?
    val nums: Long = rdd.map(line => line.split(",")(0)).distinct().count()
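
As an aside, distinct().count() forces a full shuffle to deduplicate. On very large datasets, Spark's countApproxDistinct can trade a little accuracy for much less work; a sketch:

    // Approximate distinct count (here with a 1% relative standard deviation);
    // avoids the shuffle that distinct() requires
    val approxNums: Long = rdd.map(line => line.split(",")(0)).countApproxDistinct(0.01)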

(2) How many courses does the department offer in total?

Same as (1), except that we extract the course names (field index 1).

 //(2)How many courses does the department offer?
    val course_nums: Long = rdd.map(line => line.split(",")(1)).distinct().count()

(3) What are student Tom's total score and average score

For the total score, use filter to keep the rows containing "Tom", split each row into fields, convert the score field to Int, and sum the scores.

For the average score, divide the total score by the number of courses Tom has taken.

//(3)What are the total score and average score of student Tom?
    val score: Double = rdd.filter(line => line.contains("Tom")).map(line => line.split(",")(2).toInt).sum()
    val avg: Double = score/rdd.filter(line => line.contains("Tom")).map(line=>line.split(",")(1)).count()
    println("Tom's total score is " + score + ", and his average score is " + avg)

(4) Find the number of elective courses taken by each student

First take out the student name and course name, keyed by student name. mapValues turns each course name into a (course name, 1) pair; reduceByKey then accumulates the counts for the same student; finally, mapValues collapses each (_, count) pair to just the count, yielding (student name, total number of courses).

//(4) Find the number of elective courses taken by each student
    rdd.map(line => (line.split(",")(0), line.split(",")(1))) // (name, course)
      .mapValues(v => (v, 1))                                 // (name, (course, 1))
      .reduceByKey((a, b) => ("", a._2 + b._2))               // (name, ("", count)): accumulate the course count
      .mapValues(x => x._2)                                   // (name, total number of courses)
      .foreach(println)
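
The same counts can also be obtained more directly with the countByKey action, at the cost of collecting the result to the driver as a local Map; a sketch:

    // countByKey is an action: it returns a local Map[String, Long] on the driver,
    // so it is only appropriate when the number of distinct students is small
    val counts: scala.collection.Map[String, Long] =
      rdd.map(line => (line.split(",")(0), line.split(",")(1))).countByKey()
    counts.foreach(println)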

(5) How many students are taking the DataBase course in this department

Filter the rows whose second field (index 1) equals "DataBase", then count them with the count action.

//(5)How many students are taking the DataBase course in this department?
    val l = rdd.filter(line => line.split(",")(1) == "DataBase").count()
    

(6) What is the average score of each course

Through combineByKey, the value (score) for each key (course) is first turned into a (score, 1) pair. Within each partition, further scores for the same key are folded in, accumulating both the total score and the count. Accumulators from different partitions are then merged the same way, adding totals and counts. Finally, the result is mapped into the required format: (course name, total score / count = course average).

//(6)What is the average score of each course?
    //(student, course name, grade) => total course grade/number of students in the course
    val res: RDD[(String, Float)] = rdd.map(line => (line.split(",")(1), line.split(",")(2).toInt)) // (course, score)
      .combineByKey(
        score => (score, 1),                                      // first score for a key in a partition -> (score, 1)
        (acc: (Int, Int), score) => (acc._1 + score, acc._2 + 1), // fold another score into (total, count)
        (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // merge partition accumulators
      ).map({
        case (key, value) => (key, value._1 / value._2.toFloat)   // (course, total / count = average)
      })

In addition, reduceByKey can solve the same problem; the two approaches work on the same principle:

rdd.map(line => (line.split(",")(1), line.split(",")(2).toInt))
      .map(t => (t._1, (t._2, 1)))
      .reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))
      .map(t => (t._1, t._2._1 / t._2._2.toFloat)) // could be mapValues(), since only the value changes and the key stays the same
      .foreach(println)