Spark SQL "Path does not exist" error when writing to an object storage path

1. Problem analysis

1. Environment
Spark 3.2, Hadoop 3.2.2

2. Symptoms
When running INSERT OVERWRITE TABLE against a Hive table, the task fails with an error that the path does not exist.

When the table path is on HDFS, there is no problem. The problem only occurs when the table path is on object storage.

INSERT OVERWRITE TABLE into a partition path also works without problems.

Specific error:

org.apache.spark.sql.AnalysisException: Path does not exist: s3a://xxxx/hive/warehouse/tablename
                 at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:978)
                 at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:780)
                 at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:777)

3. Analysis of the cause of the problem
Analysis of the full task logs showed that the Spark ApplicationMaster (AM) failed twice.

The first failure was caused by a data conversion error in a task, which is a business exception. When YARN's fault-tolerance mechanism then started the Spark AM a second time, the Path does not exist exception was reported.

In the execution logic of Spark's INSERT OVERWRITE, the table path is deleted first and the data is written afterwards. The deletion is done on the driver side, while the table path is re-created when the tasks run. So the question is why the tasks did not re-create the table path on object storage. First, review how a task writes data (FileOutputCommitter algorithm version 1 is used by default). Before a task writes data, it creates a temporary directory and writes the data into it.

When a task succeeds, the committer's commit task logic moves the data from the temporary directory to an intermediate directory. When all tasks have succeeded, the driver calls the committer's commit job logic to move the data from the intermediate directory to the final path, which is the table path. Note that any directory or path involved in this process is created automatically if it does not exist.

For a table whose path is on HDFS, the temporary and intermediate directories are both under the table path. So even if a task hits a business exception, the temporary directory was created before the exception occurred, and the table path was created along with it. When the AM runs a second time, the Path does not exist error is not reported.

For a table whose path is on object storage, the temporary directory is a local directory, similar to /xx/xxx. So when a task hits a business exception, the table path has not been created. The temporary directory is local because the task uses the s3a protocol for commit task and commit job.
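The difference between the two layouts can be illustrated with a small simulation. This is only a sketch under stated assumptions: local directories stand in for both HDFS and the object store, and every function name below is hypothetical, not a Spark or Hadoop API.

```python
import tempfile
from pathlib import Path


def drop_table_path(table_path: Path) -> None:
    """Driver side of INSERT OVERWRITE: remove the table path before writing."""
    if table_path.exists():
        table_path.rmdir()  # the demo directory is empty, so rmdir is enough


def task_with_temp_under_table(table_path: Path) -> None:
    """HDFS-style layout: the temporary directory lives under the table path."""
    attempt_dir = table_path / "_temporary" / "0" / "_temporary" / "attempt_0"
    attempt_dir.mkdir(parents=True)  # re-creates table_path as a side effect
    raise ValueError("business exception during data conversion")


def task_with_local_staging(staging_root: Path) -> None:
    """Staging layout: the temporary directory is outside the table path."""
    (staging_root / "attempt_0").mkdir(parents=True)  # table path is untouched
    raise ValueError("business exception during data conversion")


def table_path_survives(use_local_staging: bool) -> bool:
    """Return True if the table path exists after a failed first attempt."""
    with tempfile.TemporaryDirectory() as root:
        table_path = Path(root) / "warehouse" / "tablename"
        table_path.mkdir(parents=True)
        drop_table_path(table_path)
        try:
            if use_local_staging:
                task_with_local_staging(Path(root) / "local_staging")
            else:
                task_with_temp_under_table(table_path)
        except ValueError:
            pass  # first attempt failed; a second attempt would start here
        return table_path.exists()
```

With the temporary directory under the table path, `table_path_survives(False)` reports that the path is back after the failed attempt. With local staging, `table_path_survives(True)` reports that it is still missing, which is the state the second AM attempt runs into.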

So the problem lies in the s3a protocol.

4. Problem solution
Do not use the s3a protocol to execute INSERT OVERWRITE TABLE.

set spark.sql.sources.commitProtocolClass=org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol;
set spark.sql.sources.outputCommitterClass=;

Note: Without the s3a protocol, commit performance is seriously affected, because a move or rename on object storage is implemented as a copy followed by a delete.
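To see why such a rename is expensive, consider a toy object store modeled as a dictionary from keys to bytes. This is a hypothetical model for illustration, not the behavior of any particular storage client: the point is only that every object under the prefix is copied and then deleted, so the cost grows with the amount of data.

```python
def rename_prefix(store: dict, src_prefix: str, dst_prefix: str) -> int:
    """Emulate a directory rename on a key/value object store.

    Returns the number of bytes that had to be copied.
    """
    copied = 0
    # Materialize the key list first so the dict can be modified in the loop.
    for key in [k for k in store if k.startswith(src_prefix)]:
        data = store[key]
        store[dst_prefix + key[len(src_prefix):]] = data  # copy to the new key
        del store[key]  # then delete the old key
        copied += len(data)
    return copied


store = {
    "table/_temporary/0/task_0/part-00000": b"aaaa",
    "table/_temporary/0/task_1/part-00001": b"bb",
}
copied_bytes = rename_prefix(store, "table/_temporary/0/task_0/", "table/")
```

On a real file system the same rename would be a metadata-only operation, independent of file size.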

3. Parquet file write optimization

set spark.sql.parquet.fs.optimized.committer.optimization-enabled=true;

How FileOutputCommitter performs commitTask depends on the parameter mapreduce.fileoutputcommitter.algorithm.version (the default value is 1).

When mapreduce.fileoutputcommitter.algorithm.version=1:

commitTask renames ${output.dir}/_temporary/${appAttemptId}/_temporary/${taskAttemptId} to ${output.dir}/_temporary/${appAttemptId}/${taskId}.

When mapreduce.fileoutputcommitter.algorithm.version=2:

commitTask moves the files under ${output.dir}/_temporary/${appAttemptId}/_temporary/${taskAttemptId} into the ${output.dir} directory (that is, the final output directory).
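The two destinations can be written down as plain path computations. The helper names here are hypothetical and only mirror the path templates given above; they are not part of Hadoop.

```python
def task_attempt_path(output_dir: str, app_attempt_id: int, task_attempt_id: str) -> str:
    """Where a task attempt writes its files before commitTask."""
    return f"{output_dir}/_temporary/{app_attempt_id}/_temporary/{task_attempt_id}"


def commit_task_destination(output_dir: str, app_attempt_id: int,
                            task_id: str, algorithm_version: int) -> str:
    """Where commitTask puts the task output for a given algorithm version."""
    if algorithm_version == 1:
        # v1: still inside _temporary; commitJob moves it to the final path later
        return f"{output_dir}/_temporary/{app_attempt_id}/{task_id}"
    if algorithm_version == 2:
        # v2: straight into the final output directory
        return output_dir
    raise ValueError(f"unsupported algorithm version: {algorithm_version}")
```

For example, with an output directory of `/out`, application attempt 0 and a hypothetical task ID `task_7`, version 1 yields `/out/_temporary/0/task_7` and version 2 yields `/out`.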

A Spark job can enable the version 2 commit logic by setting the Spark configuration spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2.

Before Hadoop 2.7.0, the FileOutputCommitter implementation did not distinguish between versions and always used the version=1 commit logic. Therefore, if Spark's Hadoop dependency is older than 2.7.0, setting mapreduce.fileoutputcommitter.algorithm.version=2 has no effect.

2. Detailed introduction to commit principle

When Spark outputs data to HDFS, the following problems need to be solved:

  • Multiple tasks write data to HDFS at the same time. How can all files written by all tasks be made visible to the outside world together, or not at all, so that data consistency is guaranteed?
  • Because of speculation, the same Task may have two identical Task instances writing the same data to HDFS. How can only one of them be allowed to commit successfully?
  • For large jobs (tens of thousands or even hundreds of thousands of tasks), how can all the files be managed efficiently?

Commit principle

This article explains the commit principle in detail by running the following Spark program in local mode:

sparkContext.textFile("/json/input.zstd")
  .map(_.split(","))
  .saveAsTextFile("/jason/test/tmp")

Before describing the commit principle in detail, several terms need to be explained:

  • Task: a Task of a Stage in a Job of an Application
  • TaskAttempt: each execution of a Task is a TaskAttempt. For the same Task, multiple TaskAttempts may exist at the same time
  • Application Attempt: one execution of an Application

This article uses the following placeholders:

  • ${output.dir.root} is the output directory root path
  • ${appAttempt} is the Application Attempt ID, which is an integer and starts from 0
  • ${taskAttempt} is the Task Attempt ID, which is an integer and starts from 0

Check Job Output Directory

Before starting the Job, the Driver first checks whether the output directory already exists through the checkOutputSpecs method of FileOutputFormat. If it already exists, a FileAlreadyExistsException is thrown immediately.
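The check amounts to a fail-fast existence test. A minimal Python sketch, assuming a local directory in place of HDFS; `check_output_dir` is a hypothetical stand-in, and Python's built-in `FileExistsError` plays the role of FileAlreadyExistsException:

```python
import tempfile
from pathlib import Path


def check_output_dir(output_dir: Path) -> None:
    """Refuse to start the job if the output directory is already there."""
    if output_dir.exists():
        raise FileExistsError(f"Output directory {output_dir} already exists")


def output_dir_rejected(create_first: bool) -> bool:
    """Return True if the check rejects the output directory."""
    with tempfile.TemporaryDirectory() as tmp:
        output_dir = Path(tmp) / "output"
        if create_first:
            output_dir.mkdir()
        try:
            check_output_dir(output_dir)
        except FileExistsError:
            return True
        return False
```

Failing before any task starts means an existing result is never partially overwritten.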

Driver executes setupJob

Before the job starts, the Driver (this example uses local mode, so it is executed by the main thread) calls FileOutputCommitter.setupJob to create the Application Attempt directory, namely ${output.dir.root}/_temporary/${appAttempt}.

Task executes setupTask

The FileOutputCommitter.setupTask method is executed by each Task (this example uses local mode, so it is executed by the task thread). This method does nothing because the Task temporary directory is created by the Task on demand.

Create Task directory on demand

In this example, the Task needs to create a LineRecordWriter through the getRecordWriter method of TextOutputFormat to write data. Before creating it, the Task output path is set through FileOutputFormat.getTaskOutputPath, that is, ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}. All data of this Task Attempt is written to files under this directory.

Check if you need commit

After the Task has finished writing its data, the FileOutputCommitter.needsTaskCommit method checks whether the data written under ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} needs to be committed.

The check is based on whether the ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} directory exists.
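On-demand directory creation and the existence-based commit check fit together as in the following sketch. It runs against a local temporary directory, and `open_writer` and `needs_task_commit` are hypothetical Python stand-ins, not the Hadoop methods themselves.

```python
import tempfile
from pathlib import Path


def task_attempt_dir(root: Path, app_attempt: int, task_attempt: str) -> Path:
    return root / "_temporary" / str(app_attempt) / "_temporary" / task_attempt


def open_writer(root: Path, app_attempt: int, task_attempt: str, file_name: str):
    """Create the task attempt directory on demand and open a file in it."""
    attempt_dir = task_attempt_dir(root, app_attempt, task_attempt)
    attempt_dir.mkdir(parents=True, exist_ok=True)
    return (attempt_dir / file_name).open("w")


def needs_task_commit(root: Path, app_attempt: int, task_attempt: str) -> bool:
    """A commit is needed only if the task attempt directory exists."""
    return task_attempt_dir(root, app_attempt, task_attempt).exists()


def writer_demo() -> tuple:
    """Return the commit check result before and after a writer was opened."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp) / "output"
        before = needs_task_commit(root, 0, "attempt_0")
        with open_writer(root, 0, "attempt_0", "part-00000") as out:
            out.write("a,b\n")
        after = needs_task_commit(root, 0, "attempt_0")
        return before, after
```

Before the writer is opened there is nothing to commit. Once the attempt directory exists, the check reports that a commit is needed.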

If a commit is required and output commit coordination is enabled, the Task asks the OutputCommitCoordinator on the Driver side over RPC whether this Task Attempt may commit.

The CommitCoordinator on the Driver side has to make this decision because multiple Attempts of the same Task may write data and request a commit at the same time, due to speculation or other reasons (such as a previous TaskAttempt not having been killed successfully).

CommitCoordinator

When the TaskAttempt requesting commitTask is a failed Attempt, the request is rejected directly.

If the TaskAttempt succeeded, and the CommitCoordinator has not yet allowed a commit request from another Attempt of the same Task, the commit request of this TaskAttempt is allowed.

If the CommitCoordinator has allowed this TaskAttempt’s commit request before, it continues to allow it, that is, the CommitCoordinator’s handling of the request is idempotent.

If the TaskAttempt succeeded, but the CommitCoordinator has previously allowed a commit request from another Attempt of the same Task, the commit request of the current TaskAttempt is rejected directly.

To implement this, OutputCommitCoordinator maintains a StageState for each active Stage, as follows:

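private case class StageState(numPartitions: Int) {
  // Attempt number authorized to commit each partition (NO_AUTHORIZED_COMMITTER if none yet)
  val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
  // Failed attempt numbers, keyed by partition
  val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
}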

This data structure records, for each Task, the TaskAttempt that is allowed to commit. The default is NO_AUTHORIZED_COMMITTER.

It also records all failed Attempts of each Task.
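The four rules above can be expressed as a small state machine. The following Python sketch mirrors the shape of StageState but is not Spark's implementation: the class and method names are hypothetical, and using -1 as the NO_AUTHORIZED_COMMITTER sentinel is an assumption made for this example.

```python
NO_AUTHORIZED_COMMITTER = -1  # assumed sentinel meaning "nobody authorized yet"


class CommitCoordinatorSketch:
    def __init__(self, num_partitions: int) -> None:
        # One slot per partition holding the authorized attempt number.
        self.authorized_committers = [NO_AUTHORIZED_COMMITTER] * num_partitions
        # partition -> set of attempt numbers known to have failed
        self.failures = {}

    def attempt_failed(self, partition: int, attempt: int) -> None:
        self.failures.setdefault(partition, set()).add(attempt)

    def can_commit(self, partition: int, attempt: int) -> bool:
        if attempt in self.failures.get(partition, set()):
            return False  # a failed attempt is rejected directly
        current = self.authorized_committers[partition]
        if current == NO_AUTHORIZED_COMMITTER:
            self.authorized_committers[partition] = attempt
            return True  # first successful requester wins
        # Idempotent for the winner, rejection for every other attempt.
        return current == attempt
```

Asking twice for the same winning attempt returns True both times, while a speculative second attempt of the same partition is refused.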

commitTask

When the TaskAttempt is allowed to commit, the Task (in this example executed by the task thread, because local mode is used) performs commitTask in the following way.

When the value of mapreduce.fileoutputcommitter.algorithm.version is 1 (the default value), the Task renames taskAttemptPath, which is ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}, to committedTaskPath, which is ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}.

If the value of mapreduce.fileoutputcommitter.algorithm.version is 2, all files under taskAttemptPath, which is ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}, are moved directly to outputPath, which is ${output.dir.root}/.
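The two commitTask variants can be tried out on a local directory. This is a simplified simulation, not Hadoop's code: `commit_task` and `files_after_commit` are hypothetical names, and the string `attempt_0` stands in for the task attempt ID.

```python
import shutil
import tempfile
from pathlib import Path


def task_attempt_dir(root: Path, app_attempt: int, task_attempt: str) -> Path:
    return root / "_temporary" / str(app_attempt) / "_temporary" / task_attempt


def commit_task(root: Path, app_attempt: int, task_attempt: str, version: int) -> None:
    src = task_attempt_dir(root, app_attempt, task_attempt)
    if version == 1:
        # v1: a single directory rename into the committed task path
        src.rename(root / "_temporary" / str(app_attempt) / task_attempt)
    elif version == 2:
        # v2: move every file straight into the final output directory
        for f in list(src.iterdir()):
            shutil.move(str(f), str(root / f.name))
    else:
        raise ValueError(f"unsupported algorithm version: {version}")


def files_after_commit(version: int) -> list:
    """Write one file as a task attempt, commit it, and list where it ended up."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp) / "output"
        attempt = task_attempt_dir(root, 0, "attempt_0")
        attempt.mkdir(parents=True)
        (attempt / "part-00000").write_text("a,b,c\n")
        commit_task(root, 0, "attempt_0", version)
        return sorted(p.relative_to(root).as_posix()
                      for p in root.rglob("*") if p.is_file())
```

With version 1 the file is still below `_temporary` after commitTask. With version 2 it is already in the output root.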

commitJob

When all Tasks have executed successfully, the Driver (in this example the main thread, because local mode is used) executes FileOutputCommitter.commitJob.

If the value of mapreduce.fileoutputcommitter.algorithm.version is 1, the Driver traverses all committedTaskPaths in a single thread, that is, ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}, and moves all files under them to finalOutput, namely ${output.dir.root}.

If the value of mapreduce.fileoutputcommitter.algorithm.version is 2, no files need to be moved, because all Task output files have already been moved to finalOutput, which is ${output.dir.root}, in commitTask.

After all committed Task output files have been moved to finalOutput, namely ${output.dir.root}, the Driver deletes everything under ${output.dir.root}/_temporary/ through cleanupJob.
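For version 1, the driver-side work can be sketched as a sequential loop over the committed task directories followed by a cleanup. Again this is a local-filesystem simulation with hypothetical names (`commit_job_v1`, `task_0`, `task_1`). The `_SUCCESS` marker is created at the end, as in the commitJob summary of this article.

```python
import shutil
import tempfile
from pathlib import Path


def commit_job_v1(root: Path, app_attempt: int) -> None:
    app_dir = root / "_temporary" / str(app_attempt)
    # Single-threaded: one committed task directory after another.
    for committed in sorted(app_dir.iterdir()):
        if committed.name == "_temporary":
            continue  # uncommitted task attempts are not merged
        for f in sorted(committed.iterdir()):
            shutil.move(str(f), str(root / f.name))
    shutil.rmtree(root / "_temporary")  # cleanupJob
    (root / "_SUCCESS").touch()


def commit_job_demo() -> list:
    """Prepare two committed tasks, run commitJob, list the output root."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp) / "output"
        for i in range(2):
            committed = root / "_temporary" / "0" / f"task_{i}"
            committed.mkdir(parents=True)
            (committed / f"part-0000{i}").write_text("data\n")
        commit_job_v1(root, 0)
        return sorted(p.name for p in root.iterdir())
```

After the call, only the part files and the marker remain in the output root. The loop body runs once per file, which is why the duration of this step grows with the number of tasks and files.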

recoverTask

The commitTask and commitJob mechanisms described above ensure data consistency when different Attempts of different Tasks commit within one Application Attempt.

When the entire Application is retried, a Task that was successfully committed in the previous Application Attempt does not need to be re-executed. Its data can be restored directly.

When restoring the Task, first obtain the previous Application Attempt and the corresponding preCommittedTaskPath, namely ${output.dir.root}/_temporary/${preAppAttempt}/${taskAttempt}.

If the value of mapreduce.fileoutputcommitter.algorithm.version is 1, and preCommittedTaskPath exists (indicating that the Task was committed in the previous Application Attempt), preCommittedTaskPath is renamed directly to committedTaskPath.

If the value of mapreduce.fileoutputcommitter.algorithm.version is 2, no data needs to be restored, because the data of Tasks committed in the previous Application Attempt has already been moved to ${output.dir.root} in commitTask.
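The recovery step described here is a single rename from the previous application attempt's directory into the current one. A minimal sketch on a local directory, with hypothetical function names and `task_0` as a placeholder task name:

```python
import tempfile
from pathlib import Path


def recover_task(root: Path, prev_app_attempt: int, app_attempt: int,
                 task_attempt: str, version: int) -> bool:
    """Return True if output of a previous application attempt was recovered."""
    if version == 2:
        return False  # v2: committed data is already in the output root
    pre_committed = root / "_temporary" / str(prev_app_attempt) / task_attempt
    if not pre_committed.exists():
        return False  # the task was not committed in the previous attempt
    committed = root / "_temporary" / str(app_attempt) / task_attempt
    committed.parent.mkdir(parents=True, exist_ok=True)
    pre_committed.rename(committed)
    return True


def recover_demo(version: int) -> tuple:
    """Return (recovered, file present under the new application attempt)."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp) / "output"
        pre_committed = root / "_temporary" / "0" / "task_0"
        pre_committed.mkdir(parents=True)
        (pre_committed / "part-00000").write_text("data\n")
        recovered = recover_task(root, 0, 1, "task_0", version)
        moved = (root / "_temporary" / "1" / "task_0" / "part-00000").exists()
        return recovered, moved
```

Under version 1 the committed directory is carried over to application attempt 1 without re-running the task. Under version 2 the function deliberately does nothing.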

abortTask

When the Task is aborted, the Task calls the FileOutputCommitter.abortTask method to delete the ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} directory.

abortJob

The Driver aborts the Job by calling the FileOutputCommitter.abortJob method, which deletes ${output.dir.root}/_temporary through the FileOutputCommitter.cleanupJob method.
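Both abort paths are recursive deletes with different scopes, which the following local-filesystem sketch makes explicit. The function names and the `attempt_0` and `attempt_1` directory names are hypothetical.

```python
import shutil
import tempfile
from pathlib import Path


def task_attempt_dir(root: Path, app_attempt: int, task_attempt: str) -> Path:
    return root / "_temporary" / str(app_attempt) / "_temporary" / task_attempt


def abort_task(root: Path, app_attempt: int, task_attempt: str) -> None:
    """Delete only the directory of the aborted task attempt."""
    shutil.rmtree(task_attempt_dir(root, app_attempt, task_attempt), ignore_errors=True)


def abort_job(root: Path) -> None:
    """Delete the whole _temporary tree of the job."""
    shutil.rmtree(root / "_temporary", ignore_errors=True)


def abort_demo() -> tuple:
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp) / "output"
        first = task_attempt_dir(root, 0, "attempt_0")
        second = task_attempt_dir(root, 0, "attempt_1")
        first.mkdir(parents=True)
        second.mkdir(parents=True)
        abort_task(root, 0, "attempt_0")
        after_task_abort = (first.exists(), second.exists())
        abort_job(root)
        return after_task_abort, (root / "_temporary").exists()
```

Aborting one task attempt leaves the other attempt's directory in place, while aborting the job removes everything under `_temporary`.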

Summary

V1 vs. V2 committer process

With the V1 committer (that is, the value of mapreduce.fileoutputcommitter.algorithm.version is 1), the commit process is as follows:

  • The Task thread writes TaskAttempt data to ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  • commitTask: the Task thread moves ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} to ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
  • commitJob: a single Driver thread moves each ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt} to ${output.dir.root} in turn, then creates a _SUCCESS marker file
  • recoverTask: the Task thread moves ${output.dir.root}/_temporary/${preAppAttempt}/${preTaskAttempt} to ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}

With the V2 committer (that is, the value of mapreduce.fileoutputcommitter.algorithm.version is 2), the commit process is as follows:

  • The Task thread writes TaskAttempt data to ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  • commitTask: the Task thread moves ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} to ${output.dir.root}
  • commitJob: creates the _SUCCESS marker file
  • recoverTask: no action required

V1 vs. V2 committer performance comparison

In V1, after the Job's tasks have finished, the Driver uses the commitJob method to move the output files of all Tasks to the output root directory serially in a single thread. The unit of moving is the file. When the number of tasks is large (a large job, or many small tasks caused by small files) and NameNode RPC is slow, this process takes a long time. In practice, all Tasks may have finished while the Job has not, and commitJob may even take longer than the execution of all Tasks.

In V2, when a Task ends, it moves its own data files to the output root directory in the commitTask method. On the one hand, files are moved when the task ends instead of waiting for the end of the job, so file movement starts earlier and ends earlier. On the other hand, different tasks move their files in parallel, which greatly shortens the total time spent on file movement in the job.

V1 vs. V2 committer consistency comparison

In V1, the data files are moved to the output root directory and become visible to the outside world only when the Job finishes. Before that, all files are inside ${output.dir.root}/_temporary/${appAttempt} and its subdirectories and are not visible to the outside world.

When the commitJob process takes only a short time, it is unlikely to fail. The V1 commit process can then be regarded as a two-phase commit: either all tasks commit successfully or none do.

For the reasons mentioned above, however, the commitJob process may take a long time. If the Driver fails during this process, some Task data may already have been moved to ${output.dir.root} and be visible to the outside world, while other Task data has not yet been moved and is not visible. At that point the data is inconsistent.

In V2, when a Task ends, its data is moved to ${output.dir.root} immediately and is visible to the outside world right away. If the application fails during execution, the data of committed Tasks remains visible to the outside world, while the data of failed or uncommitted Tasks is not. That is, V2 is more prone to data consistency problems.
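The two failure modes can be made concrete with a small in-memory model. It is purely illustrative: the task and file names are made up, and a simulated crash is just a loop that stops early.

```python
def visible_after_driver_crash_v1(committed_tasks: dict, moved_before_crash: int) -> list:
    """V1: commitJob moves task outputs one by one; the driver dies part-way.

    committed_tasks maps a task name to the files it committed.
    """
    visible = []
    for index, task in enumerate(sorted(committed_tasks)):
        if index >= moved_before_crash:
            break  # simulated driver failure in the middle of commitJob
        visible.extend(committed_tasks[task])
    return visible


def visible_during_job_v2(task_files: dict, tasks_committed_so_far: list) -> list:
    """V2: every task that ran commitTask is visible, finished job or not."""
    return sorted(f for task in tasks_committed_so_far for f in task_files[task])


tasks = {
    "task_0": ["part-00000"],
    "task_1": ["part-00001"],
    "task_2": ["part-00002"],
}
partial_v1 = visible_after_driver_crash_v1(tasks, moved_before_crash=1)
partial_v2 = visible_during_job_v2(tasks, ["task_0", "task_2"])
```

In both cases a reader sees only part of the output. The difference is the size of the window: in V1 it is limited to the duration of commitJob, while in V2 it spans the whole job.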

3. V1 and V2 committer version comparison

The mapreduce.fileoutputcommitter.algorithm.version parameter has a great impact on file output. The following summarizes the advantages and disadvantages of the two versions in various aspects.

1. Performance

v1 only copies the output files to the temporary directory after a task ends, and the Driver then copies these files to the output directory after the job ends. If there are many files, the Driver has to interact with the NameNode over and over, and because this process is single-threaded it inevitably takes longer. If a Spark job has finished all its tasks but the job itself has not finished, it is likely that the Driver is still copying files.

v2 copies the output files to the output directory immediately after a task ends, and the Driver does not need to copy anything after the Job ends.

Therefore, in terms of performance, v2 beats v1.

2. Data consistency

v1 only copies files in batches after the job is finished. This is in effect a two-phase commit, which ensures that either all data is shown to the user or none is (complete data consistency cannot be guaranteed during the copy itself, but this period is generally not long). If the job fails, the _temporary directory can simply be deleted, which helps to ensure data consistency.

v2 copies files as each task ends, so the user can see part of the output before the Spark job is complete, and data consistency cannot be guaranteed. In addition, if the job fails during output, some data will have been output successfully and some will not.

Therefore, in terms of data consistency, v1 is better than v2.

3. Summary

If performance matters most and the consistency of the output data is not a concern, mapreduce.fileoutputcommitter.algorithm.version can be set to 2 to improve performance.

But if the output requires high data consistency, it is best not to set mapreduce.fileoutputcommitter.algorithm.version to 2 for the sake of performance.
