Autonomous Optimization of Spark RDD Lazy Computing

Original/Zhu Jiqian

The data in an RDD (Resilient Distributed Dataset) behaves like a value declared final: it can be read but not modified. To transform or operate on an RDD, you create a new RDD that holds the result. This is why Spark provides two kinds of operators: transformations and actions.

Spark evaluates lazily. During the transformation phase, only the transformation logic is recorded; nothing is executed. The actual computation is triggered only when an action operator is encountered. If no action is called during the entire life cycle of an RDD, its transformation code never runs.
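This behavior can be observed with a small experiment. The sketch below is an illustration under assumptions, not code from the original program: the object name LazyEvaluationSketch, the method name, and the in-memory data are made up, and it assumes Spark is on the classpath and runs in local mode. It counts the calls to the map function with a LongAccumulator, once before and once after an action.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo object: counts how often the map function runs.
object LazyEvaluationSketch {
  // Returns (calls before the action, calls after the action).
  def mapCallsBeforeAndAfterAction(): (Long, Long) = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("lazy-sketch")
      .getOrCreate()
    try {
      val sc = spark.sparkContext
      val calls = sc.longAccumulator("mapCalls")

      // Transformation only: the function is recorded, not executed.
      val doubled = sc.parallelize(1 to 5).map { x =>
        calls.add(1)
        x * 2
      }
      val before = calls.sum

      // count() is an action, so the recorded map now runs for every element.
      doubled.count()
      val after = calls.sum

      (before, after)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = mapCallsBeforeAndAfterAction()
    println(s"map calls before action: $before, after action: $after")
  }
}
```

In a single successful local run, the first number should be 0 and the second 5, because nothing executes until count() is called.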

Lazy evaluation is beneficial: when Spark encounters an action, it can see and optimize the entire DAG (directed acyclic graph) of transformations before running anything. Some of these optimizations are described below.

A sample of the data used in this article is shown below; the examples can be verified against it:

Amy Harris,39,Male,18561,Cost-effective,Household Supplies,Tmall,WeChat Pay,10,Discounts,Brand Loyalty
Lori Willis,33,Female,14071,Functional,Household Supplies,Suning.com,Cash on Delivery,1,Discount,Daily Use
Jim Williams,61,Male,14145,Fashion,Auto Parts,Taobao,WeChat Pay,3,Free gifts,Gifts
Anthony Perez,19,Female,11587,Fashion,Jewelry,Pinduoduo,Alipay,5,Free gifts,Recommended products
Allison Carroll,28,Male,18292,Environmental Protection and Sustainability,Beauty and Skin Care,Vipshop,Credit Card,8,Free Gifts,Daily Use
Robert Rice,47,Male,5347,Fashion,Books and Audiovisuals,Pinduoduo,WeChat Pay,8,With coupons,Hobbies
Jason Bradley,25,Male,9480,Value for money,Auto parts,Pinduoduo,Credit card,5,Discount,Promotion discount
Joel Small,18,Female,15586,Social influence,Food and drink,Amazon,Alipay,5,No coupons,Daily use
Stephanie Austin,33,Male,7653,Comfort,Auto Parts,Amazon,UnionPay,3,No coupons,follow the trend
Kathy Myers,33,Male,18159,Comfort,Beauty & Skin Care,Amazon,Cash on Delivery,4,No Coupons,Recommended Products
Gabrielle Mccarty,57,Male,19561,Environmentally friendly and sustainable,Maternal and child products,Netease Kaola,Alipay,5,Free gifts,Daily use
Joan Smith,43,Female,11896,Brand pursuit,Books and audiovisuals,Amazon,Alipay,4,Free gifts,Product recommendations
Monica Garcia,19,Male,16665,Fashion trends,Electronic products,JD.com,Cash on delivery,7,Free gifts,Product recommendations
Christopher Faulkner,55,Male,3621,Social influence,Beauty and skin care,Suning.com,Alipay,7,No coupons,Daily use

1. Reduce unnecessary calculation

Because evaluation is lazy, Spark sees the whole chain of transformations before it runs anything, so it can plan the job around what the action actually needs and skip work whose result is never used. Reducing the amount of data to be processed in this way is itself an optimization. (Pushing filter operations down toward the data source is done by the optimizer of the DataFrame/Dataset API; plain RDD transformations run in the order they are written.)

For example, consider the following Spark code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object TakeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("count")
    val ss = SparkSession.builder().config(conf).getOrCreate()
    val filePath: String = "transaction_data.csv"
    // Transformation: only records where the data comes from, reads nothing yet
    val lineRDD = ss.sparkContext.textFile(filePath)
    // Transformation: the println below runs only when an action pulls rows through
    val value = lineRDD.map { x =>
      println(s"print $x")
      x.split(",")
    }
    // take(5) is the action that triggers the job; foreach here iterates the returned Array
    value.take(5).foreach(println)
    ss.stop()
  }
}

If Spark were not lazy, then as soon as execution reached the line val lineRDD = ss.sparkContext.textFile(filePath) it would load all the tens of thousands of rows in transaction_data.csv and only then start computing.

With lazy evaluation, none of the preceding transformations run until value.take(5).foreach(println) is reached and the take action is encountered (foreach here is an ordinary method on the returned array, not an RDD action). At that point Spark plans the job from the recorded transformations: in this example, take(5) pulls only the first 5 lines of transaction_data.csv through map, instead of reading all the tens of thousands of rows in the file.

The output is as follows. lineRDD.map indeed processes only the first 5 rows:

print Amy Harris,39,Male,18561,Cost-effective,Household Supplies,Tmall,WeChat Pay,10,Discounts,Brand Loyalty
print Lori Willis,33,Female,14071,Functional,Household Supplies,Suning.com,Cash on Delivery,1,Discount,Daily Use
print Jim Williams,61,Male,14145,Fashion,Auto Parts,Taobao,WeChat Pay,3,Free gifts,Gifts
print Anthony Perez,19,Female,11587,Fashion,Jewelry,Pinduoduo,Alipay,5,Free gifts,Recommended products
print Allison Carroll,28,Male,18292,Environmental Protection and Sustainability,Beauty and Skin Care,Vipshop,Credit Card,8,Free Gifts,Daily Use
[Ljava.lang.String;@3c87e6b7
[Ljava.lang.String;@77bbadc
[Ljava.lang.String;@3c3a0032
[Ljava.lang.String;@7ceb4478
[Ljava.lang.String;@7fdab70c
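Why only five rows reach the map function can be sketched without Spark. Within a partition, Spark passes records through chained Scala iterators, and take(n) stops pulling once it has n elements. The plain-Scala sketch below imitates that with an ordinary Iterator; the object name TakeSketch, the helper firstRows, and the generated lines are assumptions for illustration only.

```scala
// Hypothetical plain-Scala imitation of a lazy map followed by take(n).
object TakeSketch {
  // Splits only as many lines as take(n) asks for and reports how many
  // times the map function actually ran.
  def firstRows(lines: Seq[String], n: Int): (Array[Array[String]], Int) = {
    var mapCalls = 0
    // Iterator.map is lazy: the function runs only when an element is pulled.
    val mapped = lines.iterator.map { line =>
      mapCalls += 1
      line.split(",")
    }
    // take(n) pulls at most n elements through the map function.
    val rows = mapped.take(n).toArray
    (rows, mapCalls)
  }

  def main(args: Array[String]): Unit = {
    // Made-up rows standing in for the CSV file.
    val lines = (1 to 10000).map(i => s"user$i,30,Male,${10000 + i}")
    val (rows, calls) = firstRows(lines, 5)
    println(s"rows returned: ${rows.length}, map calls: $calls")
  }
}
```

Although 10000 lines are available, the map function runs only 5 times, which mirrors the five print lines in the Spark output above.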

2. Operation merging and optimization

When Spark executes an action, it automatically merges consecutive RDD transformations into a more efficient execution plan. This reduces the creation and transfer of unnecessary intermediate data and improves overall efficiency. It is like facing a winding road with a map in your hand: because you can see the whole route in advance, you can use the map to look for a shorter one.

Let's use a code example to illustrate. Suppose you need to count the number of people with a salary of more than 10,000.

The code reads tens of thousands of rows from transaction_data.csv, splits each row into an array on ",", keeps the arrays whose salary field is greater than 10,000, and finally counts the rows that meet the condition.

The following is the most verbose way to write it. Each step defines a new RDD on top of the previous one. If every step were executed eagerly, each intermediate RDD would be fully materialized in memory and the data would be traversed once per step.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SalaryCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("count")
    val ss = SparkSession.builder().config(conf).getOrCreate()
    val filePath: String = "transaction_data.csv"
    val lineRDD = ss.sparkContext.textFile(filePath)
    // Split each row into an array of fields
    val array = lineRDD.map(_.split(","))
    // Keep the rows whose salary (field index 3) is greater than 10000
    val valueRdd = array.filter(x => x.apply(3).toInt > 10000)
    // Count the number of people whose salary is above 10000
    val count = valueRdd.count()
    println(count)
    ss.stop()
  }
}

Because evaluation is lazy, Spark does not run these steps one after another. It pipelines the consecutive transformations into a single pass, which is equivalent to applying map and filter directly to the original RDD:

val value = ss.sparkContext.textFile(filePath).map(_.split(",")).filter(x => x.apply(3).toInt > 10000)
value.count()

This also avoids multiple loop traversals: each row passes through map and filter in a single traversal.

To verify this, you can reduce the RDD to a single partition with coalesce(1) so that the code runs serially, and then add print statements:

val value = ss.sparkContext.textFile(filePath).coalesce(1).map { x =>
  println(s"Split print $x")
  x.split(",")
}.filter { x =>
  println(s"filter print ${x.apply(0)}")
  x.apply(3).toInt > 10000
}
value.count()

Part of the output is shown below. The split and filter messages alternate row by row: each row is split and then immediately filtered, instead of all rows being split first and then filtered in a second traversal. This is where the optimization comes from:

Split print Amy Harris,39,Male,18561,Cost-effective,Household Supplies,Tmall,WeChat Pay,10,Discounts,Brand Loyalty
filter print Amy Harris
Split print Lori Willis,33,Female,14071,Functional,Household Supplies,Suning.com,Cash on Delivery,1,Discount,Daily Use
filter print Lori Willis
Split print Jim Williams,61,Male,14145,Fashion,Auto Parts,Taobao,WeChat Pay,3,Free gifts,Gifts
filter print Jim Williams
Split print Anthony Perez,19,Female,11587,Fashion,Jewelry,Pinduoduo,Alipay,5,Free gifts,Recommended products
filter print Anthony Perez
Split print Allison Carroll,28,Male,18292,Environmental Protection and Sustainability,Beauty and Skin Care,Vipshop,Credit Card,8,Free Gifts,Daily Use
filter print Allison Carroll
Split print Robert Rice,47,Male,5347,Fashion,Books and Audiovisuals,Pinduoduo,WeChat Pay,8,With coupons,Hobbies
filter print Robert Rice

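This also reminds us that consecutive transformations can be written as one chain to keep the code compact. Note that an intermediate RDD variable only describes a step of the computation; it holds no data unless it is explicitly cached with cache() or persist(), so the chained and the step-by-step versions are executed in the same pipelined way.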
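The difference between step-by-step and pipelined execution can be reproduced in plain Scala. The sketch below is only an analogy under assumptions: it uses a strict List to imitate eager evaluation and an Iterator to imitate Spark's per-partition pipelining, and the object name PipelineOrderSketch, its method names, and the shortened rows are hypothetical. It records the order in which the split and filter steps run.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical comparison of eager and pipelined evaluation order.
object PipelineOrderSketch {
  // Eager: split every row first, then filter in a second traversal.
  def eagerOrder(rows: List[String]): (List[String], Int) = {
    val log = ArrayBuffer.empty[String]
    val arrays = rows.map { row =>
      val a = row.split(",")
      log += s"split ${a(0)}"
      a
    }
    val kept = arrays.filter { a =>
      log += s"filter ${a(0)}"
      a(3).toInt > 10000
    }
    (log.toList, kept.size)
  }

  // Pipelined: each row goes through split and filter before the next row is read.
  def pipelinedOrder(rows: List[String]): (List[String], Int) = {
    val log = ArrayBuffer.empty[String]
    val kept = rows.iterator
      .map { row =>
        val a = row.split(",")
        log += s"split ${a(0)}"
        a
      }
      .filter { a =>
        log += s"filter ${a(0)}"
        a(3).toInt > 10000
      }
      .size // consuming the iterator plays the role of the count() action
    (log.toList, kept)
  }

  def main(args: Array[String]): Unit = {
    val rows = List("Amy Harris,39,Male,18561", "Robert Rice,47,Male,5347")
    println(eagerOrder(rows)._1.mkString(" | "))
    println(pipelinedOrder(rows)._1.mkString(" | "))
  }
}
```

The eager version logs both splits before any filter, while the pipelined version alternates split and filter per row, which is the same pattern as the Spark output above. Both versions count one matching row.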

3. Narrow dependency optimization

When Spark plans the execution of a lazily evaluated RDD, it takes advantage of narrow dependencies as much as possible.

Where there are narrow dependencies, there are also wide dependencies. What is the difference between the two?

A narrow dependency means that each partition of the child RDD can be computed from its corresponding parent partition with a simple transformation, without exchanging data across partitions. In other words, parent and child partitions correspond one-to-one.

The map and filter transformations used above are both narrow dependencies.

For example, with array.filter(x => x.apply(3).toInt > 10000), if the parent RDD has three partitions, each of the three partitions runs the filter on its own data and passes the filtered rows to the corresponding partition of the child RDD.

[Figure: narrow dependency between parent and child RDD partitions]

A wide dependency means that the data in one parent partition is distributed across multiple child partitions. This requires moving data between partitions, that is, a shuffle, which is computationally expensive. The relationship from a parent partition to child partitions is one-to-many. Under a wide dependency, records with the same key are sent to the same child partition, so when one parent partition holds several different keys, its records may be distributed to several different child partitions, and a shuffle occurs.

[Figure: wide dependency between parent and child RDD partitions]

Therefore, Spark exploits narrow dependencies as much as possible: where no cross-partition computation is needed, it avoids a shuffle and passes each parent partition to its child partition one-to-one. Another advantage of narrow dependencies is fault recovery: when a child partition is lost, only the corresponding parent partition needs to be recomputed, not every partition of the parent RDD.
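The dependency type of an RDD can be inspected through its dependencies field. The following is a minimal sketch under assumptions: the object name DependencySketch, the method name, the shortened in-memory rows, and the per-gender count are made up for illustration, and it assumes Spark is on the classpath and runs in local mode. It checks that a map followed by a filter has only narrow dependencies, while reduceByKey introduces a shuffle dependency.

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency}
import org.apache.spark.sql.SparkSession

// Hypothetical demo object for inspecting RDD dependency types.
object DependencySketch {
  // Returns (filter result is narrow, reduceByKey result is wide).
  def inspect(): (Boolean, Boolean) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("dependency-sketch")
      .getOrCreate()
    try {
      // Made-up rows with the same first four columns as the sample data.
      val rows = spark.sparkContext.parallelize(Seq(
        "Amy Harris,39,Male,18561",
        "Robert Rice,47,Male,5347",
        "Lori Willis,33,Female,14071"
      ), numSlices = 3)

      // map and filter: each child partition depends on one parent partition.
      val highPaid = rows.map(_.split(",")).filter(a => a(3).toInt > 10000)

      // reduceByKey: rows with the same key must meet in one partition.
      val perGender = highPaid.map(a => (a(2), 1)).reduceByKey(_ + _)

      val filterIsNarrow =
        highPaid.dependencies.forall(_.isInstanceOf[NarrowDependency[_]])
      val reduceIsWide =
        perGender.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])
      (filterIsNarrow, reduceIsWide)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (narrow, wide) = inspect()
    println(s"filter is narrow: $narrow, reduceByKey is wide: $wide")
  }
}
```

Both flags should be true: the filter stays in the same stage as its parent, and reduceByKey marks the point where a shuffle is required.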