Background
This note looks at the slowdown caused by combined from_json(regexp_replace(...)) expressions in Spark 3.1.1.
The problem is already solved in Spark 3.4.x. The specific solution is described in SPARK-44700: set spark.sql.optimizer.collapseProjectAlwaysInline to false, which is also its default value.
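As a minimal sketch of how that setting could be pinned explicitly, the snippet below sets the flag on a local session and reads it back. The object name, app name and local master are illustrative assumptions, not part of the original setup; on releases that do not define this key (it is an internal optimizer flag), setting it is expected to have no effect.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object, used only for illustration.
object CollapseProjectConf {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")                  // assumption: local test session
      .appName("collapse-project-conf")    // assumption: arbitrary app name
      .getOrCreate()

    // false is already the default; setting it only guards against an override.
    spark.conf.set("spark.sql.optimizer.collapseProjectAlwaysInline", "false")
    println(spark.conf.get("spark.sql.optimizer.collapseProjectAlwaysInline"))

    spark.stop()
  }
}
```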
But how does Spark 3.4.x actually avoid the problem?
Analysis
Take the following SQL as an example:
Seq("""{"a":1, "b":0.8}""").toDF("s").write.saveAsTable("t")

// spark.sql.planChangeLog.level warn
val df = sql(
  """
    |SELECT j.*
    |FROM (SELECT from_json(regexp_replace(s, 'a', 'new_a'), 'new_a INT, b DOUBLE') AS j
    |      FROM t) tmp
    |""".stripMargin)
df.explain(true)
In Spark 3.1.1, the optimizer applies the following rewrites:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
!Project [j#17.new_a AS new_a#19, j#17.b AS b#20]   Project [from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).new_a AS new_a#19, from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).b AS b#20]
!+- Project [from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)) AS j#17]   +- Relation[s#18] parquet
!   +- Relation[s#18] parquet

09:46:57.649 WARN org.apache.spark.sql.catalyst.rules.PlanChangeLogger:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs ===
!Project [from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).new_a AS new_a#19, from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).b AS b#20]   Project [from_json(StructField(new_a,IntegerType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).new_a AS new_a#19, from_json(StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).b AS b#20]
 +- Relation[s#18] parquet   +- Relation[s#18] parquet
The final physical plan is as follows:
== Physical Plan ==
Project [from_json(StructField(new_a,IntegerType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).new_a AS new_a#19, from_json(StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).b AS b#20]
+- *(1) ColumnarToRow
   +- FileScan parquet default.t[s#18] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/jiahong.li/xmalaya/github/spark/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<s:string>
Here regexp_replace is evaluated once for every extracted field (new_a and b), so the same input string is processed several times, which costs performance.
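One way to check the duplication without reading the plan by eye is to count the RegExpReplace nodes in the optimized plan. The following is a sketch only: the object name, the helper countRegexpReplace and the local session settings are assumptions made for illustration, while the table and query are the ones from the example. Going by the plans printed in this note, the count should be higher on 3.1.1 than on 3.4.0.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.RegExpReplace
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper object, used only for illustration.
object CountRegexpReplace {
  // Walks every operator of the plan and counts RegExpReplace expression nodes.
  def countRegexpReplace(plan: LogicalPlan): Int =
    plan.flatMap(_.expressions)
      .map(_.collect { case r: RegExpReplace => r }.size)
      .sum

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")                  // assumption: local test session
      .appName("count-regexp-replace")     // assumption: arbitrary app name
      .getOrCreate()
    import spark.implicits._

    // Same table as in the example; overwrite so the sketch can be re-run.
    Seq("""{"a":1, "b":0.8}""").toDF("s").write.mode("overwrite").saveAsTable("t")

    val df = spark.sql(
      """
        |SELECT j.*
        |FROM (SELECT from_json(regexp_replace(s, 'a', 'new_a'), 'new_a INT, b DOUBLE') AS j
        |      FROM t) tmp
        |""".stripMargin)

    println(countRegexpReplace(df.queryExecution.optimizedPlan))
    spark.stop()
  }
}
```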
In Spark 3.4.0, neither CollapseProject nor OptimizeJsonExprs rewrites the plan, and the generated physical plan is as follows:
== Physical Plan ==
*(2) Project [j#17.new_a AS new_a#20, j#17.b AS b#21]
+- Project [from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)) AS j#17]
   +- *(1) ColumnarToRow
      +- FileScan parquet spark_catalog.default.t[s#18] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jiahong.li/xmalaya/ultimate-git/spark/spark-warehouse/org...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<s:string>
No matter how many fields such as new_a and b are extracted from it, the struct column j (the only such column in this example) is computed once in the inner Project, so regexp_replace is evaluated only once.
Why the difference? The key is in the CollapseProject rule:
def apply(plan: LogicalPlan): LogicalPlan = {
  apply(plan, conf.getConf(SQLConf.COLLAPSE_PROJECT_ALWAYS_INLINE))
}
...
def apply(plan: LogicalPlan, alwaysInline: Boolean): LogicalPlan = {
  plan.transformUpWithPruning(_.containsPattern(PROJECT), ruleId) {
    case p1 @ Project(_, p2: Project)
        if canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline) =>
      p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList))
    ...
The most important part is the canCollapseExpressions method, which decides whether the two Project nodes can be merged:
def canCollapseExpressions(
    consumers: Seq[Expression],
    producerMap: Map[Attribute, Expression],
    alwaysInline: Boolean = false): Boolean = {
  consumers
    .filter(_.references.exists(producerMap.contains))
    .flatMap(collectReferences)
    .groupBy(identity)
    .mapValues(_.size)
    .forall {
      case (reference, count) =>
        val producer = producerMap.getOrElse(reference, reference)
        val relatedConsumers = consumers.filter(_.references.contains(reference))

        def cheapToInlineProducer: Boolean = trimAliases(producer) match {
          case e @ (_: CreateNamedStruct | _: UpdateFields | _: CreateMap | _: CreateArray) =>
            var nonCheapAccessSeen = false
            def nonCheapAccessVisitor(): Boolean = {
              try {
                nonCheapAccessSeen
              } finally {
                nonCheapAccessSeen = true
              }
            }

            !relatedConsumers.exists(findNonCheapAccesses(_, reference, e, nonCheapAccessVisitor))

          case other => isCheap(other)
        }

        producer.deterministic && (count == 1 || alwaysInline || cheapToInlineProducer)
    }
}
Here, JsonToStructs(RegExpReplace) falls into the case other branch, which calls isCheap:
def isCheap(e: Expression): Boolean = e match {
  case _: Attribute | _: OuterReference => true
  case _ if e.foldable => true
  // PythonUDF is handled by the rule ExtractPythonUDFs
  case _: PythonUDF => true
  // Alias and ExtractValue are very cheap.
  case _: Alias | _: ExtractValue => e.children.forall(isCheap)
  case _ => false
}
In isCheap, the expression matches none of the earlier cases and falls through to case _ => false, so isCheap returns false and cheapToInlineProducer is false as well. The reference count is 2 (j is used by both j.new_a and j.b), and alwaysInline is false by default.
canCollapseExpressions therefore returns false, CollapseProject is not applied to this plan in Spark 3.4.0, and regexp_replace is not duplicated, so there is no performance loss.
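The decision can be replayed on a toy model. The sketch below is not Spark code: Expr, Attr, Lit, GetField, Call and CollapseSketch are hypothetical stand-ins, literals stand in for foldable expressions, every expression is treated as deterministic, and the special handling of CreateNamedStruct and similar producers is left out. It keeps only the final condition, deterministic && (count == 1 || alwaysInline || cheap).

```scala
// Toy expression tree; all names here are hypothetical, not Spark classes.
sealed trait Expr {
  def children: Seq[Expr]
  // Simplification: every node in this model is deterministic.
  def deterministic: Boolean = children.forall(_.deterministic)
}
case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
case class Lit(value: Any) extends Expr { def children: Seq[Expr] = Nil }
case class GetField(child: Expr, field: String) extends Expr { def children: Seq[Expr] = Seq(child) }
case class Call(fn: String, args: Seq[Expr]) extends Expr { def children: Seq[Expr] = args }

object CollapseSketch {
  // Attributes, literals and field accesses over cheap children count as cheap.
  def isCheap(e: Expr): Boolean = e match {
    case _: Attr | _: Lit => true
    case g: GetField      => g.children.forall(isCheap)
    case _                => false
  }

  // All attribute references inside an expression, duplicates included.
  def references(e: Expr): Seq[Attr] = e match {
    case a: Attr => Seq(a)
    case other   => other.children.flatMap(references)
  }

  def canCollapse(
      consumers: Seq[Expr],
      producers: Map[Attr, Expr],
      alwaysInline: Boolean = false): Boolean =
    consumers
      .flatMap(references)
      .filter(producers.contains)
      .groupBy(identity)
      .forall { case (ref, uses) =>
        val producer = producers(ref)
        producer.deterministic && (uses.size == 1 || alwaysInline || isCheap(producer))
      }

  def main(args: Array[String]): Unit = {
    val j = Attr("j")
    val producer =
      Call("from_json", Seq(Call("regexp_replace", Seq(Attr("s"), Lit("a"), Lit("new_a")))))
    val consumers = Seq(GetField(j, "new_a"), GetField(j, "b"))

    println(canCollapse(consumers, Map(j -> producer)))                      // j used twice, not cheap
    println(canCollapse(consumers, Map(j -> producer), alwaysInline = true)) // forced inlining
    println(canCollapse(consumers.take(1), Map(j -> producer)))              // j used once
  }
}
```

In this model the first call refuses to collapse because j is referenced twice and its producer is not cheap, which mirrors the 3.4.0 behaviour described above; forcing alwaysInline or using j only once allows the merge.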
Summary
The plans differ mainly because the CollapseProject rule behaves differently in Spark 3.1.1 and Spark 3.4.0.