1. DAG definition
The core of Spark is implemented on top of RDDs, and the Spark scheduler is an important part of the Spark core; its role is task scheduling. Spark's task scheduling is about how tasks are organized to process the data of each RDD partition: a DAG is built from the RDDs' dependencies, the DAG is divided into Stages, and the tasks in each Stage are sent to designated nodes for execution. Understanding Spark's task-scheduling principles lets you plan resource utilization sensibly and complete computations efficiently with as few resources as possible.
Take the word-frequency program WordCount as an example; its DAG diagram:
DAG: directed acyclic graph. A DAG has direction and contains no closed loops.
- Directed: the edges have a direction;
- Acyclic: there are no closed loops.
1.1. Job and Action
Action: an operator whose return value is not an RDD. It acts as a trigger: calling it executes the chain of RDD dependencies that precedes the action operator.
Conclusion: 1 Action generates 1 DAG. If there are 3 Actions in the code, 3 DAGs are generated.
That is, the DAG generated by an Action becomes one Job when the program runs.
So: 1 Action = 1 DAG = 1 Job.
If 3 Actions are written in one program, running it generates 3 Jobs, and each Job has its own DAG.
Running one program is called an Application in Spark.
Hierarchical relationship: one Application can contain multiple Jobs; each Job contains one DAG, and each Job is produced by one Action.
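As a minimal, hypothetical sketch (the input and output paths are made up), the program below is one Application whose three Actions yield three Jobs, each with its own DAG:

```python
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("three-jobs")   # one run = one Application
sc = SparkContext(conf=conf)

words = sc.textFile("hdfs:///tmp/words.txt") \
          .flatMap(lambda line: line.split())  # transformations only: no job yet

print(words.count())                      # Action 1 -> Job 1 (its own DAG)
print(words.take(3))                      # Action 2 -> Job 2
words.saveAsTextFile("hdfs:///tmp/out")   # Action 3 -> Job 3
```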
1.2. DAG and partitions
The DAG is the logical execution graph of Spark code; its ultimate purpose is to build Spark's detailed physical execution plan. And since Spark is distributed (multi-partition), the DAG also relates to partitions.
```python
# Assume every RDD below runs with 3 partitions (the input path is illustrative)
rdd1 = sc.textFile("hdfs:///tmp/words.txt", minPartitions=3)
rdd2 = rdd1.flatMap(lambda line: line.split())
rdd3 = rdd2.map(lambda word: (word, 1))
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
rdd4.collect()  # an action that triggers the job
```
2. DAG’s wide and narrow dependencies and stage division
2.1. Wide and narrow dependencies
The relationship between a parent Spark RDD and its child falls into two kinds:
- Narrow dependency: a partition of the parent RDD sends all of its data to exactly one partition of the child RDD;
- Wide dependency (shuffle): a partition of the parent RDD sends data to multiple partitions of the child RDD.
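For instance, in this sketch (assuming an existing SparkContext named `sc`), map is a narrow dependency and groupByKey a wide one:

```python
nums = sc.parallelize(range(12), 3)      # parent RDD with 3 partitions
doubled = nums.map(lambda x: x * 2)      # narrow: partition i feeds only child partition i
pairs = doubled.map(lambda x: (x % 4, x))
grouped = pairs.groupByKey()             # wide (shuffle): each parent partition feeds many child partitions
```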
2.2. Stage division
Spark divides the DAG into stages based on its wide dependencies.
Division rule: walk the DAG from back to front and cut whenever a wide dependency is encountered; each resulting segment is called a stage.
As shown in the figure, the DAG is divided into 2 stages at its wide dependency; inside a stage there can only be narrow dependencies.
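One way to observe the cut in practice is to dump the lineage; a sketch (hypothetical path; in PySpark, toDebugString() returns bytes, and the indentation shift in its output marks the shuffle boundary):

```python
counts = (sc.textFile("hdfs:///tmp/words.txt")   # hypothetical input path
            .flatMap(lambda line: line.split())  # narrow
            .map(lambda word: (word, 1))         # narrow
            .reduceByKey(lambda a, b: a + b))    # wide: cuts a new stage

# The indentation change in the output marks the shuffle (stage) boundary
print(counts.toDebugString().decode())
```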
3. Memory iterative calculation
As shown in the figure, once partitions and stage division are applied to the DAG, the logically optimal tasks can be derived, and each task is assigned to one thread for execution. The iterative computations over rdd1, rdd2, and rdd3 inside task1 in the figure above are all done by that single task (thread), so this line within the stage is a pure in-memory computation.
As the figure shows, task1, task2, and task3 form three parallel in-memory computing pipelines.
By default Spark is constrained by the global parallelism setting. Apart from a few operators with special partitioning requirements, most operators plan their own partition counts according to the global parallelism: if the global parallelism is 3, most operators will have 3 partitions.
In Spark we generally recommend setting only the global parallelism and not setting parallelism on individual operators; apart from some sorting operators, let the computing operators use their default partitioning.
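A quick check of both values (a sketch, assuming an active SparkContext `sc`):

```python
print(sc.defaultParallelism)       # the global parallelism in effect

rdd = sc.parallelize(range(100))   # no explicit partition count given
print(rdd.getNumPartitions())      # follows the global parallelism
```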
Interview questions
3.1. How does Spark perform memory calculations? What is the role of DAG? What is the role of stage division?
- Spark generates a DAG graph;
- The DAG graph is divided into stages based on partitions and wide/narrow dependencies;
- The interior of a stage contains only narrow dependencies; where these narrow dependencies form a 1:1 partition correspondence between consecutive RDDs, many in-memory iterative computation pipelines can be generated;
- These in-memory computation pipelines are the concrete executions of Tasks;
- A Task is a specific thread, and a task running inside a thread computes in memory.
3.2. Why is Spark faster than MapReduce?
- Spark has rich operators, while MapReduce has only two (Map and Reduce). MapReduce's programming model makes it very hard to handle a complex task within a single MR job; many complex tasks require writing multiple MapReduce jobs chained in series, which exchange data through disk.
- Spark can iterate in memory: operators form a DAG, and after stages are divided based on dependencies, an in-memory iteration pipeline is formed within each stage. MapReduce's Map and Reduce, by contrast, still exchange data through the hard disk.
In short: Spark wins on the programming model (it has enough operators), and operator interaction and computation can run in memory as much as possible instead of iterating through disk.
4. Spark parallelism
Spark parallelism: the number of tasks running at the same time.
Parallelism: the setting of parallel capacity. For example, setting the parallelism to 6 means that 6 tasks run in parallel; on that premise, each RDD's partitions are planned as 6 partitions.
4.1. How to set the degree of parallelism
It can be set in code, in the configuration file, or in the client parameters at submission time, with priority from high to low:
- In code;
- Client submission parameters;
- Configuration file;
- Default (1; though not everything actually runs with parallelism 1 — most of the time the default is based on the number of splits of the file being read).

Parameter for global parallelism configuration: spark.default.parallelism
4.2. Global parallelism
- In the configuration file:

```
# Set in conf/spark-defaults.conf
spark.default.parallelism 100
```

- In client submission parameters:

```
bin/spark-submit --conf "spark.default.parallelism=100"
```

- In code:

```python
conf = SparkConf()
conf.set("spark.default.parallelism", "100")
```
Setting the global parallelism is the recommended approach. Do not change the partitioning of individual RDDs, as that may affect the construction of the in-memory iteration pipeline or generate additional Shuffles.
4.3. Per-RDD parallelism settings (not recommended)
These can only be set in code, via the operators below (a short sketch follows the list):
- repartition
- coalesce
- partitionBy
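A sketch of the three operators (values are illustrative):

```python
rdd = sc.parallelize(range(100), 6)

more = rdd.repartition(12)      # raise the partition count (always shuffles)
fewer = rdd.coalesce(3)         # lower the partition count (no shuffle by default)

pairs = rdd.map(lambda x: (x % 4, x))
by_key = pairs.partitionBy(4)   # key-value RDDs only: redistribute by key hash
```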
4.4. How to plan parallelism in a cluster
Conclusion: set the parallelism to 2–10 times the total number of CPU cores. For example, if the cluster has 100 usable CPU cores, we recommend a parallelism of 200–1000.
Make sure it is an integer multiple of the CPU core count: at least 2 times, and generally at most 10 times or somewhat higher (in moderation).
4.5. Why set it to at least 2 times?
- One CPU core can only do one thing at a time;
- So with 100 cores, a parallelism of 100 can drive the CPUs to 100% utilization;
- Under that setting, however, if the load across tasks is uneven, some tasks finish early, leaving some CPU cores idle;
- We therefore allocate more tasks (higher parallelism), for example 800: only 100 run at any moment while 700 wait;
- This ensures that as soon as a task finishes, a waiting task takes its place, preventing the CPUs from idling and maximizing the use of cluster resources.
When planning parallelism, look only at the total number of CPU cores in the cluster.
5. Spark task scheduling
5.1. Spark program scheduling process
1. The Driver is launched;
2. The SparkContext (the entry object of the execution environment) is built;
3. Based on the DAG Scheduler (DAG scheduler), the logical Task allocation is built;
4. Based on the TaskScheduler (Task scheduler), the logical Tasks are assigned to individual Executors and monitored;
5. The Workers (Executors), managed and monitored by the TaskScheduler, follow its instructions and report progress regularly.
Steps 1–4 are all the Driver's work; step 5 is the Workers' work.
5.2. Driver scheduling process
- Generate the logical DAG;
- Generate the partition-level DAG;
- Divide the work into Tasks;
- Assign the Tasks to Executors and monitor their work.
5.3. Two components in the Driver
- DAG Scheduler: processes the logical DAG graph and finally produces the logical Task division;
- Task Scheduler: based on the DAG Scheduler's output, plans which physical Executors these logical Tasks should be placed on, and monitors and manages their operation.
6. Spark hierarchical relationship
- A Spark environment can run multiple Applications;
- One program run becomes one Application;
- An Application can contain multiple Jobs;
- Each Job is produced by one Action, and each Job has its own DAG execution graph;
- A Job's DAG graph is divided into different stages based on wide and narrow dependencies;
- Based on the partition counts within the different stages, multiple parallel in-memory iteration pipelines are formed;
- Each in-memory iteration pipeline becomes a Task (the DAG scheduler divides the Job into concrete tasks; the set of tasks divided out of one Job is logically called that Job's TaskSet).
7. Spark Shuffle
During the DAG scheduling phase, Spark divides a Job into multiple Stages: the upstream Stage does the map work and the downstream Stage does the reduce work, which is in essence the MapReduce computing framework. Shuffle is the bridge between map and reduce: it delivers the map output to the reduce input, and it involves serialization and deserialization, cross-node network IO, disk read/write IO, and so on.
Spark's Shuffle is divided into two phases, Write and Read, which belong to two different Stages: Write is the last step of the parent Stage, and Read is the first step of the child Stage.
The entities that execute the Shuffle are the concurrent tasks within a Stage, and these tasks come in two kinds: ShuffleMapTask and ResultTask. A ShuffleMapTask performs the Shuffle, while a ResultTask is responsible for returning the computation result; only the last Stage of a Job uses ResultTasks, and every other Stage uses ShuffleMapTasks. Analyzed in terms of the map side and the reduce side, a ShuffleMapTask can act as both a map-side task and a reduce-side task, because Shuffles in Spark can be chained in series; a ResultTask can only play the role of a reduce-side task.
8. Hash Shuffle
8.1. Shuffle stage division
- shuffle write: the mapper phase; the final result of the previous stage is written out;
- shuffle read: the reduce phase; the next stage pulls the previous stage's output and merges it.
Unoptimized HashShuffleManager: HashShuffle decides which partition a record goes to by computing hashcode(key) % numReduceTasks on the key of each task's result. This guarantees that records with the same key always land in the same partition. The Hash Shuffle process is as follows:
For each upstream task, one file per downstream task is generated: buffer files are filled first, then written out to disk files, and the block files are merged afterwards. The number of disk files produced by an unoptimized shuffle write is staggering, which motivates the optimization below.
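A toy, plain-Python illustration of the hashcode(key) % numReduceTasks routing rule (not Spark's internal code):

```python
num_reduce_tasks = 3
records = [("spark", 1), ("hadoop", 1), ("spark", 1)]

for key, value in records:
    partition = hash(key) % num_reduce_tasks   # same key -> same partition
    print(key, "-> partition", partition)
```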
Optimized HashShuffleManager: during shuffle write, tasks no longer create one disk file each for every task of the downstream stage. Instead, the concept of a shuffleFileGroup appears: each shuffleFileGroup corresponds to a batch of disk files, and the number of disk files in each group equals the number of tasks in the downstream stage; tasks that run later reuse the group's existing files instead of creating new ones.
8.2. Comparison before and after optimization
- Without optimization:
  - Number of upstream tasks: m
  - Number of downstream tasks: n
  - Number of upstream Executors: k (m >= k)
  - Total disk files: m × n
- After optimization:
  - Number of upstream tasks: m
  - Number of downstream tasks: n
  - Number of upstream Executors: k (m >= k)
  - Total disk files: k × n

A worked example follows the lists.
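Plugging illustrative numbers into the two formulas:

```python
m, n, k = 100, 100, 10         # upstream tasks, downstream tasks, upstream Executors
print("unoptimized:", m * n)   # 10000 disk files
print("optimized:  ", k * n)   # 1000 disk files
```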
9. Sort Shuffle Manager
SortShuffleManager has two operating mechanisms: the ordinary mechanism and the bypass mechanism. The bypass mechanism is enabled when the number of reduce-side tasks (downstream partitions) is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter (default 200).
- In the ordinary mode, data is first written into an in-memory data structure (default 5 MB). Different structures may be chosen depending on the shuffle operator: for an aggregating shuffle operator such as reduceByKey, a Map structure is chosen, and data is aggregated through the Map while being written into memory; for an ordinary shuffle operator such as join, an Array structure is used and data is written into memory directly.
- Each time a record is written into the in-memory structure, Spark checks whether a critical threshold has been reached; if so, it attempts to spill the in-memory data to disk and then clears the in-memory structure.
- Sort: before spilling to a disk file, the data already in the in-memory structure is sorted by key.
- Spill: after sorting, the data is written to the disk file in batches. The default batch size is 10,000, i.e., the sorted data is written to disk in batches of 10,000 records each.
- Merge: while a task writes all of its data into the in-memory structure, multiple spills occur and multiple temporary files are produced. Finally, all the earlier temporary disk files are merged into a single disk file; this is the merge process. (A toy sketch of this sort-spill-merge flow follows the list.)
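A toy, non-Spark sketch of the sort-spill-merge flow (the memory threshold is shrunk to a record count for illustration):

```python
import heapq

SPILL_THRESHOLD = 4   # stand-in for the ~5 MB memory threshold
buffer, spills = [], []

def write(record):
    buffer.append(record)
    if len(buffer) >= SPILL_THRESHOLD:     # threshold reached: sort, then spill
        spills.append(sorted(buffer))      # each spill is a sorted run
        buffer.clear()

for rec in [("b", 1), ("a", 1), ("c", 1), ("a", 1), ("d", 1), ("b", 1)]:
    write(rec)
if buffer:
    spills.append(sorted(buffer))          # final spill of the leftover buffer

merged = list(heapq.merge(*spills))        # merge all sorted runs into one file
print(merged)
```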
Since each task corresponds to only one disk file, all the data that this task prepared for the tasks of the reduce-side stage lives in that single file. A separate index file is therefore also written, recording the start offset and end offset within the file of each downstream task's data.
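A toy sketch of the index idea (the data is fabricated): the merged file stores each downstream task's block contiguously, and the index records a (start offset, end offset) pair per block:

```python
import io

partition_blocks = [b"aa", b"bbbb", b"c"]   # data destined for reducers 0, 1, 2

data_file = io.BytesIO()
index = []                                  # (start_offset, end_offset) per reducer
for block in partition_blocks:
    start = data_file.tell()
    data_file.write(block)
    index.append((start, data_file.tell()))

print(index)                                # [(0, 2), (2, 6), (6, 7)]
```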
9.1. Bypass operating mechanism
The triggering conditions of the bypass operating mechanism are:
- The number of reduce-side tasks (downstream partitions) is less than the value of the spark.shuffle.sort.bypassMergeThreshold parameter (default 200);
- The shuffle operator is not one with map-side-combine aggregation (reduceByKey, for example, does have a map-side combine, so it does not qualify).
- In this mode, each task creates a temporary disk file for every reduce-side task, hashes each record's key, and writes the record into the disk file corresponding to that hash value. As usual, records are first written into a memory buffer, which spills to the disk file once full. Finally, all temporary disk files are merged into one disk file and a separate index file is created.
- The disk-writing mechanism of this process is exactly the same as the unoptimized HashShuffleManager's, in that an astonishing number of disk files are created; the difference is that they are merged into one file at the end. The small number of final disk files is what makes this mechanism perform better than the unoptimized HashShuffleManager.
The differences between this mechanism and the ordinary SortShuffleManager mechanism are:
- The disk-writing mechanism is different;
- No sorting is done. In other words, the biggest benefit of this mechanism is that the shuffle write process does not need to sort the data, saving that part of the performance overhead.
9.2. Summary
- SortShuffle is divided into the ordinary mechanism and the bypass mechanism;
- The ordinary mechanism sorts within the in-memory data structure (default 5 MB) and produces 2M small disk files in total (each of the M map tasks writes one data file plus one index file);
- When the number of reduce-side tasks is less than the value of the spark.shuffle.sort.bypassMergeThreshold parameter, and the operator is not an aggregating shuffle operator (unlike, say, reduceByKey), SortShuffle's bypass mechanism is triggered. The bypass mechanism does not sort, which greatly improves its performance.
10. Shuffle configuration options
Shuffle stage division:
- shuffle write: the mapper phase; the final result of the previous stage is written out;
- shuffle read: the reduce phase; the next stage pulls the previous stage's output and merges it.
Details
Spark's shuffle tuning mainly adjusts buffer sizes, the number of pull retries and the retry wait time, proportional memory allocation, whether to sort, and so on. A sketch of applying several of these knobs appears after the list.

- spark.shuffle.file.buffer
  - Parameter description: sets the buffer size of the shuffle write task's BufferedOutputStream (default 32 KB). Data bound for a disk file is written into this buffer first and spilled to disk once the buffer is full;
  - Tuning suggestion: if the job has ample memory, increase this parameter appropriately (e.g., to 64 KB) to reduce the number of spills during shuffle write, and with them the number of disk IO operations, improving performance. In practice, tuning this parameter properly yields roughly a 1%–5% improvement.
- spark.reducer.maxSizeInFlight
  - Parameter description: sets the buffer size of the shuffle read task; this buffer determines how much data can be pulled at a time (default 48 MB);
  - Tuning suggestion: if the job has ample memory, increase this parameter appropriately (e.g., to 96 MB) to reduce the number of pulls, and with them the number of network transfers, improving performance. In practice, tuning this parameter properly yields roughly a 1%–5% improvement.
- spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait
  - spark.shuffle.io.maxRetries: a shuffle read task pulls its own data from the node where the shuffle write task ran; if a pull fails because of a network fault, it is retried automatically. This parameter is the maximum number of retries (default 3);
  - spark.shuffle.io.retryWait: the wait interval between successive retries when pulling data (default 5 s);
  - Tuning suggestion: the usual tuning is to increase the number of retries without adjusting the wait interval.
- spark.shuffle.memoryFraction
  - Parameter description: the proportion of Executor memory allocated to shuffle read tasks for aggregation operations.
- spark.shuffle.manager
  - Parameter description: sets the type of ShuffleManager (default sort). As of Spark 1.5.x there are three options (hash, sort, and tungsten-sort); the two common ones:
    - hash: the default in Spark 1.x, HashShuffleManager;
    - sort: the default since Spark 2.x, the ordinary mechanism; when the number of shuffle read tasks is less than or equal to the spark.shuffle.sort.bypassMergeThreshold parameter, the bypass mechanism is turned on automatically.
- spark.shuffle.sort.bypassMergeThreshold
  - Parameter description: when the ShuffleManager is SortShuffleManager, if the number of shuffle read tasks is less than this threshold (default 200), the shuffle write process does not perform the sorting operation;
  - Tuning suggestion: when using SortShuffleManager, if you genuinely do not need sorting, it is advisable to increase this parameter.