Spark [RDD Programming (3) Key-value Pair RDD]

Introduction

A key-value pair RDD is an RDD whose elements are all key-value pairs of the form (key, value). It is a very common kind of RDD and can be applied in many scenarios.

This is not surprising: from our earlier study of Hadoop we already saw that data processing there is essentially batch processing of key-value pairs, because in the MapReduce model the Mapper and the Reducer are connected to each other through keys and values.

Creation of RDD of key-value pairs

In fact, creating a key-value pair RDD is just creating an ordinary RDD: it is done either from a parallelized collection or from the file system, and the file system can in turn be the local file system or HDFS.
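
As a minimal sketch of both routes (the file path data/word.txt below is only a hypothetical example; any local or HDFS path works):

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CreatePairRDD {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("create_pair_rdd").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    //From a parallelized collection: each element is already a (key, value) tuple
    val fromArray: RDD[(String, Int)] = sc.parallelize(Array(("Spark", 1), ("Hadoop", 1)))

    //From a text file (local path or HDFS URI): map each word to a (word, 1) pair
    val fromFile: RDD[(String, Int)] = sc.textFile("data/word.txt")
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))

    fromArray.foreach(println)
    sc.stop()
  }
}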

Commonly used key-value pair RDD conversion operations

1. reduceByKey(func)

Same usage as in the previous article.
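
As a quick refresher, a minimal sketch on the same sample data used in the examples below (reusing the sc and RDD setup from the full example in the next section):

//Create RDD through parallel collection
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)

    //Sum the values that share the same key
    val res: RDD[(String, Int)] = rdd.reduceByKey((a, b) => a + b)
    res.foreach(println)

This prints (Spark,2), (Hadoop,1) and (Flink,1) (the order may vary).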

2. groupByKey()

Same usage as in the previous article.
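
Again as a minimal sketch on the same data:

//Create RDD through parallel collection
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)

    //Collect the values that share the same key into an Iterable
    val res: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    res.foreach(println)

This prints something like (Spark,CompactBuffer(1, 1)), (Hadoop,CompactBuffer(1)) and (Flink,CompactBuffer(1)).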

3. keys

Return all keys in the key-value pair RDD to form a new RDD.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object KV_RDD {

  def main(args: Array[String]): Unit = {
    //Create SparkContext object
    val conf = new SparkConf()
    conf.setAppName("kv_rdd").setMaster("local")
    val sc:SparkContext = new SparkContext(conf)

    //Create RDD through parallel collection
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)

    val res: RDD[String] = rdd.keys
    res.foreach(println)

    //Close SparkContext
    sc.stop()
  }
}

Output result:

Spark
Hadoop
Spark
Flink

4. values

Return all values in the key-value pair RDD to form a new RDD.

//Create RDD through parallel collection
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)

    val res: RDD[Int] = rdd.values
    res.foreach(println)

Output result:

1
1
1
1

5. sortByKey(ascending: Boolean)

Returns an RDD sorted by key (lexicographically).

//Create RDD through parallel collection
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)

    val res: RDD[(String,Int)] = rdd.sortByKey()
    res.foreach(println)

Output result:

(Flink,1)
(Hadoop,1)
(Spark,1)
(Spark,1)

Set ascending/descending order

By default, the sortByKey() method sorts in ascending order. If you want to sort in descending order, pass in false.

//Create RDD through parallel collection
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)
    //descending order
    val res: RDD[(String,Int)] = rdd.sortByKey(false)
    res.foreach(println)

Output result:

(Spark,1)
(Spark,1)
(Hadoop,1)
(Flink,1)

6. sortBy()

Sorts by a field other than the key: you supply a function that extracts the sort key from each element.

//Create RDD through parallel collection
    val arr = Array(("Spark",1),("Hadoop",5),("Hive",2),("Flink",3))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)

    //Sort in ascending order by value
    val res: RDD[(String,Int)] = rdd.sortBy(kv=>kv._2,true)
    res.foreach(println)

Output result:

(Spark,1)
(Hive,2)
(Flink,3)
(Hadoop,5)

7. mapValues(func)

The RDDs we processed before held plain text or numeric elements, and the func in map(func) was applied to each whole RDD element. With mapValues(func), func is applied only to the value of each (key, value) pair, while the key stays unchanged.

//Create RDD through parallel collection
    val arr = Array(("Spark",1),("Hadoop",5),("Hive",2),("Flink",3))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)

    //All values + 1
    val res: RDD[(String,Int)] = rdd.mapValues(value=>value + 1)
    res.foreach(println)

Output result:

(Spark,2)
(Hadoop,6)
(Hive,3)
(Flink,4)

8. join()

join() performs an inner join: pairs (K, V1) from one RDD and (K, V2) from another are combined into (K, (V1, V2)) for every key K that appears in both.

//Create RDD through parallel collection
    val arr1 = Array(("Spark",1),("Hadoop",5),("Hive",2),("Flink",3))
    val arr2 = Array(("Spark","fast"),("Hadoop","good"))
    val rdd1: RDD[(String,Int)] = sc.parallelize(arr1)
    val rdd2: RDD[(String,String)] = sc.parallelize(arr2)

    //Inner join rdd1 with rdd2 on the key
    val res: RDD[(String, (Int, String))] = rdd1.join(rdd2)
    res.foreach(println)

Output result:

(Spark,(1,fast))
(Hadoop,(5,good))

We can see that the returned RDD only contains elements whose key K also appears in the joined RDD rdd2, i.e. keys present in both tables.

9. combineByKey()

This function takes several parameters; here is a brief introduction:

  1. createCombiner: converts a value V into a value of the accumulator type C (V => C). It is called the first time a key is encountered and creates the accumulator for that key.
  2. mergeValue: merges a new value into an existing accumulator ((C, V) => C). It is called whenever another value with the same key is encountered within the same partition.
  3. mergeCombiners: merges accumulators from different partitions ((C, C) => C). It is called after the partitions have been processed, to combine the per-partition accumulators for each key.

Case: computing each company's total revenue and average revenue over three quarters

//Create RDD through parallel collection
    val arr = Array(("company-1",88),("company-1",96),("company-1",85),("company-2",94),("company-2",86),("company-2",74),("company-3",86),("company-3",88),("company-3",92))

    val rdd: RDD[(String, Int)] = sc.parallelize(arr,3)

    val res: RDD[(String,Int,Float)] = rdd.combineByKey(
      //createCombiner: first time a key is seen, start the accumulator (income, count = 1)
      income => (income, 1),
      //mergeValue: same key seen again in this partition, add the income and bump the count
      (acc: (Int, Int), income) => (acc._1 + income, acc._2 + 1),
      //mergeCombiners: combine the per-partition accumulators
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    ).map({
      //(key, (total, count)) => (key, total, average)
      case (key, value) => (key, value._1, value._1 / value._2.toFloat)
    })
    //Redistribute partitions and merge 3 partitions into 1
    res.repartition(1).saveAsTextFile("data/kv_rdd/")

Contents of the part-00000 file in the output directory:

(company-3,266,88.666664)
(company-1,269,89.666664)
(company-2,254,84.666664)

Here the first column is the company name, the second column is the total income, and the third column is the average income.

Parameter analysis

The first parameter, createCombiner(), is called when a key is encountered for the first time within a partition. It takes the value of the pair (K: company name, V: one quarter's income) and converts it into the accumulator format C = (total income so far, 1), where the 1 means that one quarter has been accumulated so far.

The second parameter, mergeValue(), is called when the same key (for example "company-1") is encountered again in the same partition. It defines how a new value is folded into the existing accumulator: add the income to the running total and increase the quarter count by 1.

The third parameter, mergeCombiners(), is needed because we created several partitions, so the partial results of the different partitions have to be combined at the end. It defines how two accumulators in format C that belong to the same key are merged: the totals are added together, and so are the counts.

Finally, we apply a pattern match: for each returned pair (k, v), where k is the company name and v is the pair (total income, number of quarters), we convert it to (company name, total income, average income).

An illustrative trace for key "company-1", supposing its records were spread over two partitions:

Partition 1:
1 - createCombiner() is called:
    (company-1,88) => (company-1,(88,1))
2 - mergeValue() is called:
    (company-1,96) => (company-1,(184,2))
Partition 2:
1 - createCombiner() is called:
    (company-1,85) => (company-1,(85,1))

Merging the partitions:
3 - mergeCombiners() is called:
    (company-1,(184,2)) + (company-1,(85,1)) => (company-1,(269,3))

10. flatMapValues(func)

flatMapValues(func) is similar to mapValues(func): both operate on key-value pair RDDs. mapValues(func) applies func to the value of each (key, value) pair and leaves the key unchanged. flatMapValues(func) also applies func to each value, but then flattens the result, pairing each produced element with the original key to form a series of new key-value pairs.

Input data:

("k1","hadoop,spark,flink")
("k2","hadoop,hive,hbase")

Processing:

//Create RDD through parallel collection
    val arr = Array(("k1","hadoop,spark,flink"),("k2","hadoop,hive,hbase"))
    val rdd: RDD[(String, String)] = sc.parallelize(arr)

    //flatMapValues(func): split each comma-separated value and pair every piece with its key
    val res: Array[(String, String)] = rdd.flatMapValues(value => value.split(",")).collect()
    //mapValues(func), shown for comparison:
    //val res: Array[(String, Array[String])] = rdd.mapValues(value => value.split(",")).collect()

    res.foreach(println)

Output result:

(k1,hadoop)
(k1,spark)
(k1,flink)
(k2,hadoop)
(k2,hive)
(k2,hbase)

For comparison, the RDD produced by mapValues(func) would contain:

(k1,Array("hadoop","spark","flink"))
(k2,Array("hadoop","hive","hbase"))

Clearly, flatMapValues(func) performs an additional flattening step, pairing each element of the split value with its key to form a series of individual key-value pairs.