Introduction to RDD
RDD (Resilient Distributed Dataset) is Spark's most basic data abstraction (in code, the abstract class RDD). It represents an immutable, partitioned collection whose elements can be computed in parallel. In Spark, all work on data consists of creating RDDs, transforming existing RDDs with operators, and calling actions on RDDs to compute results. Each RDD is divided into multiple partitions, which can be processed on different nodes of the cluster. An RDD can contain any type of Python, Java, or Scala object, including user-defined classes. RDDs share the key properties of the dataflow model: automatic fault tolerance, locality-aware scheduling, and scalability. RDDs also let users explicitly cache a working set in memory across multiple queries, so subsequent queries can reuse it, which greatly improves query speed.
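A minimal sketch of the "immutable, partitioned" properties, assuming Spark 2.x or later on the classpath and a local master (`local[2]`); the object name `RddBasics` is hypothetical. It splits six numbers into three partitions and shows that `map` produces a new RDD while the original RDD keeps its elements.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: partitioning and immutability of RDDs
object RddBasics {
  // Returns (number of partitions, original elements, elements of the derived RDD)
  def run(): (Int, Seq[Int], Seq[Int]) = {
    val sc = new SparkContext(new SparkConf().setAppName("RddBasics").setMaster("local[2]"))
    try {
      // Split the numbers 1..6 into 3 partitions
      val numbers: RDD[Int] = sc.parallelize(1 to 6, 3)
      // map does not modify `numbers`; it returns a new RDD
      val doubled: RDD[Int] = numbers.map(_ * 2)
      (numbers.getNumPartitions, numbers.collect().toSeq, doubled.collect().toSeq)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```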
RDD Programming Basics
1. RDD creation
1.1 Create an RDD from a parallelized collection
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf; local[1] runs Spark locally with a single worker thread
    val conf: SparkConf = new SparkConf().setAppName("Test").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val arr: Array[Int] = Array(0, 1, 2, 3, 4)
    // Spark provides two methods for creating an RDD from a local collection: parallelize and makeRDD
    val rdd1: RDD[Int] = sc.parallelize(arr)
    val rdd2: RDD[Int] = sc.makeRDD(arr)
    sc.stop()
  }
}
```
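Both methods also accept an optional second argument, the number of partitions. The sketch below, which assumes a local master and uses the hypothetical object name `CreateFromCollection`, compares the default partition count with an explicitly requested one. With `local[1]` and no `spark.default.parallelism` setting, the default comes from the single worker thread.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: default vs. explicit partition counts
object CreateFromCollection {
  // Returns (default partition count, explicit partition count, collected elements)
  def run(): (Int, Int, Seq[Int]) = {
    // local[1]: one worker thread, so sc.defaultParallelism is 1 unless configured otherwise
    val sc = new SparkContext(new SparkConf().setAppName("CreateFromCollection").setMaster("local[1]"))
    try {
      val data = Seq(0, 1, 2, 3, 4)
      // No second argument: the partition count defaults to sc.defaultParallelism
      val byDefault: RDD[Int] = sc.parallelize(data)
      // Second argument: request 2 partitions explicitly (parallelize accepts it too)
      val twoSlices: RDD[Int] = sc.makeRDD(data, 2)
      (byDefault.getNumPartitions, twoSlices.getNumPartitions, twoSlices.collect().toSeq)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```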
1.2 Create an RDD by loading data from a file system
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf; local[1] runs Spark locally with a single worker thread
    val conf: SparkConf = new SparkConf().setAppName("Test").setMaster("local[1]")
    val sc = new SparkContext(conf)
    // Read a local file or directory; the second argument of textFile is the minimum number of partitions.
    // Backslashes must be escaped in a Scala string, otherwise "\t" becomes a tab character.
    val path1 = "D:\\text\\data"
    val rdd1: RDD[String] = sc.textFile(path1, 2)
    // Data can also be loaded from HDFS; the URI needs the hdfs:// scheme
    val path2 = "hdfs://hadoop01:9000/test/"
    val rdd2: RDD[String] = sc.textFile(path2, 2)
    sc.stop()
  }
}
```
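Since the paths above only exist on the author's machines, here is a self-contained sketch that writes a small temporary file first and then loads it with `textFile`. The object name `TextFileDemo` and the file contents are hypothetical; a local master is assumed. Each element of the resulting RDD is one line of the file.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.util.Arrays
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: write a temporary text file, then load it with textFile
object TextFileDemo {
  // Returns (number of lines, the lines themselves)
  def run(): (Long, Seq[String]) = {
    val tmp: Path = Files.createTempFile("rdd-demo", ".txt")
    // Files.write writes each string followed by a line separator
    Files.write(tmp, Arrays.asList("hello world", "hi spark", "scala"), StandardCharsets.UTF_8)
    val sc = new SparkContext(new SparkConf().setAppName("TextFileDemo").setMaster("local[1]"))
    try {
      // A file:// URI avoids platform-specific path issues; 2 is the minimum partition count
      val lines = sc.textFile(tmp.toUri.toString, 2)
      (lines.count(), lines.collect().toSeq)
    } finally {
      sc.stop()
      Files.deleteIfExists(tmp)
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```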
2. RDD operations
RDDs support two kinds of operations: transformations, which are evaluated lazily, and actions. Transformations return a new RDD, for example map() and filter(); actions return a result to the driver program or write it to an external storage system, for example count() and first().
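The difference is visible in the return types. In this sketch (hypothetical object name `TransformVsAction`, local master assumed), `filter` and `map` return RDDs, while `count` and `first` return plain Scala values to the driver.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: transformations return RDDs, actions return values
object TransformVsAction {
  // Returns the results of the two actions: (count, first element)
  def run(): (Long, Int) = {
    val sc = new SparkContext(new SparkConf().setAppName("TransformVsAction").setMaster("local[1]"))
    try {
      val nums: RDD[Int] = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))
      // Transformations: each call returns a new RDD
      val evens: RDD[Int] = nums.filter(_ % 2 == 0)
      val scaled: RDD[Int] = evens.map(_ * 10)
      // Actions: each call runs a job and returns an ordinary value to the driver
      val howMany: Long = scaled.count()
      val firstOne: Int = scaled.first()
      (howMany, firstOne)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```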
Spark evaluates RDDs lazily: an RDD is actually computed only the first time it is used in an action. Because nothing runs until then, Spark sees the whole chain of transformations and can optimize the computation as a whole. By default, an RDD is recomputed every time you run an action on it. To reuse the same RDD in multiple actions, call RDD.persist() so that Spark caches the RDD after its first computation.
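One way to observe both behaviors is to count how often a map function runs. The sketch below assumes a local master and uses a long accumulator purely as an instrumentation device; the object name `LazyAndPersist` and accumulator name `mapCalls` are hypothetical. Nothing runs before the first action, and thanks to `persist` the second action reads cached partitions instead of running `map` again.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: counts how often the map function runs
object LazyAndPersist {
  // Returns the number of map calls (before any action, after the 1st action, after the 2nd action)
  def run(): (Long, Long, Long) = {
    val sc = new SparkContext(new SparkConf().setAppName("LazyAndPersist").setMaster("local[1]"))
    try {
      // Accumulator used only to observe evaluation; updates made inside
      // transformations are not guaranteed exactly-once if tasks are retried
      val calls = sc.longAccumulator("mapCalls")
      val squares = sc.parallelize(1 to 5).map { x => calls.add(1); x * x }
      // Mark the RDD for caching; this alone computes nothing
      squares.persist(StorageLevel.MEMORY_ONLY)
      val beforeAction: Long = calls.value
      squares.count()   // first action: computes the RDD and caches its partitions
      val afterFirst: Long = calls.value
      squares.collect() // second action: served from the cache
      val afterSecond: Long = calls.value
      (beforeAction, afterFirst, afterSecond)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Without the `persist` call, the second action would run the map function over all five elements again.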
2.1 Transformation operations
2.1.1 map(func)
Returns a new RDD formed by passing each element of the source RDD through the function func.
```scala
// Create an RDD from an array
val rdd1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6))
// Multiply every element of the RDD by 10
val rdd2: RDD[Int] = rdd1.map(x => x * 10)
// Collect the elements to the driver and print them
println(rdd2.collect().toBuffer)
// Output:
// ArrayBuffer(10, 20, 30, 40, 50, 60)
```
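map always produces exactly one output element per input element, but the output type may differ from the input type. This sketch (hypothetical object name `MapToPairs`, local master assumed) turns an `RDD[Int]` into an `RDD[(Int, String)]`, the kind of key-value RDD used by the operators later in this section.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: map changing the element type
object MapToPairs {
  def run(): Seq[(Int, String)] = {
    val sc = new SparkContext(new SparkConf().setAppName("MapToPairs").setMaster("local[1]"))
    try {
      val nums: RDD[Int] = sc.makeRDD(Seq(1, 2, 3))
      // One output per input, here a (number, label) pair
      val labelled: RDD[(Int, String)] = nums.map(x => (x, if (x % 2 == 0) "even" else "odd"))
      labelled.collect().toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```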
2.1.2 glom
Coalesces all elements within each partition into an array, producing a new RDD of type RDD[Array[T]].
```scala
// Create an RDD from an array with 3 partitions
val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
// glom turns each partition into an array; collect returns one array per partition
val arr: Array[Array[Int]] = rdd.glom().collect()
// Print each array: there are 3 of them, one per partition
for (elem <- arr) {
  println(elem.toBuffer)
}
/* Output:
ArrayBuffer(1, 2)
ArrayBuffer(3, 4)
ArrayBuffer(5, 6)
*/
```
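Because glom exposes each partition as one array, it is handy for per-partition computations. A sketch, assuming a local master and the hypothetical object name `GlomMax`, that finds the largest element of every partition:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: per-partition maximum via glom
object GlomMax {
  def run(): Seq[Int] = {
    val sc = new SparkContext(new SparkConf().setAppName("GlomMax").setMaster("local[1]"))
    try {
      val rdd: RDD[Int] = sc.makeRDD(Seq(1, 2, 3, 4, 5, 6), 3)
      rdd.glom()            // RDD[Array[Int]], one array per partition
        .filter(_.nonEmpty) // .max would throw on an empty partition
        .map(_.max)         // largest element of each partition
        .collect()
        .toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```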
2.1.3 flatMap(func)
Similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element)
```scala
// Create an RDD from an array
val rdd: RDD[String] = sc.parallelize(Array("hello world", "hi spark", "scala"))
// Split every line into words and flatten the results into one RDD of words
val rdds: RDD[String] = rdd.flatMap(x => x.split(" "))
println(rdds.collect().toBuffer)
// Output: ArrayBuffer(hello, world, hi, spark, scala)
/* How flatMap works:
   RDD("hello world", "hi spark", "scala")
   map(x => x.split(" ")) => RDD(Array("hello", "world"), Array("hi", "spark"), Array("scala"))
   flatten                => RDD("hello", "world", "hi", "spark", "scala")
   After flattening, the data is a one-dimensional collection (an RDD of words). */
```
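The "0 or more" part is what distinguishes flatMap from map. In this sketch (hypothetical object name `FlatMapZeroOrOne`, local master assumed), each input string yields either zero or one integer, so strings that are not integers simply disappear from the output.

```scala
import scala.util.Try
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: flatMap emitting 0 or 1 elements per input
object FlatMapZeroOrOne {
  def run(): Seq[Int] = {
    val sc = new SparkContext(new SparkConf().setAppName("FlatMapZeroOrOne").setMaster("local[1]"))
    try {
      val raw: RDD[String] = sc.parallelize(Seq("1", "two", "3", "", "5"))
      // Empty list when the string is not an integer, one-element list otherwise
      val parsed: RDD[Int] = raw.flatMap(s => Try(s.toInt).toOption.toList)
      parsed.collect().toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```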
2.1.4 partitionBy
Repartitions a key-value RDD according to a given Partitioner. If the RDD's existing partitioner equals the one passed in, the RDD is returned unchanged and no repartitioning is performed; otherwise a ShuffledRDD is generated.
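```scala
import org.apache.spark.TaskContext

// Create an RDD from a list with 2 partitions
val rdd = sc.parallelize(List("hello", "world", "hi", "spark", "scala", "doing", "hi", "jason"), 2)
// Redistribute the data into 4 partitions. repartition works on any RDD and always shuffles;
// partitionBy itself is only available on key-value RDDs and takes a Partitioner
val rep = rdd.repartition(4)
// Print the contents of each partition (the order in which partitions print may vary)
rep.foreachPartition(iter => {
  println()
  println("Partition " + TaskContext.get.partitionId)
  iter.foreach(p => print(p + "\t"))
})
/* Output:
Partition 0
world jason
Partition 1
hi scala
Partition 2
spark doing
Partition 3
hello hi
*/
```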
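For comparison, here is a sketch of partitionBy itself on a key-value version of the same words, assuming a local master; the object name `PartitionByDemo` is hypothetical, while `HashPartitioner` is Spark's built-in hash-based partitioner. It checks the no-op rule described above (a second partitionBy with an equal partitioner returns the same RDD) and that every record sits in the partition the partitioner assigns to its key.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Hypothetical demo object: partitionBy on a key-value RDD
object PartitionByDemo {
  // Returns (partition count, whether the second partitionBy was a no-op,
  //          whether every record sits in the partition chosen by the partitioner)
  def run(): (Int, Boolean, Boolean) = {
    val sc = new SparkContext(new SparkConf().setAppName("PartitionByDemo").setMaster("local[2]"))
    try {
      val words = Seq("hello", "world", "hi", "spark", "scala", "doing", "hi", "jason")
      // partitionBy needs key-value pairs, so pair each word with 1
      val pairs: RDD[(String, Int)] = sc.parallelize(words.map(w => (w, 1)), 2)
      val partitioner = new HashPartitioner(4)
      // pairs has no partitioner yet, so this shuffles into a ShuffledRDD
      val byKey: RDD[(String, Int)] = pairs.partitionBy(partitioner)
      // HashPartitioners with the same partition count are equal, so byKey itself is returned
      val again: RDD[(String, Int)] = byKey.partitionBy(new HashPartitioner(4))
      // Check that each record is in the partition its key maps to
      val placedCorrectly: Boolean = byKey
        .mapPartitionsWithIndex((idx, it) => it.map { case (k, _) => partitioner.getPartition(k) == idx })
        .collect()
        .forall(ok => ok)
      (byKey.getNumPartitions, again eq byKey, placedCorrectly)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```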
2.1.5 reduceByKey(func, [numTasks])
When called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs in which the values of each key are aggregated with the given reduce function func, which must be of type (V, V) => V. The number of reduce tasks can be set through the optional second argument.
```scala
// Create an RDD from a collection of (name, score) pairs
val rdd: RDD[(String, Int)] = sc.parallelize(List(("jack", 1), ("tom", 5), ("jack", 5), ("tom", 6), ("jack", 7)))
// Sum the values that share the same key
val rdds: RDD[(String, Int)] = rdd.reduceByKey((v1, v2) => v1 + v2)
// Bring the results to the driver and print each (key, value) pair
rdds.collect().foreach(x => {
  println(x._1 + " " + x._2)
})
/* Output (key order may vary):
tom 11
jack 13
*/
```
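The optional second argument sets how many partitions the result has, and therefore how many reduce tasks run. A sketch, assuming a local master and the hypothetical object name `ReduceByKeyTasks`:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: reduceByKey with an explicit number of reduce tasks
object ReduceByKeyTasks {
  // Returns (number of partitions of the result, totals per key)
  def run(): (Int, Map[String, Int]) = {
    val sc = new SparkContext(new SparkConf().setAppName("ReduceByKeyTasks").setMaster("local[2]"))
    try {
      val scores: RDD[(String, Int)] =
        sc.parallelize(Seq(("jack", 1), ("tom", 5), ("jack", 5), ("tom", 6), ("jack", 7)), 2)
      // The second argument requests 3 partitions (reduce tasks) for the result
      val totals: RDD[(String, Int)] = scores.reduceByKey((a: Int, b: Int) => a + b, 3)
      (totals.getNumPartitions, totals.collect().toMap)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```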
2.1.6 groupByKey
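groupByKey also works per key, but instead of reducing the values it collects all values of each key into a single sequence: called on an RDD of (K, V) pairs, it returns an RDD of (K, Iterable[V]) pairs.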
```scala
// Create an RDD from a collection of (name, score) pairs
val rdd: RDD[(String, Int)] = sc.parallelize(List(("jack", 1), ("tom", 5), ("jack", 5), ("tom", 6), ("jack", 7)))
// Group all values of each key into an Iterable[Int]
val rdds: RDD[(String, Iterable[Int])] = rdd.groupByKey()
rdds.collect().foreach(x => {
  println("Name: " + x._1 + " Score: " + x._2)
})
/* Output (key order may vary):
Name: tom Score: CompactBuffer(5, 6)
Name: jack Score: CompactBuffer(1, 5, 7)
*/
```
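When the goal is an aggregate per key, groupByKey followed by a sum gives the same result as reduceByKey, but reduceByKey also combines values within each partition before the shuffle, so less data crosses the network. A sketch comparing the two, assuming a local master and the hypothetical object name `GroupVsReduce`:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object: the same per-key totals via groupByKey and via reduceByKey
object GroupVsReduce {
  // Returns (totals via groupByKey, totals via reduceByKey)
  def run(): (Map[String, Int], Map[String, Int]) = {
    val sc = new SparkContext(new SparkConf().setAppName("GroupVsReduce").setMaster("local[2]"))
    try {
      val scores: RDD[(String, Int)] =
        sc.parallelize(Seq(("jack", 1), ("tom", 5), ("jack", 5), ("tom", 6), ("jack", 7)))
      // groupByKey ships every value through the shuffle, then each group is summed
      val viaGroup = scores.groupByKey().mapValues(_.sum).collect().toMap
      // reduceByKey sums within each partition first, then merges the partial sums
      val viaReduce = scores.reduceByKey((a: Int, b: Int) => a + b).collect().toMap
      (viaGroup, viaReduce)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```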