Spark-RDD programming (1)

Introduction to RDD

RDD (Resilient Distributed Dataset) is called a distributed data set and is the most basic abstract class in Spark. It represents an immutable and partitionable collection whose elements can be calculated in parallel. In Spark, all operations on data include creating RDDs, transforming (operators) existing RDDs, and calling RDD operations for evaluation (execution). Each RDD is divided into multiple partitions, which run on different nodes in the cluster. RDD can contain any type of object in Python, Java, Scala, and even user-defined objects. RDD has the characteristics of the data flow model: automatic fault tolerance, location-aware scheduling and scalability. RDD allows users to explicitly cache the working set in memory when executing multiple queries. Subsequent queries can reuse the working set, which greatly improves query speed.

RDD Programming Basics

1.RDD creation

1.1 Create RDD through parallel collection

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test {
  def main(args: Array[String]): Unit = {
    //Create SparkConf object local[1] represents local mode, 1 partition
    val conf: SparkConf = new SparkConf().setAppName("Test").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val arr: Array[Int] = Array(0, 1, 2, 3, 4)
    //Create rdd based on collection
    //Create RDD from a collection. Spark mainly provides two functions: parallelize and makeRDD.
    val rdd1: RDD[Int] = sc.parallelize(arr)
    val rdd2: RDD[Int] = sc.makeRDD(arr)
  }

1.2 Load data from the file system to create RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test {
  def main(args: Array[String]): Unit = {
    //Create SparkConf object local[1] represents local mode, 1 partition
    val conf: SparkConf = new SparkConf().setAppName("Test").setMaster("local[1]")
    val sc = new SparkContext(conf)

    //Read file data, the number of RDD partitions that can be generated in textFile
    val path1 = "D:\text\data"
    val rdd1: RDD[String] = sc.textFile(path1,2)
    //You can also load data from HDFS
    val path2 = "hadoop01:9000/test/"
    val rdd2 = sc.textFile(path2, 2)
    
  }
}

2.RDD operation

RDD supports two operations: transformation (TransFormation (abstract lazy loading)) operation and action (action) operation. RDD transformation operations are operations that return a new RDD, such as map() and filter(), while action operations are operations that return results to the driver program or write results to an external system. Such as count() and first().

Spark adopts a lazy calculation mode, and the RDD will only be actually calculated the first time it is used in an action operation. Spark can optimize the entire computing process. By default, Spark’s RDDs are recomputed every time you perform an action on them. If you want to reuse the same RDD in multiple actions, you can use RDD.persist() to let Spark cache the RDD.

2.1 Conversion operation (TransFormation)

2.1.1 map(func)

Returns a new RDD, which is composed of each input element converted by the func function

 //Create rdd based on array
    val rdd1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6))
    //Perform *10 on the elements in rdd
    val rdd2: RDD[Int] = rdd1.map(x => x * 10)
    //Print elements in rdd
    println(rdd2.collect().toBuffer)
    
    //Output results
    //ArrayBuffer(10, 20, 30, 40, 50, 60)
2.1.2 glom

Form each partition into an array and form a new RDD type [Array[T]]

 //Create RDD based on the array and specify the number of partitions as 3
    val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6),3)
    //Return the result as an array set
    val arr: Array[Array[Int]] = rdd.glom().collect()
    //Loop and print the elements in arr, it is not difficult to find that there are 3 sets
    for (elem <- arr) {
      println(elem.toBuffer)
    }
    /*
    result
    ArrayBuffer(1, 2)
    ArrayBuffer(3, 4)
    ArrayBuffer(5, 6)
     */
2.1.3 flatMap(func)

Similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element)

 //Create rdd based on array
    val rdd: RDD[String] = sc.parallelize(Array("hello world", "hi spark", "scala"))
    val rdds: RDD[String] = rdd.flatMap(x => x.split(" "))
    println(rdds.collect().toBuffer) //Output result ArrayBuffer(hello, world, hi, spark, scala)

    /*
    The process of flatMap:
    RDD("hello world", "hi spark", "scala")
    First do map() =>
    RDD(Array("hello world"),Array("hi spark"),Array("scala"))
    In progress flatten =>
    RDD("hello","world",hi","sparrk","scala"))
    After flattening, our data becomes a one-dimensional collection data structure (RDD)
     */
2.1.4 partitionBy

Perform partitioning operation on RDD. If the original partitionRDD is consistent with the existing partitionRDD, no partitioning will be performed. Otherwise, ShuffleRDD will be generated.

 //Create RDD based on array and set 2 partitions;
    val rdd = sc.parallelize(List("hello", "world", "hi", "sparrk", "scala", "doing", "hi" , "jason"), 2)

    //Repartition to 4;
    val rep = rdd.repartition(4)
    rep.foreachPartition(pair => {
      println()
      println("th" + TaskContext.get.partitionId + "partition" )
      pair.foreach(p => {
        print(p + "\t")
      })
    })
    /*
    Output result:
    Partition 0
    world jason
    Partition 1
    hi scala
    2nd partition
    spark doing
    3rd partition
    hello hi
     */
2.1.5 reduceByKey(func, [numTasks])

Called on a (K, V) RDD, return a (K, V) RDD, use the specified reduce function to aggregate the values of the same key together, the number of reduce tasks can be passed through the second optional parameters to set.

 //Create RDD based on collection
    val rdd: RDD[(String, Int)] = sc.parallelize(List(("jack", 1), ("tom", 5), ("jack", 5),(\ "tom", 6), ("jack", 7)))
    //Aggregate value based on key value
    val rdds: RDD[(String, Int)] = rdd.reduceByKey((v1, v2) => v1 + v2)
    rdds.foreach(x => { //foreach() function is used to traverse the collection
      println(x._1 + " " + x._2)
    })
    /*
    Output result:
    Tom 11
    Jack 13
     */
2.1.6 groupByKey

groupByKey also operates on each key, but only generates one sequence.

 //Create RDD based on collection
    val rdd: RDD[(String, Int)] = sc.parallelize(List(("jack", 1), ("tom", 5), ("jack", 5), (\ "tom", 6), ("jack", 7)))
    //Iterable[Int] iterator
    val rdds: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    rdds.collect().foreach(x => {
      println("Name:" + x._1 + "Score" + x._2)
    })
    /*
    Output results
    Name: tom score CompactBuffer(5, 6)
    Name: jack Score CompactBuffer(1, 5, 7)
     */