SparkListener Bloodline–Implementation Idea of Openlineage Plugin

Article directory

  • 1. SparkListener
    • 1.1 Source code analysis
    • 1.2 Methods provided by Listener
  • Two, OpenLineage’s SparkListener plug-in implementation
    • 2.1 Initialization parameters
    • 2.2 Class loading information
    • 2.3 Trigger execution
    • 2.4 Logic plan analysis
    • 2.5 Get metadata
  • 3. Specific implementation ideas of Visitor (follow-up)
  • Four. Summary

1. SparkListener

1.1 Source code analysis

The Spark listener is mainly used to monitor various activities of the Spark application, and the SparkListener can monitor events by registering to the ListenerBus. As long as “spark.extraListeners” is added to the parameter when starting Spark, spark will resolve and implant the Listener we implemented through Utils.classForName.

 private def setupAndStartListenerBus(): Unit = {<!-- -->
    // Use reflection to instantiate listeners specified via `spark.extraListeners`
    try {<!-- -->
      val listenerClassNames: Seq[String] =
        conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")
      for (className <- listenerClassNames) {<!-- -->
        // Use reflection to find the right constructor
        val constructors = {<!-- -->
          val listenerClass = Utils. classForName(className)
          listenerClass
              .getConstructors
              .asInstanceOf[Array[Constructor[_ <: SparkListenerInterface]]]
        }
        val constructorTakingSparkConf = constructors. find {<!-- --> c =>
          c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
        }
        lazy val zeroArgumentConstructor = constructors. find {<!-- --> c =>
          c.getParameterTypes.isEmpty
        }
        val listener: SparkListenerInterface = {<!-- -->
          if (constructorTakingSparkConf.isDefined) {<!-- -->
            constructorTakingSparkConf.get.newInstance(conf)
          } else if (zeroArgumentConstructor.isDefined) {<!-- -->
            zeroArgumentConstructor.get.newInstance()
          } else {<!-- -->
            throw new SparkException(
              s"$className did not have a zero-argument constructor or a" +
                " single-argument constructor that accepts SparkConf. Note: if the class is" +
                " defined inside of another Scala class, then its constructors may accept an" +
                "implicit parameter that references the enclosing class; in this case, you must" +
                "define the listener as a top-level class in order to prevent this extra" +
                " parameter from breaking Spark's ability to find a valid constructor.")
          }
        }
        listenerBus. addListener(listener)
        logInfo(s"Registered listener $className")
      }
    } catch {<!-- -->
      case e: Exception =>
        try {<!-- -->
          stop()
        } finally {<!-- -->
          throw new SparkException(s"Exception when registering SparkListener", e)
        }
    }

    listenerBus. start()

listenerBus is the bus of all listeners, the start method will start the listenerThread thread, and when an event enters, it will pass the event to all registered listeners

 /** Post the application start event */
  private def postApplicationStart() {<!-- -->
    // Note: this code assumes that the task scheduler has been initialized and has contacted
    // the cluster manager to get an application ID (in case the cluster manager provides one).
    listenerBus. post(SparkListenerApplicationStart(appName, Some(applicationId),
      startTime, sparkUser, applicationAttemptId, schedulerBackend. getDriverLogUrls))
  }

The above code example describes the process of the listenerBus passing the event (event) when the App starts. The post method will put the event into the queue first, and then the listenerThread will take the event in a loop, and judge which method of all registered listeners should be triggered according to the type.

 protected override def doPostEvent(
      listener: StreamingListener,
      event: StreamingListenerEvent): Unit = {<!-- -->
    event match {<!-- -->
      case receiverStarted: StreamingListenerReceiverStarted =>
        listener.onReceiverStarted(receiverStarted)
      case receiverError: StreamingListenerReceiverError =>
        listener.onReceiverError(receiverError)
      case receiverStopped: StreamingListenerReceiverStopped =>
        listener.onReceiverStopped(receiverStopped)
      case batchSubmitted: StreamingListenerBatchSubmitted =>
        listener.onBatchSubmitted(batchSubmitted)
      case batchStarted: StreamingListenerBatchStarted =>
        listener.onBatchStarted(batchStarted)
      case batchCompleted: StreamingListenerBatchCompleted =>
        listener.onBatchCompleted(batchCompleted)
      case outputOperationStarted: StreamingListenerOutputOperationStarted =>
        listener.onOutputOperationStarted(outputOperationStarted)
      case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
        listener.onOutputOperationCompleted(outputOperationCompleted)
      case_ =>
    }
  }

1.2 Methods provided by Listener

@DeveloperApi
abstract class SparkListener extends SparkListenerInterface {<!-- -->
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {<!-- --> }

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {<!-- --> }

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {<!-- --> }

  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = {<!-- --> }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {<!-- --> }

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {<!-- --> }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {<!-- --> }

  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {<!-- --> }

  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {<!-- --> }

  override def onBlockManagerRemoved(
      blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {<!-- --> }

  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = {<!-- --> }

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {<!-- --> }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {<!-- --> }

  override def onExecutorMetricsUpdate(
      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {<!-- --> }

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {<!-- --> }

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {<!-- --> }

  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {<!-- --> }

  override def onOtherEvent(event: SparkListenerEvent): Unit = {<!-- --> }
}

2. SparkListener plug-in implementation of OpenLineage

For the introduction of openlineage, please refer to my column
Openlineage Data Map

2.1 Initialization parameters

OpenLineage’s Spark listener implementation code class is io.openlineage.spark.agent.OpenLineageSparkListener. Every time the listener is initialized, it will get the corresponding parameters from Spark Conf. The parameters mainly include the target that needs to be sent after the analysis of blood relationship is completed. Information (such as the server address sent to kafka or the url of http communication)

 @Override
  public void onApplicationStart(SparkListenerApplicationStart applicationStart) {<!-- -->
    initializeContextFactoryIfNotInitialized();
  }

  private void initializeContextFactoryIfNotInitialized() {<!-- -->
    if (contextFactory != null || isDisabled) {<!-- -->
      return;
    }
    SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
    if (sparkEnv != null) {<!-- -->
      try {<!-- -->
        ArgumentParser args = ArgumentParser. parse(sparkEnv. conf());
        contextFactory = new ContextFactory(new EventEmitter(args));
  ······
  ······
  // how argument parser works
  public static ArgumentParser parse(SparkConf conf) {<!-- -->
    ArgumentParserBuilder builder = ArgumentParser. builder();
    adjustDeprecatedConfigs(conf);
    conf.setIfMissing(SPARK_CONF_DISABLED_FACETS, DEFAULT_DISABLED_FACETS);
    conf.setIfMissing(SPARK_CONF_TRANSPORT_TYPE, "http");

    if (conf.get(SPARK_CONF_TRANSPORT_TYPE).equals("http")) {<!-- -->
      findSparkConfigKey(conf, SPARK_CONF_HTTP_URL)
          .ifPresent(url -> UrlParser.parseUrl(url).forEach(conf::set));
    }
    findSparkConfigKey(conf, SPARK_CONF_APP_NAME)
        .filter(str -> !str.isEmpty())
        .ifPresent(builder::appName);
    findSparkConfigKey(conf, SPARK_CONF_NAMESPACE).ifPresent(builder::namespace);
    findSparkConfigKey(conf, SPARK_CONF_JOB_NAME).ifPresent(builder::jobName);
    findSparkConfigKey(conf, SPARK_CONF_PARENT_RUN_ID).ifPresent(builder::parentRunId);
    builder.openLineageYaml(extractOpenlineageConfFromSparkConf(conf));
    return builder. build();

Note that the extractOpenlineageConfFromSparkConf method here is mainly to parse the conf into a serializable openLineageYaml class for sending, see the following code

/** Configuration for {@link OpenLineageClient}. */
public class OpenLineageYaml {<!-- -->
  @Getter
  @JsonProperty("transport")
  private TransportConfig transportConfig;

  @Getter
  @JsonProperty("facets")
  private FacetsConfig facetsConfig;
}

2.2 Class loading information


When the listener is initialized, the EventEmitter class will be generated, which will analyze the information of openlineageyaml and generate a specific client communication class for sending to the metadata map or message middleware (such as kafka)

 this.client =
        OpenLineageClient. builder()
            .transport(
                new TransportFactory(argument. getOpenLineageYaml(). getTransportConfig()). build())
            .disableFacets(disabledFacets)
            .build();
  }

ContextFactory receives the EventEmitter parameter, and identifies the spark version according to the spark environment (currently Openlineage supports Spark2, 3 lineage analysis), and internally generates the corresponding version of VisitorFactory. VistoryFactory will load accessors according to different versions of spark.

 static VisitorFactory getInstance() {<!-- -->
    String version = package$.MODULE$.SPARK_VERSION();
    try {<!-- -->
      return (VisitorFactory) Class.forName(getVisitorFactoryForVersion(version)).newInstance();
    } catch (Exception e) {<!-- -->
      throw new RuntimeException(
          String.format("Can't instantiate visitor factory for version: %s", version), e);
    }
  }
  ······
  ······
  static String getVisitorFactoryForVersion(String version) {<!-- -->
    if (version.startsWith("2.")) {<!-- -->
      return SPARK2_FACTORY_NAME;
    } else if (version. startsWith("3.2")) {<!-- -->
      return SPARK32_FACTORY_NAME;
    } else {<!-- -->
      return SPARK3_FACTORY_NAME;
    }
  }
  ······
  ······
  private static final String SPARK2_FACTORY_NAME =
      "io.openlineage.spark.agent.lifecycle.Spark2VisitorFactoryImpl";

  private static final String SPARK3_FACTORY_NAME =
      "io.openlineage.spark.agent.lifecycle.Spark3VisitorFactoryImpl";

  private static final String SPARK32_FACTORY_NAME =
      "io.openlineage.spark.agent.lifecycle.Spark32VisitorFactoryImpl";

At the same time, ContextFactory is also responsible for creating a specific ExecutionContext (every time a listening event is triggered, a Context will be updated in Registry’s HashMap), and the member variable runEventBuilder (including handlerFactory) is mainly used for Handles analyzing Spark logical plans. When runEventBuilder is initialized, it will register the metadata constructor and the specific Input/Output logical plan accessor (obtain the corresponding version of the previously loaded accessor from the handlerFactory).

 public Optional<ExecutionContext> createSparkSQLExecutionContext(
      SparkListenerSQLExecutionEnd event) {<!-- -->
    return executionFromCompleteEvent(event)
        .map(
            queryExecution -> {<!-- -->
              SparkSession sparkSession = queryExecution. sparkSession();
              OpenLineageContext olContext =
                  OpenLineageContext. builder()
                      .sparkSession(Optional.of(sparkSession))
                      .sparkContext(sparkSession.sparkContext())
                      .openLineage(new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI))
                      .queryExecution(queryExecution)
                      .customEnvironmentVariables(
                          this.openLineageEventEmitter.getCustomEnvironmentVariables())
                      .build();
              OpenLineageRunEventBuilder runEventBuilder =
                  new OpenLineageRunEventBuilder(olContext, handlerFactory);
              return new SparkSQLExecutionContext(
                  event.executionId(), openLineageEventEmitter, olContext, runEventBuilder);
            });
  }

2.3 Trigger execution

Every time a method of the Listener is triggered, a RunEvent will be generated (RunEvent is the specific metadata information).
Take triggering the onJobStart method as an example, the above object will be automatically initialized, and the Context will be adapted to execute the logical plan accessor:

 @Override
  public void onJobStart(SparkListenerJobStart jobStart) {<!-- -->
    if (isDisabled) {<!-- -->
      return;
    }
    initializeContextFactoryIfNotInitialized();
    Optional<ActiveJob> activeJob =
        asJavaOptional(
                SparkSession. getDefaultSession()
                    .map(sparkContextFromSession)
                    .orElse(activeSparkContext))
            .flatMap(
                ctx ->
                    Optional.ofNullable(ctx.dagScheduler())
                        .map(ds -> ds.jobIdToActiveJob().get(jobStart.jobId())))
            .flatMap(ScalaConversionUtils::asJavaOptional);
    Set<Integer> stages =
        ScalaConversionUtils.fromSeq(jobStart.stageIds()).stream()
            .map(Integer. class::cast)
            .collect(Collectors.toSet());

    if (sparkVersion. startsWith("3")) {<!-- -->
      jobMetrics.addJobStages(jobStart.jobId(), stages);
    }

    Optional.ofNullable(getSqlExecutionId(jobStart.properties()))
        .map(Optional::of)
        .orElseGet(
            () ->
                asJavaOptional(
                        SparkSession. getDefaultSession()
                            .map(sparkContextFromSession)
                            .orElse(activeSparkContext))
                    .flatMap(
                        ctx ->
                            Optional.ofNullable(ctx.dagScheduler())
                                .map(ds -> ds.jobIdToActiveJob().get(jobStart.jobId()))
                                .flatMap(ScalaConversionUtils::asJavaOptional))
                    .map(job -> getSqlExecutionId(job.properties())))
        .map(Long::parseLong)
        .map(id -> getExecutionContext(jobStart.jobId(), id))
        .orElseGet(() -> getExecutionContext(jobStart.jobId()))
        .ifPresent(
            context -> {<!-- -->
              // set it in the rddExecutionRegistry so jobEnd is called
              activeJob.ifPresent(context::setActiveJob);
              // start
              context.start(jobStart);
            });
  }

The context.start method is the entry point of the parser (p.s. If it is the End event of the listener, it corresponds to context.end), call runEventBuilder.run to execute the logical plan analysis, Only the entry code is put here, and the analysis details will be in-depth in the next section:

 @Override
  public void start(SparkListenerSQLExecutionStart startEvent) {<!-- -->
    log.debug("SparkListenerSQLExecutionStart - executionId: {}", startEvent.executionId());
    if (!olContext.getQueryExecution().isPresent()) {<!-- -->
      log.info(NO_EXECUTION_INFO, olContext);
      return;
    } else if (EventFilterUtils. isDisabled(olContext, startEvent)) {<!-- -->
      log. info(
          "OpenLineage received Spark event that is configured to be skipped: SparkListenerSQLExecutionStart");
      return;
    }
    RunEvent event =
        runEventBuilder.buildRun(
            buildParentFacet(),
            openLineage.newRunEventBuilder().eventTime(toZonedTime(startEvent.time())),
            buildJob(olContext. getQueryExecution(). get()),
            startEvent);

    log.debug("Posting event for start {}: {}", executionId, event);
    eventEmitter. emit(event);
  }

2.4 Logic plan analysis

Here we mainly explain the idea of the logical plan accessor to obtain metadata information, and first briefly summarize the irrelevant entry codes

 RunEvent event =
        runEventBuilder.buildRun(
            buildParentFacet(),
            openLineage.newRunEventBuilder().eventTime(toZonedTime(startEvent.time())),
            buildJob(olContext. getQueryExecution(). get()),
            startEvent);
······
······
  RunEvent buildRun(
      Optional<ParentRunFacet> parentRunFacet,
      RunEventBuilder runEventBuilder,
      JobBuilder jobBuilder,
      SparkListenerSQLExecutionStart event) {<!-- -->
    runEventBuilder.eventType(RunEvent.EventType.START);
    return buildRun(parentRunFacet, runEventBuilder, jobBuilder, event, Optional.empty());
  }
······
······
  RunEvent buildRun(
      Optional<ParentRunFacet> parentRunFacet,
      RunEventBuilder runEventBuilder,
      JobBuilder jobBuilder,
      SparkListenerStageSubmitted event) {<!-- -->
    Stage stage = stageMap.get(event.stageInfo().stageId());
    RDD<?> rdd = stage.rdd();

    List<Object> nodes = new ArrayList<>();
    nodes.addAll(Arrays.asList(event.stageInfo(), stage));

    nodes.addAll(Rdds.flattenRDDs(rdd));

    return populateRun(parentRunFacet, runEventBuilder, jobBuilder, nodes);
  }

The core code logic is in populateRun, which is responsible for packaging metadata, fetching metadata information, and calling the previously registered accessor of RunEventBuilder to analyze the logic plan. Just take the example of obtaining the Input data set in the logical plan. The logic of RunEventBuilder obtaining the Visitor (visitor) during class loading is as follows:

 @Override
  public Collection<PartialFunction<LogicalPlan, List<InputDataset>>>
      createInputDatasetQueryPlanVisitors(OpenLineageContext context) {<!-- -->
    List<PartialFunction<LogicalPlan, List<InputDataset>>> inputDatasets =
        visitorFactory. getInputVisitors(context);

    ImmutableList<PartialFunction<LogicalPlan, List<InputDataset>>> inputDatasetVisitors =
        ImmutableList.<PartialFunction<LogicalPlan, List<InputDataset>>>builder()
            .addAll(
                generate(
                    eventHandlerFactories,
                    factory -> factory.createInputDatasetQueryPlanVisitors(context)))
            .addAll(inputDatasets)
            .build();
    context.getInputDatasetQueryPlanVisitors().addAll(inputDatasetVisitors);
    return inputDatasetVisitors;
  }

Every time the monitoring is triggered, populateRun will call the visitor in the handler to collect the logic of the Input dataset (namely map(inputVisitor)):

 Function1<LogicalPlan, Collection<InputDataset>> inputVisitor =
        visitLogicalPlan(PlanUtils. merge(inputDatasetQueryPlanVisitors));

    List<InputDataset> datasets =
        Stream.concat(
                buildDatasets(nodes, inputDatasetBuilders),
                openLineageContext
                    .getQueryExecution()
                    .map(
                        qe ->
                            fromSeq(qe.optimizedPlan().map(inputVisitor)).stream()
                                .flatMap(Collection::stream)
                                .map(((Class<InputDataset>) InputDataset. class)::cast))
                    .orElse(Stream.empty()))
            .collect(Collectors.toList());

2.5 Get metadata

After populateRun gets all the metadata, it will be wrapped into a RunEvent

 private RunEvent populateRun(
      Optional<ParentRunFacet> parentRunFacet,
      RunEventBuilder runEventBuilder,
      JobBuilder jobBuilder,
      List<Object> nodes) {<!-- -->
    OpenLineage openLineage = openLineageContext. getOpenLineage();

    RunFacetsBuilder runFacetsBuilder = openLineage. newRunFacetsBuilder();
    OpenLineage. JobFacetsBuilder jobFacetsBuilder =
        openLineageContext.getOpenLineage().newJobFacetsBuilder();

    parentRunFacet.ifPresent(runFacetsBuilder::parent);
    OpenLineage.JobFacets jobFacets = buildJobFacets(nodes, jobFacetBuilders, jobFacetsBuilder);
    List<InputDataset> inputDatasets = buildInputDatasets(nodes);
    List<OutputDataset> outputDatasets = buildOutputDatasets(nodes);
    ······
    ······
    return runEventBuilder. build();

Finally, call the emit method to send the meta information of runEvent

 public void emit(OpenLineage. RunEvent event) {<!-- -->
    try {<!-- -->
      this.client.emit(event);
      log.debug(
          "Emitting lineage completed successfully: {}", OpenLineageClientUtils.toJson(event));
    } catch (OpenLineageClientException exception) {<!-- -->
      log.error("Could not emit lineage w/ exception", exception);
    }
  }


The red report here can be ignored, because openlineage uses @slf4j annotation, the code will be generated after compile

3. Specific implementation ideas of Visitor (follow-up)

This is a new hole. The main logic of Vistor is the traversal operation of the Spark logical plan (tree). The content involved will be more complicated. Different plug-ins have different traversal ideas, so the blogger will start with how to traverse the logical plan of Spark itself. To pave the way, and then gradually expand to the implementation ideas of each plug-in Vistor.

4. Summary

Follow-up will introduce: Atlas lineage plug-in implementation ideas, Spline lineage plug-in implementation ideas, data lineage Antlr4 analysis, Atlas-Openlineage-Spline multi-system metadata synchronization. Welcome everyone to leave a comment~