How Spark 3.4.x fixes the slow from_json + regexp_replace combination expressions

Background

The slowness of combined from_json and regexp_replace expressions that we ran into in Spark 3.1.1 has in fact been fixed in Spark 3.4.x.
The details can be found in SPARK-44700; the fix amounts to keeping spark.sql.optimizer.collapseProjectAlwaysInline set to false (which is the default).
But how exactly does Spark 3.4.x solve the problem?
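The flag is an ordinary SQL conf, so you can inspect (and toggle) it at runtime. A minimal check from spark-shell, assuming a Spark 3.4.x session:

 spark.conf.get("spark.sql.optimizer.collapseProjectAlwaysInline") // "false" by default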

Analysis

Take the following SQL as an example:

 Seq("""{"a":1, "b":0.8}""").toDF("s").write.saveAsTable("t")
    // spark.sql.planChangeLog.level warn
    val df = sql(
      """
        |SELECT j.*
        |FROM (SELECT from_json(regexp_replace(s, 'a', 'new_a'), 'new_a INT, b DOUBLE') AS j
        | FROM t) tmp
        |""".stripMargin)
    df. explain(true)
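If you prefer the DataFrame API, a roughly equivalent formulation looks like this (a sketch; it assumes the table t created above and produces the same plan shape):

 import org.apache.spark.sql.functions.{col, from_json, regexp_replace}
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

 val schema = StructType(Seq(
   StructField("new_a", IntegerType),
   StructField("b", DoubleType)))

 val df2 = spark.table("t")
   .select(from_json(regexp_replace(col("s"), "a", "new_a"), schema).as("j"))
   .select(col("j.*"))

 df2.explain(true)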

In Spark 3.1.1 the optimizer applies the following transformations:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CollapseProject ===
Before:
Project [j#17.new_a AS new_a#19, j#17.b AS b#20]
+- Project [from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)) AS j#17]
   +- Relation[s#18] parquet

After:
Project [from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).new_a AS new_a#19, from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).b AS b#20]
+- Relation[s#18] parquet
09:46:57.649 WARN org.apache.spark.sql.catalyst.rules.PlanChangeLogger:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs ===
Before:
Project [from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).new_a AS new_a#19, from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).b AS b#20]
+- Relation[s#18] parquet

After:
Project [from_json(StructField(new_a,IntegerType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).new_a AS new_a#19, from_json(StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).b AS b#20]
+- Relation[s#18] parquet

The final physical plan is as follows:

== Physical Plan ==
Project [from_json(StructField(new_a,IntegerType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).new_a AS new_a#19, from_json(StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)).b AS b#20]
+- *(1) ColumnarToRow
   +- FileScan parquet default.t[s#18] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/jiahong.li/xmalaya/github/spark/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<s:string>

Since each extracted column (new_a and b here) carries its own copy of the from_json expression, regexp_replace is evaluated once per column, which is a real performance loss.
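A quick way to observe the duplication without reading the whole plan is to count occurrences of regexp_replace in the optimized plan string. A rough sketch, run against the df defined above:

 val optimized = df.queryExecution.optimizedPlan.toString
 // Prints 2 on Spark 3.1.1 (once per extracted field); on 3.4.0 it prints 1.
 println("regexp_replace".r.findAllIn(optimized).length)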

In Spark 3.4.0, neither CollapseProject nor OptimizeJsonExprs rewrites this plan, and the generated physical plan looks like this:

== Physical Plan ==
*(2) Project [j#17.new_a AS new_a#20, j#17.b AS b#21]
+- Project [from_json(StructField(new_a,IntegerType,true), StructField(b,DoubleType,true), regexp_replace(s#18, a, new_a, 1), Some(America/Los_Angeles)) AS j#17]
   +- *(1) ColumnarToRow
      +- FileScan parquet spark_catalog.default.t[s#18] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/jiahong.li/xmalaya/ultimate-git/spark/spark-warehouse/org...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<s:string>

No matter how many columns are extracted from the JSON struct (our example only extracts new_a and b), regexp_replace is evaluated only once.
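To confirm that this config is what changed, you can flip it on 3.4.x and watch the old plan come back. A sketch; the duplicated-plan outcome is my expectation from the analysis that follows:

 spark.conf.set("spark.sql.optimizer.collapseProjectAlwaysInline", "true")
 df.explain(true) // the two Projects collapse again, duplicating regexp_replace
 spark.conf.set("spark.sql.optimizer.collapseProjectAlwaysInline", "false")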

Let's analyze why.

The key is in the CollapseProject rule:

 def apply(plan: LogicalPlan): LogicalPlan = {
   apply(plan, conf.getConf(SQLConf.COLLAPSE_PROJECT_ALWAYS_INLINE))
 }
 ...
 def apply(plan: LogicalPlan, alwaysInline: Boolean): LogicalPlan = {
   plan.transformUpWithPruning(_.containsPattern(PROJECT), ruleId) {
     case p1 @ Project(_, p2: Project)
         if canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline) =>
       p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList))
     ...
   }
 }

The decisive piece is the canCollapseExpressions method, which decides whether two adjacent Projects can be merged:

 def canCollapseExpressions(
     consumers: Seq[Expression],
     producerMap: Map[Attribute, Expression],
     alwaysInline: Boolean = false): Boolean = {
   consumers
     .filter(_.references.exists(producerMap.contains))
     .flatMap(collectReferences)
     .groupBy(identity)
     .mapValues(_.size)
     .forall {
       case (reference, count) =>
         val producer = producerMap.getOrElse(reference, reference)
         val relatedConsumers = consumers.filter(_.references.contains(reference))
         def cheapToInlineProducer: Boolean = trimAliases(producer) match {
           case e @ (_: CreateNamedStruct | _: UpdateFields | _: CreateMap | _: CreateArray) =>
             var nonCheapAccessSeen = false
             def nonCheapAccessVisitor(): Boolean = {
               try {
                 // Returns false on the first call and true on every later call,
                 // i.e. flags the second and subsequent non-cheap accesses.
                 nonCheapAccessSeen
               } finally {
                 nonCheapAccessSeen = true
               }
             }

             !relatedConsumers.exists(findNonCheapAccesses(_, reference, e, nonCheapAccessVisitor))

           case other => isCheap(other)
         }

         // Inline only if the producer is deterministic and is consumed once,
         // inlining is forced, or the producer is cheap to duplicate.
         producer.deterministic && (count == 1 || alwaysInline || cheapToInlineProducer)
     }
 }

For our plan the producer is JsonToStructs(RegExpReplace(...)), which falls into the case other branch and is tested with isCheap:

 def isCheap(e: Expression): Boolean = e match {
   case _: Attribute | _: OuterReference => true
   case _ if e.foldable => true
   // PythonUDF is handled by the rule ExtractPythonUDFs
   case _: PythonUDF => true
   // Alias and ExtractValue are very cheap.
   case _: Alias | _: ExtractValue => e.children.forall(isCheap)
   case _ => false
 }
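You can check the relevant properties of the producer expression yourself from spark-shell; .expr exposes the Catalyst expression behind a Column (a sketch):

 import org.apache.spark.sql.functions.{col, from_json, regexp_replace}
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

 val schema = StructType(Seq(StructField("new_a", IntegerType), StructField("b", DoubleType)))
 val producer = from_json(regexp_replace(col("s"), "a", "new_a"), schema).expr

 println(producer.deterministic) // true  -> the guard hinges on count/cheapness
 println(producer.foldable)      // false -> the `case _ if e.foldable` branch is skipped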

JsonToStructs is not an Attribute, not foldable, and matches none of the other cheap cases, so isCheap falls through to case _ => false. Meanwhile the reference j#17 is consumed twice (count == 2, once for new_a and once for b), alwaysInline is false by default, and therefore cheapToInlineProducer is false as well. The guard producer.deterministic && (count == 1 || alwaysInline || cheapToInlineProducer) evaluates to false, canCollapseExpressions returns false, and CollapseProject is not applied to this plan in Spark 3.4.0, so the performance loss is avoided.
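Putting it together, a hand-trace of the guard for our plan (values taken from the analysis above; this is illustrative Scala, not Spark source):

 // producer = JsonToStructs(RegExpReplace(s#18, a, new_a, 1)) aliased as j#17
 val deterministic = true           // from_json and regexp_replace are deterministic
 val count = 2                      // j#17 is consumed by both new_a and b
 val alwaysInline = false           // default of spark.sql.optimizer.collapseProjectAlwaysInline
 val cheapToInlineProducer = false  // isCheap(JsonToStructs(...)) == false
 val canCollapse = deterministic && (count == 1 || alwaysInline || cheapToInlineProducer)
 // canCollapse == false, so the two Projects are kept separate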

Summary

The plans differ because the CollapseProject rule changed: in Spark 3.1.1 it merged the two Projects whenever the producer expressions were deterministic, duplicating them into every consumer, while in Spark 3.4.0 it additionally requires that a producer referenced more than once be cheap to inline (or that spark.sql.optimizer.collapseProjectAlwaysInline be set to true).