Spark Core—number of RDD partitions

1. Why should we discuss the number of partitions in RDD?

The reason is that each Task acts on each partition. If the number of partitions is changed, the number of Task tasks will be indirectly changed, which will in turn change the execution efficiency of the entire task in Spark.

2. Without shuffle operation, manually change the number of RDD partitions:

1. The number of partitions of the first RDD is determined by the slices of the file. Without shuffle, the number of partitions of the first RDD is consistent with the number of partitions of the parent RDD by default.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo18Partition {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Demo21Partitions")
    //Change the parallelism of the stage
    //conf.set("spark.default.parallelism", "3")

    val sc: SparkContext = new SparkContext(conf)
    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
    val partitions: Int = stuRDD.getNumPartitions
    println(partitions)
  }

}

3. Shuffle operations:
1. By default, the number is consistent with the number of parent RDDs
2. You can determine the number of RDD partitions in the operator through the numPartitions parameter or by customizing a Partitioner.
3. You can specify the spark.default.parallelism parameter, and the default number of RDD partitions is determined by this parameter.
4. The priority relationship between the three:

Manually specified numPartitions>spark.default.parallelism>defaults to the number of parent RDDs

1. By default, it is still consistent with the number of parent RDDs.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo18Partition {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Demo18Partition")
    
    val sc: SparkContext = new SparkContext(conf)
    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
    val partitions: Int = stuRDD.getNumPartitions
    println(s"The current number of partitions is:${partitions}")
  }
}

2. You can determine the number of RDD partitions in the operator through the numPartitions parameter or by customizing a Partitioner.


import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Demo19PartitionShuffle {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo19PartitionShuffle")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val partitioner: MyPartitioner = new MyPartitioner(12)
    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
    val clazz: RDD[(String, Int)] = stuRDD.map(kv => {(kv.split(",")(4), 1)})
    val partitionRDD1: RDD[(String, Int)] = clazz.reduceByKey(partitioner,(k1, k2) => {k1 + k2})
      partitionRDD1.foreach(println)
    val partitionRDD2: RDD[(String, Int)] = clazz.reduceByKey((k1, k2) => {
      k1 + k2
    },12)
    partitionRDD2.foreach(println)



    println(s"The number of partitions obtained by setting the numPartitions parameter to 12 is: ${partitionRDD1.getNumPartitions}")
    println(s"The number of partitions obtained through the custom partitioner Partitioner is: ${partitionRDD2.getNumPartitions}")
  }
}

//Custom partitioner needs to inherit Partitioner:
class MyPartitioner(num:Int) extends Partitioner{
  //Specify the number of partitions:
  override def numPartitions: Int =num
  override def getPartition(key: Any): Int ={
    valclazz: String = key.toString
    clazz match {
      case "Liberal Arts Class 1" =>0
      case "Class 2 of Liberal Arts" =>1
      case "Class 3 of Liberal Arts"=>2
      case "Liberal Arts Class 4"=>3
      case "Liberal Arts Class 5"=>4
      case "Liberal Arts Class 6" =>5
      case "Science Class 1"=>6
      case "Science Class 2"=>7
      case "Science Class 3" =>8
      case "Science Class 4"=>9
      case "Science Class 5"=>10
      case "Science Class 6"=>11
    }
  }
}

3. You can specify the spark.default.parallelism parameter, and the default number of RDD partitions is determined by this parameter.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo20SparkDefaultParallelism {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName(this.getClass.getSimpleName)
    conf.set("spark.default.parallelism","3")
    val sc: SparkContext = new SparkContext(conf)



    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
    val clazzRDD: RDD[(String, Int)] = stuRDD.map(kv => (kv.split(",")(4), 1))
    val countRDD: RDD[(String, Int)] = clazzRDD.reduceByKey((k1, k2) => {
      k1 + k2
    })
    
    countRDD.foreach(println)
    println(s"Number of partitions after specifying the spark.default.parallelism parameter: ${countRDD.getNumPartitions}")
  }

}

4. You can use repartition to repartition or coalesce to change the number of partitions.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo21RepartitionCoalesce {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName(this.getClass.getSimpleName)
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    /**
     * Use repartition to repartition, you can increase or decrease partitions
     * def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
     * coalesce(numPartitions, shuffle = true)
     * }
     * From the bottom of the source code, we can learn that repartition actually changes shuffle to true based on coalesce, and changes the number of partitions through shuffle.
     */
    val clazzRDD: RDD[(Array[String], Int)] = stuRDD.map(kv => (kv.split(","), 1))
    val repartitionPartitions: Int = clazzRDD.getNumPartitions
    val newRepartitionPartitions: RDD[(Array[String], Int)] = clazzRDD.repartition(2)

    println(s"Number of partitions when repartition is not specified: ${repartitionPartitions}")
    println(s"Specify the number of partitions when repartitioning: ${newRepartitionPartitions.getNumPartitions}")


    /**
     * The default value of coalesce is false, so it can only reduce partitions but not increase them.
     * def coalesce(numPartitions: Int, shuffle: Boolean = false
     */

    val newCoalescePartitions: RDD[(Array[String], Int)] = clazzRDD.coalesce(2)
    val coalescePartitions: Int = clazzRDD.getNumPartitions
    println(s"No coalesce is specified to change the number of partitions: ${coalescePartitions}")
    println(s"When changing the number of partitions through coalesce: ${newCoalescePartitions.getNumPartitions}")

  }
}

?

1. In Saprk, when the parallelism of tasks in spark exceeds 2, the minimum number of partitions of RDD is required to be 2. If there is only one file, it usually corresponds to one slice, so it will correspond to one partition. At this time, if the task If there are two degrees of parallelism in the system, then in order to meet the conditions of the minimum number of partitions, one partition number will be automatically changed into two partition numbers.

2. If there is more than one file, it means that the number of slices at this time is greater than or equal to 2, then the number of partitions of the first RDD is determined according to the number of slices.