Spark – Kernel Scheduling for RDD

1. DAG definition

The core of Spark is implemented around RDD, and the Spark scheduler is an important part of that core: its role is task scheduling. Spark's task scheduling decides how to organize tasks to process the data of each partition in an RDD. It builds a DAG from the dependencies between RDDs, divides the DAG into Stages, and sends the tasks of each Stage to the designated nodes for execution. Understanding Spark's task-scheduling principles makes it possible to plan resource usage sensibly and complete computations efficiently with as few resources as possible.

Take the word-frequency statistics program WordCount as an example; its DAG diagram:

DAG: Directed Acyclic Graph. A DAG has direction and does not form a closed loop.

  • Directed: has a direction;
  • Acyclic: forms no closed loops.

1.1, Job and Action

Action: an operator whose return value is not an RDD. It acts as a trigger switch: calling it executes the chain of RDD dependencies built up before the Action operator.

Conclusion

One Action generates one DAG. If there are 3 Actions in the code, 3 DAGs are generated.

That is: the DAG generated by an Action produces one Job when the program runs.
So: 1 Action = 1 DAG = 1 Job

If 3 Actions are written in the code, then running it produces 3 Jobs, each with its own DAG. One run of the code is called an Application in Spark.

Hierarchical relationship: one Application can contain multiple Jobs; each Job contains one DAG, and each Job is produced by one Action. The sketch below illustrates this.
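
A minimal PySpark sketch of this hierarchy (the input and output paths and the choice of Actions are illustrative assumptions): running the code once creates one Application, and each of the three Actions triggers its own Job with its own DAG.

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[*]").setAppName("three-jobs"))

rdd = sc.textFile("words.txt").flatMap(lambda line: line.split(" "))  # transformations only: no Job yet

print(rdd.count())               # Action 1 -> Job 1, with its own DAG
print(rdd.distinct().count())    # Action 2 -> Job 2, with its own DAG
rdd.saveAsTextFile("out_words")  # Action 3 -> Job 3, with its own DAG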

1.2, DAG and partitions

The DAG is the logical execution graph of the Spark code. Its ultimate purpose is to build Spark's detailed physical execution plan. Since Spark is distributed (multi-partition), there is also a relationship between the DAG and partitions.

# Assume all RDDs below run with 3 partitions; the input path is a placeholder
rdd1 = sc.textFile("words.txt", minPartitions=3)
rdd2 = rdd1.flatMap(lambda line: line.split(" "))
rdd3 = rdd2.map(lambda word: (word, 1))
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
rdd4.collect()   # the Action that triggers the Job

2. DAG’s wide and narrow dependencies and stage division

2.1, Wide and narrow dependencies

The dependency relationship between a parent RDD and its child RDD in Spark is divided into two types (see the sketch after the list):

  • Narrow dependency: one partition of the parent RDD sends all of its data to exactly one partition of the child RDD;

  • Wide dependency (shuffle): one partition of the parent RDD sends data to multiple partitions of the child RDD.
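
A small PySpark sketch makes the difference concrete (the operators and data below are illustrative, not from the original): mapValues keeps a narrow dependency, while reduceByKey introduces a wide (shuffle) dependency.

from pyspark import SparkContext

sc = SparkContext("local[*]", "dependency-demo")
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)], 3)

mapped = pairs.mapValues(lambda v: v * 10)       # narrow dependency: each parent partition feeds exactly one child partition
reduced = pairs.reduceByKey(lambda a, b: a + b)  # wide dependency: parent partitions send data to multiple child partitions (shuffle)
print(reduced.collect())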

2.2. Stage division

Spark divides the DAG into different Stages based on wide dependencies.

Division rule: walk the DAG from back to front; every time a wide dependency is encountered, a new Stage is split off.

As shown in the figure, the DAG is split into 2 Stages at the wide dependency; inside a Stage there can only be narrow dependencies.

3. Memory iterative calculation

As shown in the figure, once the DAG is combined with partitions and Stage division, the logically optimal tasks can be read off directly, and each task is assigned to one thread for execution. The iterative computation of rdd1, rdd2, and rdd3 inside task1 in the figure above is therefore done by one task (thread); this pipeline within the Stage is a pure in-memory computation.

As shown in the figure above, task1, task2, and task3 form three parallel in-memory computing pipelines.

Spark is constrained by the global parallelism setting by default. Except for a few operators with special partitioning requirements, most operators plan their own number of partitions according to the global parallelism. If the global parallelism is 3, most operators will in fact have 3 partitions.

In Spark it is generally recommended to set only the global parallelism and not to set parallelism on individual operators; apart from some sorting operators, just let the computing operators use the default number of partitions.

Interview questions

3.1. How does Spark perform memory calculations? What is the role of DAG? What is the role of stage division?

  • Spark generates a DAG graph;
  • The DAG graph is divided into Stages based on partitions and wide/narrow dependencies;
  • Inside a Stage there are only narrow dependencies. Within these narrow dependencies, wherever a 1:1 partition correspondence is formed between consecutive RDDs, many in-memory iterative computation pipelines can be built;
  • These in-memory computation pipelines are the concrete Task executions;
  • A Task is a concrete thread; when a task runs inside a single thread, its computation stays in memory.

3.2. Why is Spark faster than MapReduce?

  • Spark has rich operators, while MapReduce has only two (Map and Reduce). MapReduce's programming model makes it hard to handle complex jobs within a single MR program; complex jobs usually require writing multiple MapReduce jobs and chaining them in series, exchanging intermediate data through the disk.
  • Spark can iterate in memory: the operators form a DAG, and after the Stages are divided based on dependencies, an in-memory iteration pipeline is formed within each Stage. MapReduce's Map and Reduce, by contrast, still exchange data through the disk.

In short: Spark wins on the programming model (enough operators), and data exchange and computation between operators can stay in memory as much as possible instead of iterating through the disk.

4. Spark parallelism

Spark parallelism: how many tasks run at the same time;
Parallelism: the setting of this parallel capability.

For example, setting the parallelism to 6 actually means that 6 tasks run in parallel, and under the premise of 6 parallel tasks, the RDDs are planned into 6 partitions (see the sketch below).
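
A minimal local sketch of this behaviour (the local master and the example data are assumptions; the parallelism of 6 mirrors the example above):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[6]").setAppName("parallelism-demo")
conf.set("spark.default.parallelism", "6")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(100))   # no explicit partition count given
print(rdd.getNumPartitions())      # follows spark.default.parallelism -> 6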

4.1. How to set the degree of parallelism

Parallelism can be set in code, in the configuration file, and in the client parameters at submission time, with priority from high to low:

  • In code

  • Client submission parameters

  • In configuration file

  • Default (1; but not everything actually runs with parallelism 1: most of the time the default parallelism is taken from the number of splits of the file being read)

Parameters for global parallelism configuration: spark.default.parallelism

4.2. Global parallelism

  • In configuration file:
# Set in conf/spark-defaults.conf
spark.default.parallelism 100
  • In client submission parameters:
bin/spark-submit --conf "spark.default.parallelism=100"
  • Set in code:
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.default.parallelism", "100")

Global parallelism is the recommended setting. Avoid changing partitioning at the RDD level, as it may break the construction of the memory iteration pipeline or generate additional shuffles.

4.3. Parallel settings for RDD (not recommended)

It can only be set in code, via the following operators (see the sketch after the list):

  • repartition operator
  • coalesce operator
  • partitionBy operator
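
A minimal sketch of the three operators (the data and partition counts are illustrative): repartition and partitionBy trigger a shuffle, while coalesce by default only narrows partitions.

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setMaster("local[*]").setAppName("rdd-partitions"))
rdd = sc.parallelize([("a", 1), ("b", 2), ("c", 3)], 3)

print(rdd.repartition(6).getNumPartitions())   # 6: increases partitions, always shuffles
print(rdd.coalesce(1).getNumPartitions())      # 1: decreases partitions, avoids a shuffle by default
print(rdd.partitionBy(4).getNumPartitions())   # 4: repartitions a key-value RDD by hash of the key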

4.4. How to plan parallelism in a cluster

Conclusion: set the parallelism to 2 to 10 times the total number of CPU cores. For example, if the cluster has 100 usable CPU cores, a parallelism of 200 ~ 1000 is recommended.

Make sure it is an integer multiple of the number of CPU cores. The minimum is 2 times; the maximum is usually about 10 times, or higher where appropriate. A small calculation follows below.
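
The rule of thumb expressed as a tiny calculation (the core count of 100 is an assumed example):

total_cores = 100                                  # total CPU cores usable by the cluster
low, high = total_cores * 2, total_cores * 10      # 2x to 10x the total cores
print(f"recommended parallelism: {low} ~ {high}")  # 200 ~ 1000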

4.5. Why should it be set at least 2 times?

  • One CPU core can only do one thing at a time;
  • Therefore, with 100 cores, setting the parallelism to 100 can keep the CPUs 100% busy;
  • Under that setting, if the tasks' workloads are uneven, some tasks finish first, which leaves some CPU cores idle;
  • Therefore, we allocate more Tasks than cores, for example a parallelism of 800: only 100 run at any moment while 700 wait;
  • But this ensures that as soon as one task finishes, another task takes its place, preventing CPU cores from idling and maximizing the use of cluster resources.

When planning parallelism, only look at the total number of CPU cores in the cluster.

5. Spark task scheduling

5.1, Spark program scheduling process

  • The Driver is launched;
  • The SparkContext (the entry object of the execution environment) is built;
  • The DAG Scheduler (DAGScheduler) builds the logical Task allocation;
  • The TaskScheduler (Task scheduler) assigns the logical Tasks to individual Executors for execution and monitors them;
  • The Workers (Executors), managed and monitored by the TaskScheduler, follow its instructions to work and report progress regularly.

Steps 1 to 4 are the Driver's work, and step 5 is the Worker's work.

5.2, Driver scheduling process

  • The logical DAG is produced;
  • The partition-level DAG is produced;
  • Tasks are divided;
  • Tasks are assigned to Executors and their work is monitored.

5.3, Two components in Driver

  • DAG Scheduler: processes the logical DAG graph and finally produces the logical Task division;
  • Task Scheduler: based on the DAG Scheduler's output, plans which physical Executors these logical Tasks should be placed on, and monitors and manages their execution.

6. Spark hierarchical relationship

  • One Spark environment can run multiple Applications;
  • One run of the code becomes one Application;
  • An Application can contain multiple Jobs;
  • Each Job is generated by one Action, and each Job has its own DAG execution graph;
  • A Job's DAG graph is divided into different Stages based on wide and narrow dependencies;
  • Based on the number of partitions in each Stage, multiple parallel memory iteration pipelines are formed;
  • Each memory iteration pipeline becomes one Task (the DAG Scheduler divides the Job into concrete Tasks; the set of Tasks divided from one Job is logically called that Job's TaskSet).

7. Spark Shuffle

During the DAG-scheduling phase Spark divides a Job into multiple Stages: the upstream Stage does map work and the downstream Stage does reduce work, which is essentially the MapReduce computing model. Shuffle is the bridge between map and reduce: it maps the map output to the reduce input, and involves serialization, deserialization, cross-node network IO, and disk read/write IO.

Spark's Shuffle is divided into two phases, Write and Read, which belong to two different Stages: the former is the last step of the parent Stage, and the latter is the first step of the child Stage.

The work of Shuffle is carried out by the concurrent tasks in a Stage. These tasks come in two kinds: ShuffleMapTask and ResultTask. A ShuffleMapTask performs Shuffle, while a ResultTask is responsible for returning the computation result; only the last Stage of a Job uses ResultTasks, all other Stages use ShuffleMapTasks. Viewed from the map side and the reduce side, a ShuffleMapTask can act as both a map-side task and a reduce-side task, because shuffles in Spark can be chained one after another; a ResultTask can only play the role of a reduce-side task.

8. Hash Shuffle

8.1. Stage division

  • shuffle write: mapper stage, the final result of the previous Stage is written out;
  • shuffle read: reduce stage, the next Stage pulls the previous Stage's output for merging.

Unoptimized HashShuffleManager: HashShuffle decides which partition a record goes to from the result of hash(key) % numReduceTasks, which guarantees that records with the same key end up in the same partition. The Hash Shuffle process is as follows:

Each upstream task generates one file per downstream task: a buffer is filled first and then written out to a disk file, and the block files are merged later. The number of disk files generated by an unoptimized shuffle write is staggering, so the following optimization was proposed.

Optimized HashShuffleManager: during shuffle write, a task no longer creates one disk file for every task of the downstream Stage. Instead the concept of a shuffleFileGroup appears: each shuffleFileGroup corresponds to a batch of disk files, and the number of disk files in each group equals the number of tasks in the downstream Stage.

8.2. Comparison before and after optimization

  • Without optimization:

    • Number of upstream tasks: m
    • Number of downstream tasks: n
    • Number of upstream Executors: k (m >= k)
    • Total disk files: m × n
  • After optimization:

    • Number of upstream tasks: m
    • Number of downstream tasks: n
    • Number of upstream Executors: k (m >= k)
    • Total disk files: k × n
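
The same comparison as a small worked calculation (m, n, and k are made-up example numbers):

m, n, k = 100, 100, 10   # upstream tasks, downstream tasks, upstream Executors

print(m * n)             # unoptimized HashShuffleManager: 10000 disk files
print(k * n)             # optimized HashShuffleManager (shuffleFileGroup): 1000 disk files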

9. Sort Shuffle Manager

SortShuffleManager has two operating mechanisms: the ordinary mechanism and the bypass mechanism. When the number of shuffle read tasks is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter (default 200), the bypass mechanism is enabled (the full trigger conditions are listed in 9.1); the bullets below describe the ordinary mechanism.

  • In the ordinary mechanism, data is first written into an in-memory data structure (5 MB by default). Different data structures may be chosen for different shuffle operators: for an aggregating shuffle operator such as reduceByKey, a Map structure is used and records are aggregated through the Map while being written into memory; for a plain shuffle operator such as join, an Array structure is used and records are written straight into memory.

  • Every time a record is written into the memory data structure, Spark checks whether a critical threshold has been reached. If it has, it attempts to spill the in-memory data to disk and then clears the memory data structure.

  • Sort: before spilling to the disk file, the data currently in the memory data structure is sorted by key.

  • Spill: after sorting, the data is written to the disk file in batches; the default batch size is 10,000, meaning the sorted data is written to the disk file 10,000 records at a time.

  • Merge: while a task writes all of its data through the memory data structure, several spills to disk occur and several temporary files are produced. Finally, all the earlier temporary disk files are merged into one disk file; this is the merge step.

Since one task corresponds to only one disk file, all the data that this task prepares for the tasks of the downstream (reduce-side) Stage is in that single file. A separate index file is therefore also written, recording the start offset and end offset within the file of each downstream task's data.
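
A small worked example of the resulting file count under the ordinary SortShuffle mechanism (the map-task count is an assumed number):

m = 100                          # number of shuffle map tasks
data_files = m                   # each map task merges its spills into one data file
index_files = m                  # plus one index file recording each reducer's offsets
print(data_files + index_files)  # 2 x M = 200 small disk files in total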

9.1, bypass operating mechanism

The triggering conditions of the bypass operating mechanism are as follows:

  • The number of shuffle read tasks (reduce-side partitions) is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter (default 200);
  • The shuffle operator is not one that performs map-side aggregation (for example, reduceByKey does a map-side combine, so it does not qualify).

  • In this mode, the task creates a temporary disk file for each reduce-side task, hashes each record by its key, and writes the record into the disk file that corresponds to the key's hash value. Of course, writing to a disk file also goes through an in-memory buffer first, and the data is spilled to the disk file once the buffer is full. Finally, all the temporary disk files are merged into a single disk file and a separate index file is created.
  • The disk-writing mechanism of this process is actually identical to that of the unoptimized HashShuffleManager, because an astonishing number of temporary disk files is still created; the difference is that they are merged into one disk file at the end. The small number of final disk files is what makes this mechanism perform better than the unoptimized HashShuffleManager.

The difference between this mechanism and the ordinary SortShuffleManager operating mechanism is:

  • The disk writing mechanism is different;

  • No sorting will be done. In other words, the biggest benefit of enabling this mechanism is that during the shuffle write process, there is no need to sort the data, thus saving this part of the performance overhead.

9.2, Summary

  • SortShuffle is divided into the ordinary mechanism and the bypass mechanism;
  • The ordinary mechanism completes the sorting in the in-memory data structure (5 MB by default) and ultimately produces 2 × M small disk files (one data file plus one index file per map task, where M is the number of map tasks);
  • When the number of shuffle read tasks is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter and the operator is not an aggregating shuffle operator (such as reduceByKey), SortShuffle's bypass mechanism is triggered. The bypass mechanism does not sort, which greatly improves performance.

10. Shuffle configuration options

Shuffle stage division:

  • shuffle write: mapper stage, the final result of the previous Stage is written out;
  • shuffle read: reduce stage, the next Stage pulls the previous Stage's output for merging.

Details

Spark's shuffle tuning mainly adjusts the buffer sizes, the number of pull retries and the retry wait time, the memory proportion, whether to perform sorting, and so on.

  • spark.shuffle.file.buffer

    • Parameter description: sets the buffer size of the BufferedOutputStream used by shuffle write tasks (default 32k). Data is first written into this buffer and is spilled to the disk file once the buffer is full;
    • Tuning suggestion: if the job has sufficient memory available, this parameter can be increased appropriately (for example to 64k), which reduces the number of spills to disk during shuffle write and hence the number of disk IOs, improving performance. In practice, adjusting this parameter properly has been found to improve performance by about 1% to 5%.
  • spark.reducer.maxSizeInFlight

    • Parameter description: sets the buffer size of shuffle read tasks; this buffer determines how much data can be pulled at a time (default 48m);
    • Tuning suggestion: if the job has sufficient memory available, this parameter can be increased appropriately (for example to 96m), which reduces the number of pulls and therefore the number of network transfers, improving performance. In practice, adjusting this parameter properly has been found to improve performance by about 1% to 5%.
  • spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait

    • spark.shuffle.io.maxRetries: when a shuffle read task pulls its own data from the node where the shuffle write task ran, a pull that fails because of a network problem is retried automatically; this parameter is the maximum number of retries (default 3);
    • spark.shuffle.io.retryWait: the wait interval between successive retries when pulling data (default 5s);
    • Tuning suggestion: the usual tuning is to increase the number of retries without changing the wait interval.
  • spark.shuffle.memoryFraction

    • Parameter description: This parameter represents the proportion of memory in Executor memory allocated to shuffle read task for aggregation operations.
  • spark.shuffle.manager

    • Parameter description: sets the type of ShuffleManager (default is sort). From Spark 1.5.x onward there were three options (hash, sort, and tungsten-sort, the last of which was later merged into sort):
      • hash: the default in Spark 1.x, i.e. HashShuffleManager;
      • sort: the default since Spark 2.x, i.e. SortShuffleManager; it uses the ordinary mechanism, and when the number of shuffle read tasks is less than or equal to the spark.shuffle.sort.bypassMergeThreshold parameter it automatically switches to the bypass mechanism.
  • spark.shuffle.sort.bypassMergeThreshold

    • Parameter description: when the ShuffleManager is SortShuffleManager, if the number of shuffle read tasks is less than or equal to this threshold (default 200), no sorting is performed during shuffle write;
    • Tuning suggestion: when you use SortShuffleManager and genuinely do not need sorting, it is advisable to increase this parameter (a combined configuration example follows below).
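
A hedged PySpark sketch pulling the knobs above together; the concrete values are only the "increase appropriately" suggestions from the text, not recommended defaults.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("shuffle-tuning-demo")
        .set("spark.shuffle.file.buffer", "64k")                 # shuffle write buffer (default 32k)
        .set("spark.reducer.maxSizeInFlight", "96m")             # shuffle read fetch buffer (default 48m)
        .set("spark.shuffle.io.maxRetries", "6")                 # retries for failed fetches (default 3)
        .set("spark.shuffle.io.retryWait", "5s")                 # wait between retries (default 5s)
        .set("spark.shuffle.sort.bypassMergeThreshold", "400"))  # raise if map-side sorting is not needed
sc = SparkContext(conf=conf)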