9. Spark Adaptive Query Execution (AQE): Dynamic Adjustment of the Join Strategy

Directory

  • Overview
  • Dynamic adjustment of Join strategy
    • Principle
    • Practice
  • Dynamic Optimization of Skewed Join
    • Principle
    • Practice

Overview

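Broadcast hash join works much like broadcast variables, one of Spark's shared-variable mechanisms: the small table is sent to every executor, so the large table can be joined without being shuffled. When a join can use this strategy, it generally gives the best join performance.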

Dynamic adjustment of Join strategy

In practice, production tables, particularly in factory (manufacturing) environments, are often not designed with joins in mind. As a result, joins that qualify for broadcast hash join are rare, and the tables are hard to adjust after the fact.

Principle

AQE can convert a sort-merge join into a broadcast hash join when the runtime size of either join side is smaller than the adaptive broadcast hash join threshold.
Once adaptive query execution is enabled, Spark re-plans the join using the accurate statistics collected at runtime, so the join strategy is adjusted dynamically.
Look at the picture below:

The Spark SQL execution graph can be seen in the tests later in this section.

| Property name | Default value | Explanation | Since version |
| --- | --- | --- | --- |
| spark.sql.adaptive.localShuffleReader.enabled | true | When true, and spark.sql.adaptive.enabled is also true, Spark tries to use the local shuffle reader to read shuffle data when shuffle partitioning is not required, for example after converting a sort-merge join into a broadcast hash join. | 3.0.0 |
| spark.sql.adaptive.autoBroadcastJoinThreshold | (none) | Maximum size in bytes of a table that can be optimized into a broadcast join. Setting it to -1 disables broadcasting. The default value is the same as spark.sql.autoBroadcastJoinThreshold. | 3.2.0 |
| spark.sql.autoBroadcastJoinThreshold | 10MB | Same meaning as above | 1.1.0 |
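
The selection rule described by these settings can be sketched in plain Python. This is a simplified model under stated assumptions, not Spark's planner code: the names `effective_broadcast_threshold` and `choose_join` are hypothetical, only one join side is considered, and treating a size exactly equal to the threshold as broadcastable is an assumption of the sketch.

```python
from typing import Optional

MB = 1024 * 1024


def effective_broadcast_threshold(adaptive_threshold: Optional[int],
                                  static_threshold: int = 10 * MB) -> int:
    """Return the threshold used at runtime.

    Models the documented fallback: when the adaptive setting is unset
    ("(none)"), the value of spark.sql.autoBroadcastJoinThreshold applies.
    """
    return static_threshold if adaptive_threshold is None else adaptive_threshold


def choose_join(runtime_side_bytes: int,
                adaptive_threshold: Optional[int] = None,
                static_threshold: int = 10 * MB) -> str:
    """Pick a join strategy from the runtime size of one join side (hypothetical helper)."""
    threshold = effective_broadcast_threshold(adaptive_threshold, static_threshold)
    if threshold < 0:
        # -1 disables broadcast joins altogether.
        return "sort-merge join"
    if runtime_side_bytes <= threshold:
        return "broadcast hash join"
    return "sort-merge join"


small_side = choose_join(3 * MB)                             # below the 10MB default
large_side = choose_join(300 * MB)                           # far above the default
disabled = choose_join(3 * MB, adaptive_threshold=-1)        # broadcast switched off
raised = choose_join(50 * MB, adaptive_threshold=64 * MB)    # adaptive value overrides
```

With the default 10MB threshold, only the 3MB side would be broadcast; raising the adaptive threshold to 64MB lets the 50MB side qualify as well.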

When every shuffle partition is smaller than a threshold, AQE can also convert a sort-merge join into a shuffled hash join. The threshold is configured with spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold:

| Property name | Default value | Explanation | Since version |
| --- | --- | --- | --- |
| spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold | 0 | Maximum size in bytes per partition that is allowed for building a local hash map. If this value is not less than spark.sql.adaptive.advisoryPartitionSizeInBytes and no partition is larger than this value, join selection prefers shuffled hash join over sort-merge join. | 3.2.0 |
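
The two conditions in this table can be written out as a small check. The sketch below is plain Python with a hypothetical function name, `prefers_shuffled_hash_join`; the 64MB default used for spark.sql.adaptive.advisoryPartitionSizeInBytes is Spark's documented default, and everything else is a simplified model rather than Spark's own code.

```python
from typing import Sequence

MB = 1024 * 1024


def prefers_shuffled_hash_join(partition_bytes: Sequence[int],
                               max_local_map_threshold: int = 0,
                               advisory_partition_size: int = 64 * MB) -> bool:
    """Model of the rule: the threshold must be at least the advisory
    partition size, and no shuffle partition may exceed the threshold."""
    return (max_local_map_threshold >= advisory_partition_size
            and all(size <= max_local_map_threshold for size in partition_bytes))


# With the default threshold of 0 the rule never fires.
default_case = prefers_shuffled_hash_join([10 * MB, 20 * MB])

# Threshold raised to 128MB and every partition fits under it.
all_fit = prefers_shuffled_hash_join([10 * MB, 100 * MB],
                                     max_local_map_threshold=128 * MB)

# One partition (200MB) is larger than the threshold.
one_too_big = prefers_shuffled_hash_join([10 * MB, 200 * MB],
                                         max_local_map_threshold=128 * MB)

# Threshold (32MB) is below the advisory partition size (64MB).
below_advisory = prefers_shuffled_hash_join([1 * MB],
                                            max_local_map_threshold=32 * MB)
```

Because the default value is 0, this conversion stays off until the threshold is raised explicitly.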

Practice

SQL executed:

select count(*) from xx where dt ='2023-06-30' and workorder='011002118525';
-- Join the table with itself
select * from (select * from xx where dt ='2023-06-30' and workorder='011002118525') as a
left join xx as b on b.dt ='2023-06-30' and b.workorder='011002118525' and a.id = b.id;


As the figure above shows, the table holds more than three million rows, which certainly exceeds 10MB, so the join runs as a sort-merge join.


Modify the SQL as follows:

select * from (select id from xx where dt = '2023-06-30' and workorder='011002118525') as a
join xx as b on a.id = b.id and b.dt = '2023-06-30' and b.unitid = 'H8TGWJ035ZY0000431';

Dynamic Optimization of Skewed Join

Principle

Severe data skew can significantly degrade join performance. This feature dynamically handles skew during a sort-merge join by splitting skewed partitions into tasks of similar size. It takes effect when both spark.sql.adaptive.enabled and spark.sql.adaptive.skewJoin.enabled are true.

| Property name | Default value | Explanation | Since version |
| --- | --- | --- | --- |
| spark.sql.adaptive.skewJoin.enabled | true | When true, and spark.sql.adaptive.enabled is also true, the dynamic skew join optimization takes effect. | 3.0.0 |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | A partition is considered skewed if its size is greater than this factor multiplied by the median partition size and also greater than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes. | 3.0.0 |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | A partition is considered skewed if its size in bytes is greater than this threshold and also greater than spark.sql.adaptive.skewJoin.skewedPartitionFactor times the median partition size. Ideally, this value should be set larger than spark.sql.adaptive.advisoryPartitionSizeInBytes. | 3.0.0 |
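
The skew test defined by the factor and the byte threshold can be illustrated with a few lines of plain Python. This is a sketch: `skewed_partitions` is a hypothetical helper, the partition sizes are invented, and `statistics.median` stands in for whatever median Spark computes internally.

```python
from statistics import median
from typing import List, Sequence

MB = 1024 * 1024


def skewed_partitions(sizes: Sequence[int],
                      factor: float = 5,
                      threshold: int = 256 * MB) -> List[int]:
    """Return the indexes of partitions that satisfy both skew conditions:
    size > factor * median size, and size > threshold."""
    med = median(sizes)
    return [i for i, size in enumerate(sizes)
            if size > factor * med and size > threshold]


# P0 is 2000MB; the median of the four sizes is 110MB, so 5 * median = 550MB.
clearly_skewed = skewed_partitions([2000 * MB, 100 * MB, 120 * MB, 90 * MB])

# 300MB exceeds the 256MB threshold but not 5 * median (550MB).
below_factor = skewed_partitions([300 * MB, 100 * MB, 120 * MB, 90 * MB])

# 200MB exceeds 5 * median (55MB) but not the 256MB threshold.
below_threshold = skewed_partitions([200 * MB, 10 * MB, 12 * MB, 9 * MB])
```

Only the first case reports partition 0 as skewed, since a partition has to pass both conditions at once.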

Suppose there are two tables, t1 and t2, and partition P0 of table t1 holds significantly more data than the other partitions. By default the join executes as shown in this figure:

Partition P0 of table t1 holds far more data than partitions P1, P2, and P3, so the data in table t1 can be considered skewed.
When partitions P1, P2, and P3 of t1 and t2 are joined, there is essentially no skew, because these partitions are moderate in size. Partition P0, however, is skewed, so its join task takes much longer than the others.

The skew join optimization splits partition P0 of t1 into two sub-partitions, P0-1 and P0-2, and joins each sub-partition with the corresponding partition P0 of table t2. See this figure:

Partition P0 of table t2 is duplicated, and each copy is joined with one of the sub-partitions split from P0 of table t1.
This effectively breaks up the skewed partition of table t1, so the join no longer suffers from data skew.
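
The split-and-replicate idea can be checked with a small plain-Python model. The rows, the `hash_join` and `split` helpers, and the choice of two sub-partitions are all made up for illustration; the sketch only shows that joining each sub-partition of t1's P0 against a full copy of t2's P0 yields the same rows as joining the unsplit partition.

```python
from typing import List, Tuple

Row = Tuple[int, str]  # (join key, payload)


def hash_join(left: List[Row], right: List[Row]) -> List[Tuple[int, str, str]]:
    """Naive inner join of two partitions on the key (hypothetical helper)."""
    return [(lk, lv, rv) for lk, lv in left for rk, rv in right if lk == rk]


def split(rows: List[Row], parts: int) -> List[List[Row]]:
    """Cut one partition into up to `parts` contiguous sub-partitions."""
    size = max(1, -(-len(rows) // parts))  # ceiling division, at least 1
    return [rows[i:i + size] for i in range(0, len(rows), size)]


# Skewed partition P0 of t1: many rows share the same key.
t1_p0: List[Row] = [(1, f"a{i}") for i in range(6)]
# Matching partition P0 of t2.
t2_p0: List[Row] = [(1, "x"), (1, "y")]

# Default plan: a single task joins the whole skewed partition.
baseline = hash_join(t1_p0, t2_p0)

# Skew-optimized plan: P0 becomes P0-1 and P0-2, and each sub-partition
# is joined with its own full copy of t2's P0.
sub_partitions = split(t1_p0, 2)
split_result = [row
                for part in sub_partitions
                for row in hash_join(part, list(t2_p0))]
```

Each sub-partition task handles half of the skewed rows, and the combined output matches the baseline join.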

Practice

TODO: to be added when a real case comes up.