Directory
- Overview
- Dynamically Adjust Join Strategy
  - Principle
  - Hands-on
- Dynamically Optimize Skewed Join
  - Principle
  - Hands-on
Overview
This article looks at two runtime optimizations provided by Adaptive Query Execution (AQE): dynamically adjusting the join strategy and dynamically optimizing skewed joins.
A broadcast hash join is similar to the broadcast variable among Spark's shared variables: the smaller side of the join is shipped to every executor. When a join can adopt this strategy, it usually delivers the best join performance.
Dynamically Adjust Join Strategy
In practice, table designs in production environments, especially in factories, are not always ideal, so the statistics available at planning time can be unreliable; as a result, this optimization rarely kicks in on its own and is hard to tune by hand.
Principle
AQE can convert a sort-merge join into a broadcast hash join at runtime, provided the join table turns out to be smaller than the adaptive broadcast hash join threshold.
Once adaptive query execution is enabled, the join strategy can be re-planned based on accurate runtime statistics, dynamically adjusting the join strategy that was chosen at planning time.
See the figure below:
During the testing later in this article, you can observe this in the Spark SQL execution graph.
Attribute name | Default value | Explanation | Version |
---|---|---|---|
spark.sql.adaptive.localShuffleReader.enabled | true | When this is true and spark.sql.adaptive.enabled is also true, Spark tries to use the local shuffle reader to read shuffle data when shuffle partitioning is not required, for example after converting a sort-merge join into a broadcast hash join. | 3.0.0 |
spark.sql.adaptive.autoBroadcastJoinThreshold | (none) | The maximum size in bytes for a table to be optimized into a broadcast join. Setting this to -1 disables broadcasting. The default is the same as spark.sql.autoBroadcastJoinThreshold. | 3.2.0 |
spark.sql.autoBroadcastJoinThreshold | 10MB | Same as above, but applied when the plan is first created. | 1.1.0 |
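As an illustration, these properties can also be set at runtime on an existing SparkSession (assumed here to be named `spark`; the 20MB value is only an example, not a recommendation):

```python
# Sketch: enabling AQE and tuning the adaptive broadcast threshold at runtime.
# Assumes an existing SparkSession named `spark`; values are illustrative.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
# Tables found to be smaller than 20MB at runtime become broadcast hash joins.
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "20MB")
```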
When all shuffle partitions are smaller than the threshold, AQE converts a sort-merge join into a shuffled hash join; the maximum threshold is configured via spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold:
Attribute name | Default value | Explanation | Version |
---|---|---|---|
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold | 0 | The maximum size in bytes per partition that may be used to build a local hash map. If this value is not smaller than spark.sql.adaptive.advisoryPartitionSizeInBytes and no partition is larger than this configuration, join selection prefers a shuffled hash join over a sort-merge join. | 3.2.0 |
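The selection logic described by the two tables above can be sketched in plain Python. This is a simplification for illustration only, not Spark's actual implementation; the function name and the byte values are made up:

```python
# Simplified sketch of AQE's runtime join-strategy choice (illustrative only).
def choose_join_strategy(build_side_bytes, partition_sizes,
                         broadcast_threshold=10 * 1024 * 1024,
                         local_map_threshold=0):
    """Return the join strategy AQE would prefer for one join."""
    # 1. Build side small enough -> broadcast hash join.
    if build_side_bytes <= broadcast_threshold:
        return "broadcast hash join"
    # 2. Every shuffle partition fits the local-map threshold
    #    -> shuffled hash join (the default threshold 0 disables this path).
    if local_map_threshold > 0 and all(p <= local_map_threshold for p in partition_sizes):
        return "shuffled hash join"
    # 3. Otherwise fall back to sort-merge join.
    return "sort-merge join"

print(choose_join_strategy(5 * 1024 * 1024, []))    # -> broadcast hash join
print(choose_join_strategy(150 * 1024 * 1024, [2 ** 20] * 8,
                           local_map_threshold=64 * 1024 * 1024))  # -> shuffled hash join
print(choose_join_strategy(150 * 1024 * 1024, [2 ** 30] * 8))      # -> sort-merge join
```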
Hands-on
SQL executed:

```sql
-- Count the filtered rows
select count(*) from xx where dt = '2023-06-30' and workorder = '011002118525';

-- Join the same table with itself
select *
from (select * from xx where dt = '2023-06-30' and workorder = '011002118525') as a
left join xx as b
  on b.dt = '2023-06-30' and b.workorder = '011002118525' and a.id = b.id;
```
As the figure above shows, more than three million rows easily exceed 10MB, so a sort-merge join is used.
Modify the SQL as follows:

```sql
select *
from (select id from xx where dt = '2023-06-30' and workorder = '011002118525') as a
join xx as b
  on a.id = b.id and b.dt = '2023-06-30' and b.unitid = 'H8TGWJ035ZY0000431';
```
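A quick back-of-the-envelope check shows why a table of this size cannot be broadcast under the default threshold (the 50-byte average row width is an assumed illustrative value, not from the source):

```python
# Rough size estimate: does the filtered table fit under the broadcast threshold?
rows = 3_000_000
avg_row_bytes = 50                      # assumed average row width, illustrative
threshold = 10 * 1024 * 1024            # spark.sql.autoBroadcastJoinThreshold default

estimated_bytes = rows * avg_row_bytes  # 150,000,000 bytes, about 143 MiB
print(estimated_bytes > threshold)      # True -> too big to broadcast
```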
Dynamically Optimize Skewed Join
Principle
Severe data skew seriously degrades join performance. This feature dynamically handles skewed data during a sort-merge join, splitting it into tasks of similar size. It takes effect when both spark.sql.adaptive.enabled and spark.sql.adaptive.skewJoin.enabled are set to true.
Attribute name | Default value | Explanation | Version |
---|---|---|---|
spark.sql.adaptive.skewJoin.enabled | true | When spark.sql.adaptive.enabled is also true, the dynamic skew-join optimization takes effect. | 3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | A partition is considered skewed if its size is larger than this factor multiplied by the median partition size and also larger than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes. | 3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | A partition is considered skewed if its size in bytes is larger than this threshold and also larger than spark.sql.adaptive.skewJoin.skewedPartitionFactor multiplied by the median partition size. Ideally, this should be set larger than spark.sql.adaptive.advisoryPartitionSizeInBytes. | 3.0.0 |
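The two skew conditions in the table combine into a single predicate, sketched here in plain Python (illustrative only; `is_skewed` is a made-up helper, not a Spark API):

```python
from statistics import median

# A partition is skewed only when BOTH conditions from the table hold.
def is_skewed(partition_bytes, all_partition_bytes,
              factor=5.0,                      # skewedPartitionFactor
              threshold=256 * 1024 * 1024):    # skewedPartitionThresholdInBytes
    med = median(all_partition_bytes)
    return partition_bytes > factor * med and partition_bytes > threshold

MB = 1024 * 1024
sizes = [2048 * MB, 100 * MB, 120 * MB, 110 * MB]   # P0 is far larger
# Median is 115 MB; only 2048 MB exceeds both 5 * 115 MB and 256 MB.
print([is_skewed(s, sizes) for s in sizes])  # -> [True, False, False, False]
```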
Suppose there are two tables, t1 and t2, and the amount of data in partition P0 of t1 is significantly larger than in the other partitions. By default the execution looks like this, see the figure:
The data in partition P0 of t1 is much larger than the data in partitions P1, P2, and P3, so the data in t1 can be considered skewed.
When partitions P1, P2, and P3 of t1 and t2 are joined, there is essentially no data skew, because the data in those partitions is of moderate size. When partition P0 is joined, however, the data skews and the join takes far too long.
The dynamically optimized skewed-join mechanism splits partition P0 of t1 into two sub-partitions, P0-1 and P0-2, and joins each sub-partition with the corresponding partition P0 of t2. See the figure:
Partition P0 of t2 is duplicated into two identical copies, each of which is joined with one of the sub-partitions split from P0 of t1.
This effectively breaks up the skewed partition in t1, so the join no longer produces data skew.
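The split-and-duplicate mechanism described above can be sketched with plain Python lists. This is illustrative only: the real AQE implementation splits by map-output byte ranges, and `split_skewed_join_tasks` is a made-up helper:

```python
# Sketch: split a skewed partition of t1 and duplicate the matching t2 partition.
def split_skewed_join_tasks(t1_partition_rows, t2_partition_rows, num_splits=2):
    """Return (t1 sub-partition, t2 copy) task pairs of similar size."""
    chunk = (len(t1_partition_rows) + num_splits - 1) // num_splits
    tasks = []
    for i in range(num_splits):
        sub = t1_partition_rows[i * chunk:(i + 1) * chunk]  # P0-1, P0-2, ...
        tasks.append((sub, t2_partition_rows))              # t2's P0 is duplicated
    return tasks

p0_t1 = list(range(10))   # skewed partition P0 of t1
p0_t2 = ["a", "b"]        # matching partition P0 of t2
tasks = split_skewed_join_tasks(p0_t1, p0_t2)
print(len(tasks), [len(sub) for sub, _ in tasks])  # -> 2 [5, 5]
```

Each task now joins a similarly sized slice of t1's P0 against a full copy of t2's P0, which is exactly why this transformation evens out the work across tasks.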
Hands-on
TODO: add a real case when one is encountered.