Spark's SQLHadoopMapReduceCommitProtocol: choosing 1 or 2 for mapreduce.fileoutputcommitter.algorithm.version

Background

This article is based on Spark 3.1.1.
In Spark, mapreduce.fileoutputcommitter.algorithm.version defaults to 1.
This can be seen in SparkHadoopUtil.scala:

  private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
    // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
    for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
      hadoopConf.set(key.substring("spark.hadoop.".length), value)
    }
    if (conf.getOption("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version").isEmpty) {
      hadoopConf.set("mapreduce.fileoutputcommitter.algorithm.version", "1")
    }
  }
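
To make the effect of this defaulting logic concrete, here is a minimal Java sketch that mimics it with plain maps. The class name CommitterVersionDefault and the method toHadoopConf are hypothetical names chosen for this illustration, and java.util.Map stands in for both SparkConf and Hadoop's Configuration. It is not Spark code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class CommitterVersionDefault {
    static final String PREFIX = "spark.hadoop.";
    static final String KEY = "mapreduce.fileoutputcommitter.algorithm.version";

    // Same shape as appendSparkHadoopConfigs above, but with plain maps
    // standing in for SparkConf and Configuration (an assumption of this sketch).
    static Map<String, String> toHadoopConf(Map<String, String> sparkConf) {
        Map<String, String> hadoopConf = new HashMap<>();
        // Copy every "spark.hadoop.foo=bar" entry as "foo=bar".
        for (Map.Entry<String, String> e : sparkConf.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                hadoopConf.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        // Fall back to "1" only when the user did not set the prefixed key.
        if (!sparkConf.containsKey(PREFIX + KEY)) {
            hadoopConf.put(KEY, "1");
        }
        return hadoopConf;
    }

    public static void main(String[] args) {
        Map<String, String> noSetting = Collections.emptyMap();
        Map<String, String> userSetsV2 = Collections.singletonMap(PREFIX + KEY, "2");
        System.out.println(toHadoopConf(noSetting).get(KEY));
        System.out.println(toHadoopConf(userSetsV2).get(KEY));
    }
}
```

Under these assumptions the program prints 1 for the empty configuration and 2 when the user has set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version explicitly, so the prefixed Spark property is the way to override the default.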

Analysis

InsertIntoHadoopFsRelationCommand calls FileFormatWriter.write, which in turn calls sparkSession.sparkContext.runJob:

      sparkSession.sparkContext.runJob(
        rddWithNonEmptyPartitions,
        (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
          executeTask(
            description = description,
            jobIdInstant = jobIdInstant,
            sparkStageId = taskContext.stageId(),
            sparkPartitionId = taskContext.partitionId(),
            sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
            committer,
            iterator = iter)
        },
        rddWithNonEmptyPartitions.partitions.indices,
        (index, res: WriteTaskResult) => {
          committer.onTaskCommit(res.commitMsg)
          ret(index) = res
        })

The executeTask method eventually calls dataWriter.write and then dataWriter.commit:

  override def commit(): WriteTaskResult = {
    releaseResources()
    val summary = ExecutedWriteSummary(
      updatedPartitions = updatedPartitions.toSet,
      stats = statsTrackers.map(_.getFinalStats()))
    WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
  }

Ultimately HadoopMapReduceCommitProtocol.commitTask is called, which in turn calls FileOutputCommitter.commitTask:

···
if (algorithmVersion == 1) {
    Path committedTaskPath = getCommittedTaskPath(context);
    if (fs.exists(committedTaskPath)) {
       if (!fs.delete(committedTaskPath, true)) {
         throw new IOException("Could not delete " + committedTaskPath);
       }
    }
    if (!fs.rename(taskAttemptPath, committedTaskPath)) {
      throw new IOException("Could not rename " + taskAttemptPath + " to "
           + committedTaskPath);
    }
    LOG.info("Saved output of task '" + attemptId + "' to " +
        committedTaskPath);
  } else {
    // directly merge everything from taskAttemptPath to output directory
    mergePaths(fs, taskAttemptDirStatus, outputPath);
    LOG.info("Saved output of task '" + attemptId + "' to " +
        outputPath);
···

The behavior depends on whether algorithmVersion is 1 or 2:

  • For 1, the files generated by the task are first moved to an intermediate committed-task directory, and are only moved to the final output directory when the job is committed
  • For 2, the files generated by the task are moved directly into the final output directory at task commit
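
The difference between the two branches can be sketched on a local file system with java.nio.file. This is only a toy model: the class CommitAlgorithmSketch, the directory names attempt_0 and task_0, and the flat mergeFiles helper are assumptions made for illustration, and the real FileOutputCommitter uses a deeper directory layout and Hadoop's FileSystem API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class CommitAlgorithmSketch {

    // Move each file in src into dst one at a time (flat stand-in for mergePaths).
    static void mergeFiles(Path src, Path dst) throws IOException {
        List<Path> files;
        try (Stream<Path> s = Files.list(src)) {
            files = s.collect(Collectors.toList());
        }
        for (Path f : files) {
            Files.move(f, dst.resolve(f.getFileName()));
        }
    }

    // Names of the data files a reader of the output directory can see right now.
    static List<String> visible(Path output) throws IOException {
        try (Stream<Path> s = Files.list(output)) {
            return s.filter(Files::isRegularFile)
                    .map(p -> p.getFileName().toString())
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    // Returns [files visible after task commit, files visible after job commit].
    static List<List<String>> simulate(int version) {
        try {
            Path output = Files.createTempDirectory("output");
            Path pending = Files.createDirectories(output.resolve("_temporary"));
            Path taskAttemptPath = Files.createDirectories(pending.resolve("attempt_0"));
            Path committedTaskPath = pending.resolve("task_0");
            Files.write(taskAttemptPath.resolve("part-00000"), "data".getBytes());

            // Task commit.
            if (version == 1) {
                // v1: a single directory rename; nothing reaches the output yet.
                Files.move(taskAttemptPath, committedTaskPath);
            } else {
                // v2: files go straight into the output, one by one.
                mergeFiles(taskAttemptPath, output);
            }
            List<String> afterTaskCommit = visible(output);

            // Job commit: only v1 still has files left to publish.
            if (version == 1) {
                mergeFiles(committedTaskPath, output);
            }
            List<String> afterJobCommit = visible(output);
            return Arrays.asList(afterTaskCommit, afterJobCommit);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("v1: " + simulate(1));
        System.out.println("v2: " + simulate(2));
    }
}
```

In this model v1 shows no data files in the output directory until the job commits, while v2 already exposes part-00000 after the task commit. That is why v2 has less work left at job commit, and also why its output can be observed in a partial state.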

As for the trade-offs: 2 performs better than 1, while 1 gives better consistency than 2. The following sections look at how Spark and Hadoop handle this.

How Spark handles this problem

As SPARK-33019 says:

Apache Spark provides multiple distributions with Hadoop 2.7 and Hadoop 3.2. spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version depends on the Hadoop version. Apache Hadoop 3.0 switches the default algorithm from v1 to v2 and now there exists a discussion to remove v2. We had better provide a consistent default behavior of v1 across various Spark distributions.

That is, to keep behavior consistent across Spark distributions built against different Hadoop versions, Spark forces the default to v1.
The official Spark documentation, under "Recommended settings for writing to object stores", also says:

For object stores whose consistency model means that rename-based commits are safe use the FileOutputCommitter v2 algorithm for performance; v1 for safety

For more details, see the article "Separation of Storage and Computing on Big Data Cloud, What Should We Pay Attention to?"

How Hadoop handles this problem

Refer to MAPREDUCE-7282:

The v2 MR commit algorithm moves files from the task attempt dir into the dest dir on task commit -one by one

It is therefore not atomic

if a task commit fails partway through and another task attempt commits -unless exactly the same filenames are used, output of the first attempt may be included in the final result
if a worker partitions partway through task commit, and then continues after another attempt has committed, it may partially overwrite the output -even when the filenames are the same
Both MR and spark assume that task commits are atomic. Either they need to consider that this is not the case, we add a way to probe for a committer supporting atomic task commit, and the engines both add handling for task commit failures (probably fail job)

Better: we remove this as the default, maybe also warn when it is being used
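
The first failure mode described in the ticket can be reproduced with a small in-memory model. This is a sketch under stated assumptions: the class NonAtomicCommitSketch, the failAfter parameter, and the attempt-specific file names are invented for illustration, and a sorted set of names stands in for the destination directory.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

final class NonAtomicCommitSketch {

    // v2-style task commit: files are published into dest one by one.
    // failAfter is the number of files moved before a simulated crash
    // (a negative value means the commit runs to completion).
    static void commitV2(List<String> attemptFiles, TreeSet<String> dest, int failAfter) {
        int moved = 0;
        for (String f : attemptFiles) {
            if (moved == failAfter) {
                throw new IllegalStateException("task attempt died mid-commit");
            }
            dest.add(f);
            moved++;
        }
    }

    static TreeSet<String> run() {
        TreeSet<String> dest = new TreeSet<>();
        try {
            // First attempt crashes after publishing one of its two files.
            commitV2(Arrays.asList("part-0-attempt0-a", "part-0-attempt0-b"), dest, 1);
        } catch (IllegalStateException e) {
            // The engine would now schedule a second attempt of the same task.
        }
        // Second attempt uses different file names and commits fully.
        commitV2(Arrays.asList("part-0-attempt1-a", "part-0-attempt1-b"), dest, -1);
        return dest;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The destination ends up with part-0-attempt0-a next to both files of the second attempt, so the output of the failed attempt leaks into the final result. With v1 the failed attempt would never have been renamed into place, which is the atomicity the ticket is asking for.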

The gist is that task commits need to be atomic, which v2 does not guarantee, so the proposal is to stop making v2 the default; v2 is not recommended.
The discussion that follows on the ticket is also worth reading:

Daryn Sharp added a comment:
I'm also -1 on changing the default. It exposes users to new (old but new to them) behavior that may have quirks. This was a 2.7 change from 5 years ago so if it's a high risk issue our customers would have squawked by now. Has this been frequently observed or theorized?

Notably our users won't tolerate the performance regression and SLA misses. I seem to recall jobs that ran for a single-digit minutes followed by a double-digit commit. The v2 commit amortized the commit to under a minute.

I'm not a MR expert. Here's my understanding:

if a task commit fails partway through and another task attempt commits -unless exactly the same filenames are used, output of the first attempt may be included in the final result

Isn't that indicative of a non-deterministic job? Should the risk to a few "bad" jobs outweight the benefit to the mass majority of jobs? Why not change the committer for at risk jobs?

if a worker partitions partway through task commit, and then continues after another attempt has committed, it may partially overwrite the output -even when the filenames are the same

I don't think this can happen. Tasks request permission from the AM to commit.

---
Steve Loughran added a comment:

Tasks request permission from the AM to commit.

yes, and then we assume that they continue to completion, rather than pausing for an extended period of time, so by the time the AM/spark driver gets a timeout, it can be assumed to be one of a network failure or the worker has failed/VM/k8s container terminated. The "suspended for a long time and then continues" risk does exist, and is unlikely on a physical cluster, but in a world of VMs, not entirely inconceivable.

I note the MR AM does track its time from last heartbeat to the YARN RM to detect partitions, workers don't.

What is interesting here is that, ideally, every task checks with the driver before it commits, so that only one attempt of a given task is allowed to commit (other attempts of the same task are refused), which would guarantee the correctness of the task commit. However, if the connection between the driver and an executor times out for network reasons after a task on that executor has already been told it may commit, the task keeps committing until the driver sends the notification to remove the executor, and data can still become inconsistent during that window. (This depends on the Spark timeout settings spark.executor.heartbeatInterval, spark.network.timeout, and spark.rpc.askTimeout.)
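
The "ask the driver first" idea can be sketched as a tiny first-attempt-wins coordinator. This is a simplified model and not Spark's actual OutputCommitCoordinator: the class CommitCoordinatorSketch and its canCommit signature are assumptions for illustration, and the sketch deliberately leaves out heartbeats and timeouts, which is exactly where the gap described above comes from.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class CommitCoordinatorSketch {
    // partitionId -> attempt number that was granted the right to commit.
    private final ConcurrentMap<Integer, Integer> authorized = new ConcurrentHashMap<>();

    // Grants permission to the first attempt that asks for a partition.
    // The same attempt asking again is still allowed; other attempts are refused.
    boolean canCommit(int partitionId, int attemptNumber) {
        Integer winner = authorized.putIfAbsent(partitionId, attemptNumber);
        return winner == null || winner == attemptNumber;
    }

    public static void main(String[] args) {
        CommitCoordinatorSketch coordinator = new CommitCoordinatorSketch();
        System.out.println(coordinator.canCommit(0, 0)); // first attempt asks
        System.out.println(coordinator.canCommit(0, 1)); // speculative attempt asks
    }
}
```

Here the first call is granted and the second is refused. Note what the model does not cover: once attempt 0 holds the permission, nothing stops it from continuing a slow, file-by-file v2 commit after the driver has given up on its executor, so the permission check alone cannot make a non-atomic commit safe.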

Conclusion

So the final conclusion is: V1 is safe but slower, while V2 is faster but may be unsafe. V1 is the recommended choice.