2023_Spark_Experiment 11: Advanced RDD operators

checkpoint:

sc.setCheckpointDir("hdfs://Master:9000/ck") // Set the checkpoint directory

val rdd = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // Transformations of the word-count task



rdd.checkpoint // Mark this RDD for checkpointing; nothing is written until an action runs.

rdd.isCheckpointed // false, because no action has run yet

rdd.count // Trigger the computation. The log shows: ReliableRDDCheckpointData: Done checkpointing RDD 27 to hdfs://hadoop001:9000/ck/fce48fd4-d76f-4322-8d23-6a48d1aed7b5/rdd-27, new parent is RDD 28

rdd.isCheckpointed // res61: Boolean = true

rdd.getCheckpointFile // Option[String] = Some(hdfs://Master:9000/ck/b9a5add8-18d8-4056-9e8e-271d9522a29c/rdd-4)

coalesce:

Spark's RDD API has two operators for changing the number of partitions: repartition and coalesce.

repartition always shuffles; it is simply a call to coalesce(numPartitions, shuffle = true).

coalesce defaults to shuffle = false, in which case no shuffle takes place.

Only coalesce is considered here. The official documentation describes it roughly as follows:

If you go from 1000 partitions to 100 partitions without a shuffle, each of the 100 new partitions claims about 10 of the parent partitions.

Without a shuffle, coalesce is a narrow dependency, so the operators before and after it run in the same stage. The number of tasks in that stage equals the number of partitions after coalesce, so the whole stage, including the operators before coalesce, runs with a parallelism of 100, and each task computes its roughly 10 parent partitions in turn. If the upstream work needs the full parallelism of 1000, pass shuffle = true.

val rdd1 = sc.parallelize(1 to 10, 10)

// Coalesce into 2 partitions; no shuffle occurs

val rdd2 = rdd1.coalesce(2, false)



// Get the number of partitions of the new RDD

rdd2.partitions.length

def func1(index: Int, iter: Iterator[Int]): Iterator[String] = {
  iter.toList.map(x => "[PartID:" + index + ",value=" + x + "]").iterator
}

// View the results after partitioning:

rdd2.mapPartitionsWithIndex(func1).collect
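
The grouping of parent partitions into child partitions can be sketched without a cluster. The following is a simplified model in plain Scala, not Spark's actual coalescer (which also takes data locality into account); coalesceModel is a hypothetical helper that assigns parent partition indices to child partitions in contiguous ranges.

```scala
// Hypothetical helper: which parent partitions feed each child partition
// when coalescing without a shuffle (data locality is ignored in this model).
def coalesceModel(numParents: Int, numChildren: Int): Vector[Range] =
  Vector.tabulate(numChildren) { i =>
    val start = (i.toLong * numParents / numChildren).toInt
    val end   = ((i.toLong + 1) * numParents / numChildren).toInt
    start until end
  }

// 1000 parents into 100 children: every child reads 10 parents.
val big = coalesceModel(1000, 100)
println(big.map(_.size).distinct)

// 10 parents into 2 children, as in the rdd1.coalesce(2, false) example.
val small = coalesceModel(10, 2)
small.zipWithIndex.foreach { case (parents, child) =>
  println(s"child $child <- parents ${parents.mkString(",")}")
}
```

In this model each child partition corresponds to one task, which is why the stage containing a shuffle-free coalesce runs with the reduced number of tasks.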



repartition:

val rdd1 = sc.parallelize(1 to 10, 4)

val rdd2 = rdd1.repartition(5)

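collect, toArray

Convert an RDD to a Scala array on the driver. toArray is a deprecated alias of collect from early Spark releases; prefer collect.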

collectAsMap

Similar to collect, but for key-value RDDs: collectAsMap returns the pairs to the driver as a Scala map.

Note: If a key occurs more than once, only the last value collected for that key is kept.

// Create a 2-partition RDD

scala> var z = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12) , ("dog", 12), ("mouse", 2)), 2)

z: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[129] at parallelize at <console>:21

// Output the data of all partitions

scala> z.collect

res44: Array[(String, Int)] = Array((cat,2), (cat,5), (mouse,4), (cat,12), (dog,12), (mouse,2))



// Convert to a map

scala> z.collectAsMap

res45: scala.collection.Map[String,Int] = Map(dog -> 12, cat -> 12, mouse -> 2)



collectAsMap

val rdd = sc.parallelize(List(("a", 1), ("b", 2)))

rdd.collectAsMap

//res2: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
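
The "last value wins" rule can be reproduced on a local collection. This is a minimal sketch in plain Scala, assuming that toMap on a list behaves like the collectAsMap output shown above; the grouped value is an illustrative alternative, not part of the original example.

```scala
// Same pairs as in the collectAsMap transcript, held in a local list.
val pairs = List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2))

// Later duplicates overwrite earlier ones, so "cat" ends up as 12.
val asMap: Map[String, Int] = pairs.toMap
println(asMap)

// To keep every value, group the values per key before building the map.
val grouped: Map[String, List[Int]] =
  pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
println(grouped)
```

On an RDD, the same idea would be to aggregate per key (for example with groupByKey or reduceByKey) before calling collectAsMap.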

combineByKey compared to aggregateByKey:

1. Similarities:

  • Both take separate functions for the intra-partition and the inter-partition calculation on the values of each key.

2. Differences:

  • combineByKey takes three functions in a single parameter list and does not require an initial value, while aggregateByKey has two parameter lists, the first of which supplies an initial value.

Schematic diagram of calculation in aggregateByKey partition

// aggregateByKey is curried and has two parameter lists

// The first parameter list takes one argument, the initial value (zeroValue)

// It is combined with the first value of each key within a partition

// The second parameter list takes two arguments

// The first argument is the calculation within a partition

// The second argument is the calculation between partitions



rdd.aggregateByKey(zeroValue = 0)(

(x, y) => math.max(x, y),

(x, y) => x + y

).collect().foreach(println)
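
To make the two phases concrete, here is a minimal model of aggregateByKey in plain Scala. It is a sketch under stated assumptions, not Spark's implementation: aggregateByKeyModel is a hypothetical helper, and the layout of the data across two partitions is chosen by hand for illustration.

```scala
// Hypothetical model: each inner Seq stands for one partition.
def aggregateByKeyModel[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U): Map[K, U] = {
  // Phase 1: inside each partition, fold the values of a key starting from zero.
  val perPartition: Seq[Map[K, U]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
      acc.updated(k, seqOp(acc.getOrElse(k, zero), v))
    }
  }
  // Phase 2: merge the per-partition results of the same key.
  perPartition.flatMap(_.toSeq).foldLeft(Map.empty[K, U]) { case (acc, (k, u)) =>
    acc.updated(k, acc.get(k).map(prev => combOp(prev, u)).getOrElse(u))
  }
}

// Assumed layout: two partitions with three pairs each.
val parts = Seq(
  Seq(("a", 1), ("a", 2), ("b", 3)),
  Seq(("b", 4), ("b", 5), ("a", 6)))

// Max within a partition, sum between partitions, as in the call above.
val result = aggregateByKeyModel(parts, 0)((x, y) => math.max(x, y), (x, y) => x + y)
println(result) // a: max(1,2) + 6 = 8, b: 3 + max(4,5) = 8
```

The result depends on how the data is partitioned, because the maximum is taken per partition before the partial results are summed.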

Schematic diagram of calculation in combineByKey partition

// combineByKey takes three parameters:

// First parameter: converts the first value of each key into the structure used for the calculation

// Second parameter: calculation rule within a partition

// Third parameter: calculation rule between partitions

import org.apache.spark.rdd.RDD

// Build (sum, count) per key: the count grows by 1 for every value merged in
val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
  (v: Int) => (v, 1),
  (t: (Int, Int), v: Int) => (t._1 + v, t._2 + 1),
  (t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2))

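The core difference between combineByKey and aggregateByKey lies in how the first value of each key within a partition is handled: aggregateByKey combines it with the initial value using the intra-partition function, while combineByKey converts it with its first function.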
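
The role of the first function can be seen in a small model written in plain Scala. This is a sketch, not Spark's implementation: combineByKeyModel is a hypothetical helper, and the two-partition layout of the data is an assumption made for illustration.

```scala
// Hypothetical model: each inner Seq stands for one partition.
def combineByKeyModel[K, V, C](partitions: Seq[Seq[(K, V)]])(
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  val perPartition: Seq[Map[K, C]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      val next = acc.get(k) match {
        case Some(c) => mergeValue(c, v)   // key already seen in this partition
        case None    => createCombiner(v)  // first value of the key in this partition
      }
      acc.updated(k, next)
    }
  }
  perPartition.flatMap(_.toSeq).foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
    acc.updated(k, acc.get(k).map(prev => mergeCombiners(prev, c)).getOrElse(c))
  }
}

val parts = Seq(
  Seq(("a", 1), ("a", 2), ("b", 3)),
  Seq(("b", 4), ("b", 5), ("a", 6)))

// (sum, count) per key, then the average.
val sumCount = combineByKeyModel[String, Int, (Int, Int)](parts)(
  v => (v, 1),
  (t, v) => (t._1 + v, t._2 + 1),
  (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))
val averages = sumCount.map { case (k, (sum, count)) => k -> (sum.toDouble / count) }
println(averages)

// createCombiner runs once per key per partition: each key occurs in both
// partitions here, so its total is 20 higher than the plain sum.
val plus10 = combineByKeyModel[String, Int, Int](parts)(_ + 10, _ + _, _ + _)
println(plus10)
```

Because the first function is applied once per key in every partition that contains the key, a non-neutral conversion such as x => x + 10 gives results that depend on the partitioning.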

combineByKey // A method of the PairRDDFunctions class

val rdd1 = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_, 1))

val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) // Equivalent to reduceByKey(_ + _)

rdd2.collect // Result: Array[(String, Int)] = Array((is,1), (Giuyang,1), (love,2), (capital,1), (Guiyang,1), (I,2), (of,1), (Guizhou,2), (the,1))

val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) // Adds 10 to the first value of each key in each partition, so a key's total grows by 10 for every partition that contains it; the result: Array[(String, Int)]

rdd3.collect

val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf" ,"bear","bee"), 3)

val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)

val rdd6 = rdd5.zip(rdd4)


1. RDD action operators

1. [countByKey] counts the elements of a key-value RDD per key and returns a Map to the driver. Each key in the Map is a key of the RDD, and its value is the number of elements with that key;

2. [countByValue] counts how often each distinct element occurs in the RDD. In an RDD of tuples the whole tuple is treated as the value, so the returned Map goes from each tuple to its number of occurrences;

3. [filterByRange] acts on an RDD of key-value pairs and returns the elements whose keys lie in the specified range, with both bounds included. Unlike the two operators above, it is a transformation and returns a new RDD;

4. [flatMapValues] applies a function that returns a collection to each value and emits one key-value pair per resulting element, keeping the original key. It is also a transformation;

countByKey // Count the number of occurrences of each key

val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2 ), ("c", 1)))

rdd1.countByKey

rdd1.countByValue // Count the occurrences of each distinct element, here each (key, value) tuple

filterByRange // Keep the elements whose keys lie in the specified range (bounds included)

val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2 ), ("a", 1)))

val rdd2 = rdd1.filterByRange("b", "d")

rdd2.collect

flatMapValues

val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))

rdd3.flatMapValues(_.split(" "))
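
The four examples above print no results, so the sketch below reproduces them on local collections. It is plain Scala standing in for the RDD operators (an assumption made so that it runs without a cluster); on a real RDD the order of collected elements may differ.

```scala
val pairs = List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1))

// countByKey: number of elements per key.
val countByKey: Map[String, Long] =
  pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.size.toLong }

// countByValue: number of occurrences of each whole (key, value) tuple.
val countByValue: Map[(String, Int), Long] =
  pairs.groupBy(identity).map { case (p, ps) => p -> ps.size.toLong }

// filterByRange("b", "d"): keep keys between "b" and "d", bounds included.
val ranged = List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1))
  .filter { case (k, _) => k >= "b" && k <= "d" }

// flatMapValues(_.split(" ")): one pair per piece of the value, same key.
val flat = List(("a", "1 2"), ("b", "3 4"))
  .flatMap { case (k, v) => v.split(" ").toList.map(piece => k -> piece) }

println(countByKey)
println(countByValue)
println(ranged)
println(flat)
```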

foldByKey

Function prototype:

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

Function: Folds the values of an RDD[K,V] per key K. zeroValue is the initial value and func merges two values into one V.

Within each partition the fold for a key starts from zeroValue; the per-partition results of the same key are then merged with func.

example:

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B" ,2)))

rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at <console>:27

scala> rdd1.foldByKey(0)(_ + _).collect

res3: Array[(String, Int)] = Array((A,2), (B,3))

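Description: zeroValue 0 is applied through _ + _ to the first value of each key in each partition. If the two values of "A" are in different partitions, this gives ("A",0 + 0) and ("A",0 + 2); the partial results are then merged with _ + _ to give ("A",0 + 2), that is ("A",2).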
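
Because zeroValue is applied once per key in every partition, a zeroValue that is not neutral for func gives results that depend on the partitioning. The sketch below models this in plain Scala; foldByKeyModel is a hypothetical helper and the partition layouts are assumptions chosen for illustration.

```scala
// Hypothetical model: each inner Seq stands for one partition.
def foldByKeyModel[K, V](partitions: Seq[Seq[(K, V)]], zero: V)(func: (V, V) => V): Map[K, V] = {
  // Inside a partition, the fold for each key starts from zero.
  val perPartition: Seq[Map[K, V]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
      acc.updated(k, func(acc.getOrElse(k, zero), v))
    }
  }
  // Between partitions, partial results of the same key are merged with func.
  perPartition.flatMap(_.toSeq).foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
    acc.updated(k, acc.get(k).map(prev => func(prev, v)).getOrElse(v))
  }
}

val data = Seq(("A", 0), ("A", 2), ("B", 1), ("B", 2))
val twoParts  = data.grouped(2).toList // [(A,0),(A,2)] and [(B,1),(B,2)]
val fourParts = data.grouped(1).toList // one element per partition

val neutral   = foldByKeyModel(twoParts, 0)(_ + _)  // matches the example: A -> 2, B -> 3
val zeroTwoA  = foldByKeyModel(twoParts, 2)(_ + _)  // zero added once per key
val zeroTwoB  = foldByKeyModel(fourParts, 2)(_ + _) // zero added twice per key
println(Seq(neutral, zeroTwoA, zeroTwoB).mkString("\n"))
```

With a neutral zeroValue (0 for addition, "" for string concatenation) the result is the same for every layout, which is why the examples in this section use such values.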

foldByKey

val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)

val rdd2 = rdd1.map(x => (x.length, x))

val rdd3 = rdd2.foldByKey("")(_ + _)

val rdd = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_, 1))

rdd.foldByKey(0)(_ + _)

foreachPartition // An action operator of Spark core. The comment in its source code reads: Applies a function func to each partition of this RDD.

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)

rdd1.foreachPartition(x => println(x.reduce(_ + _)))
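
What the call above computes can be sketched locally. This plain-Scala model assumes that parallelize splits the nine elements into three contiguous, equally sized slices; the names data and partitions are illustrative only.

```scala
val data = List(1, 2, 3, 4, 5, 6, 7, 8, 9)

// Assumed layout: three contiguous slices of three elements each.
val partitions: List[List[Int]] = data.grouped(3).toList

// One call of the function per partition, as foreachPartition does.
partitions.foreach(p => println(p.sum))

val sums = partitions.map(_.sum)
```

On a cluster the function runs on the executors, so the println output appears in the executor logs rather than in the driver console, and the order of the lines is not guaranteed. Note also that reduce throws an exception on an empty partition, whereas sum returns 0.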

keyBy

val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)

val rdd2 = rdd1.keyBy(_.length)

rdd2.collect

keys, values

val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2 )

val rdd2 = rdd1.map(x => (x.length, x))

rdd2.keys.collect

rdd2.values.collect