The solution to the slow problem of from_json regexp_replace combination expressions encountered in Spark 3.1.1

Background

At present, when the company upgrades from spark 2.4.x to 3.1.1, it encounters a kind of extremely slow SQL situation. The SQL is as follows (only the key ones are listed):

 select device_personas.*
 from
 (select
        device_id, ads_id,
        from_json(regexp_replace(device_personas, '(?<=(\{|,))"device_', '"user_device_'), ${device_schema}) as device_personas
        from input )

Its ${device_schema} has hundreds of fields

In the case of 360core 720GB memory before tuning, it takes 43 minutes to run:

After tuning, it only needs to run for 6 minutes when resources remain unchanged:

Conclusion

Let me talk about the conclusion first:
The main reason is the org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs new rule introduced by Spark 3.1.x, which cuts unnecessary columns for this SQL:
As a result, regexp_replace will be called many times. The specific reason is as explained in this rule:

if JsonToStructs(json) is shared among all fields of CreateNamedStruct. prunedSchema contains all accessed fields in original CreateNamedStruct.

So set spark.sql.optimizer.enableJsonExpressionOptimization to false, or set

spark.sql.adaptive.optimizer.excludedRules org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs
spark.sql.optimizer.excludedRules org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs

Skip this rule.

Analysis

The physical plan for this SQL is as follows:

If the rule is not skipped:

The major physics programs are:

(6) Project
Output [10]: [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1) , Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_ , 1 ), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_ta g#294, from_json(StructField(ads_ad_pic_resolution ,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_ sound_patch_scene AS ctx_sound_patch_scene #296, from_json(StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), album_personas#72, Some(Asia /Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#29 9]
Input [6]: [device_id#62, ads_id#63, device_personas#69, ads_personas#70, album_personas#72, ctx_personas#73]

The conversion of the processing plan after this rule is as follows (taking two fields as an example):

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs ===
 InsertIntoHadoopFsRelationCommand oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227, false, Parquet, Map( coalesceNum -> 500, path -> oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227 ), Overwrite, [device_id, ads_id, user_device_adv_age_year, user_device_child_age, ads_material_text_tag, ads_ad_pic_resolution, ctx_sound_patch_scene, ctx_position, album_category_id, album_nlp_labels_ app] InsertIntoHadoopFsRelationCommand oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/ droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227, false, Parquet, Map(coalesceNum -> 500, path -> oss://xima-bd-data3.cn-shanghai.oss-dls .aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227), Overwrite, [device_id, ads_id, user_device_adv_age_year, user_device_child_age, ads_material_text_tag, ads_ad_pic_resolution, ctx_sound_patch_scene, ctx_position, album_category_id, album_nlp_labels_app]
  + - Repartition 500, true + - Repartition 500, true
! + - Project [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,)) "device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, ( ? <= (\ {|,)) "Device_," User_device_, 1), some (asia/state)). User_device_child_age as user_device_age_age#293, from_json ILD (ads_material_text_tag, StringType, True), Structfield (ADS_AD_PIC_RESOLUTION, StringType, StringType, StringType, StringType true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas# 70, Some (Asia/Shanghai )).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_sce ne AS ctx_sound_patch_scene#296, from_json(StructField (ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), StructField(alb um_nlp_labels_app, StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_person as#72, Some(Asia /Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299] + - Project [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), regexp_replace(device_personas#69, (?<=(\ {|,) )"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\ {|, ))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material _text_tag AS ads_material_text_tag#294, from_json(StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_ patch_scene,StringType,true), ctx_personas#73, Some( Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructFiel d(album_category_id, StringType, true ), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels _app AS album_nlp_labels_app#299]
        + - Filter (if ((label_click#84 = 0)) (rand(7794855199306151884) >= 0.95) else true AND (NOT (isnull(device_personas#69) AND isnull(ads_personas#70)) OR NOT isnull(ctx_personas#73 ))) + - Filter (if ((label_click#84 = 0)) (rand(7794855199306151884) >= 0.95) else true AND (NOT (isnull(device_personas#69) AND isnull(ads_personas#70)) OR NOT isnull( ctx_personas#73)))
           + - Filter ((((dt#82 >= 20230710) AND (dt#82 <= 20230712)) AND NOT coalesce(appshadow#76, ) IN (2,3)) AND ((NOT (position_name#75 = sound_agg ) AND isnotnull(get_json_object(ads_personas#70, $.ads_first_trade))) AND NOT coalesce(get_json_object(ads_personas#70, $.ads_business_type), -11111) IN (1,2,3))) + - Filter ((( (dt#82 >= 20230710) AND (dt#82 <= 20230712)) AND NOT coalesce(appshadow#76, ) IN (2,3)) AND ((NOT (position_name#75 = sound_agg) AND isnotnull(get_json_object( ads_personas#70, $.ads_first_trade))) AND NOT coalesce(get_json_object(ads_personas#70, $.ads_business_type), -11111) IN (1,2,3)))
              + - Relation[device_id#62,ads_id#63,response_id#64,track_id#65,album_id#66,imp_ts#67,click_ts#68,device_personas#69,ads_personas#70,track_personas#71,album_personas#72,ctx_personas# 73,label_conv#74,position_name#75,appshadow#76,play_num#77,sub_num#78,leave_num#79,pay_num#80,live_num#81,dt#82,hour#83,label_click#84] parquet + - Relation [device_id#62, ads_id#63, response_id#64, track_id#65, album_id#66, imp_ts#67, click_ts#68, device_personas#69, ads_personas#70, track_personas#71, album_personas#72, ctx_personas#73, label_conv #74,position_name#75,appshadow#76,play_num#77,sub_num#78,leave_num#79,pay_num#80,live_num#81,dt#82,hour#83,label_click#84] parquet

You can see that the most important transformations are:

from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some (Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, ( ?<=(\{|,)) "device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293

              ||
              \/

from_json(StructField(user_device_adv_age_year,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_y ear#292 , from_json(StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age# 293

The schema in from_json is divided into StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true)
StructField(user_device_adv_age_year,StringType,true)
StructField(user_device_child_age,StringType,true) separate two schemas

So why is it slow? It is because of the processing logic in JsonToStructs:

case class JsonToStructs(
    schema: DataType,
    options: Map[String, String],
    child: Expression,
    timeZoneId: Option[String] = None)
  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes
    with NullIntolerant {
    ...
    @transient lazy val parser = {
    val parsedOptions = new JSONOptions(options, timeZoneId.get, nameOfCorruptRecord)
    val mode = parsedOptions. parseMode
    if (mode != PermissiveMode & amp; & amp; mode != FailFastMode) {
      throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode." +
        s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
    }
    val (parserSchema, actualSchema) = nullableSchema match {
      case s: StructType =>
        ExprUtils.verifyColumnNameOfCorruptRecord(s, parsedOptions.columnNameOfCorruptRecord)
        (s, StructType(s. filterNot(_. name == parsedOptions. columnNameOfCorruptRecord)))
      case other =>
        (StructType(StructField("value", other) :: Nil), other)
    }

    val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = false)
    val createParser = CreateJacksonParser.utf8String_

    new FailureSafeParser[UTF8String](
      input => rawParser. parse(input, createParser, identity[UTF8String]),
      mode,
      parserSchema,
      parsedOptions.columnNameOfCorruptRecord)
  }
  ...
  override def nullSafeEval(json: Any): Any = {
    converter(parser. parse(json. asInstanceOf[UTF8String]))
  }

The main concern is the parser variable, because due to the above rules, the two schemas are in different parsers, and the Child here is composed of regexp_replace expressions, so the regular expression will be calculated twice,
Since there will be more than 10 fields, the regular expression will be recalculated more than 100 times (regular expressions are time-consuming).

When this rule is skipped

The major physics programs are:

(6) Project
Output [10]: [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,) )"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true) , regexp_replace(device_personas#69, (? <= (\ {|,)) "Device_," User_device_, 1), some (asia/support). User_device_child_age as user_device_age#293, from_json FIELD (ads_material_text_tag, StringType, TRUE), Structfield (ADS_AD_PIC_Resolution, StringType ,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas #70, Some(Asia/ Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch _scene AS ctx_sound_patch_scene#296, from_json( StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), Struct Field(album_nlp_labels_app ,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_per sonas#72, Some( Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299]
Input [6]: [device_id#62, ads_id#63, device_personas#69, ads_personas#70, album_personas#72, ctx_personas#73]

If the rule is skipped, the rule will not be applied, or take two fields as an example, so the Schema of from_json will not change:

from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some (Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, ( ?<=(\{|,)) "device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293

In fact, we can see from the physical plan: In fact, the expression regexp_replace will still appear many times, won’t it be called many times? Of course it will not be called many times, directly look at the physical plan ProjectExec:

ProjectExec

  protected override def doExecute(): RDD[InternalRow] = {
    child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
      val project = UnsafeProjection.create(projectList, child.output)
      project. initialize(index)
      iter. map(project)
    }
  }

The calling chain of this method is as follows:

UnsafeProjection.create
              ||
              \/
InterpretedUnsafeProjection.createProjection/GenerateUnsafeProjection.generate
              ||
              \/
             create
              ||
              \/
createCode(ctx, expressions, subexpressionEliminationEnabled)
              ||
              \/
ctx.generateExpressions(expressions, useSubexprElimination)
              ||
              \/
subexpression Elimination

subexpressionElimination This is mainly to extract public expressions, that is to say, the calculation of subsequent public expressions will only be calculated once
That corresponds to our expression as:

 Alias(GetStructField(attribute.get, i), f.name)()
 Where attribute.get is JsonToStructs(StructType(StructField(user_device_adv_age_year,StringType,true),StructField(user_device_child_age,StringType,true)), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_ , 1), Some(Asia/Shanghai))

Here’s exactly the same as the plan displayed on the Spark UI:

from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some (Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, ( ?<=(\{|,)) "device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293

(mainly the method of calling JsonToStructs.toString)

Other

  • The toString method of Alias is:
s"$child AS $name#${exprId.id}$typeSuffix$delaySuffix"
  • The toString method of GetStructField is:
val fieldName = if (resolved) childSchema(ordinal).name else s"_$ordinal"
s"$child.${name.getOrElse(fieldName)}"
  • UnresolvedStar has an explanation of SELECT record. from (SELECT struct(a,b,c) as record …)*

  • The method buildExpandedProjectList in the ResolveReferences rule calls the expand method of UnresolvedStar
    Here it will be parsed as Alias(GetStructField(attribute.get, i), f.name)()

  • For specific optimization rules, see Optimize Json expression chain