Spark custom output file

1. Overview

This article will start from the source code to describe how spark calls Hadoop several OutputFormats to achieve file output. Here will describe several commonly used operators in work, for example: saveAsTextFile(path), saveAsHadoopFile(path)

2. Spark source code analysis

The underlying call of saveAsTextFile(path) is also saveAsHadoopFile(path), so here is mainly about the source code of the latter; This step will also take you to realize what can be customized;

1.main

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("")
    val sc = new SparkContext(conf)
    // Disable the success file
    sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    val value: RDD[(String,Int)] = sc. parallelize(List(
      ("1",1), ("1",1), ("2",1), ("2",1),("2",1),
    ))
    value1
      .saveAsHadoopFile("C:\Users\Desktop\learn\spark_program_test\definedFileName"
                        ,classOf[String]
                        ,classOf[String]
                        ,classOf[TextOutputFormat[String,String]])
    sc. stop()
  }

2. PairRDDFunctions

def saveAsHadoopFile[F <: OutputFormat[K, V]](
      path: String)(implicit fm: ClassTag[F]): Unit = self.withScope {
    saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
  }
def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    hadoopConf.setOutputKeyClass(keyClass)
    hadoopConf.setOutputValueClass(valueClass)
    conf. setOutputFormat(outputFormatClass)
    for (c <- codec) {
      hadoopConf.setCompressMapOutput(true)
      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
      hadoopConf.setMapOutputCompressorClass(c)
      hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
      hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
        CompressionType.BLOCK.toString)
    }

    // Use configured output committer if already set
    if (conf. getOutputCommitter == null) {
      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
    }

    // When speculation is on and output committer class name contains "Direct", we should warn
    // users that they may lose data if they are using a direct output committer.
    val speculationEnabled = self. conf. getBoolean("spark. speculation", false)
    val outputCommitterClass = hadoopConf. get("mapred. output. committer. class", "")
    if (speculationEnabled & amp; & amp; outputCommitterClass. contains("Direct")) {
      val warningMessage =
        s"$outputCommitterClass may be an output committer that writes data directly to " +
          "the final location. Because speculation is enabled, this output committer may " +
          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
          "committer that does not have this behavior (e.g. FileOutputCommitter)."
      logWarning(warningMessage)
    }

    FileOutputFormat.setOutputPath(hadoopConf,
      SparkHadoopWriterUtils.createPathFromString(path, hadoopConf))
    saveAsHadoopDataset(hadoopConf)
  }

Here the OutputFormat is specified as TextOutputFormat, if not specified, it is also the default TextOutputFormat; enter the second method of PairRDDFunctions, saveAsHadoopDataset(hadoopConf) and then enter;

def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val outputFormatInstance = hadoopConf. getOutputFormat
    val keyClass = hadoopConf.getOutputKeyClass
    val valueClass = hadoopConf.getOutputValueClass
    if (outputFormatInstance == null) {
      throw new SparkException("Output format class not set")
    }
    if (keyClass == null) {
      throw new SparkException("Output key class not set")
    }
    if (valueClass == null) {
      throw new SparkException("Output value class not set")
    }
    SparkHadoopUtil.get.addCredentials(hadoopConf)

    logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
      valueClass. getSimpleName + ")")

    if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(self.conf)) {
      // FileOutputFormat ignores the filesystem parameter
      val ignoredFs = FileSystem. get(hadoopConf)
      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
    }

    val writer = new SparkHadoopWriter(hadoopConf)
    writer. preSetup()

    val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

      val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)

      writer.setup(context.stageId, context.partitionId, taskAttemptId)
      writer. open()
      var recordsWritten = 0L

      Utils. tryWithSafeFinallyAndFailureCallbacks {
        while (iter. hasNext) {
          val record = iter. next()
          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

          // Update bytes written metric every few records
          SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
          recordsWritten += 1
        }
      }(finallyBlock = writer. close())
      writer.commit()
      outputMetrics. setBytesWritten(callback())
      outputMetrics.setRecordsWritten(recordsWritten)
    }

    self.context.runJob(self, writeToFile)
    writer.commitJob()
  }

Arriving here is the main logic of writing to the file:

①writer.open(): It is the method of SparkHadoopWriter; first, the file name (such as part-0000) will be initialized here, and then passed into the RecordWriter returned by getRecordWriter of the OutputFormat class you set, so if you want to customize the file name, From here it seems that the getRecordWriter method can be rewritten, and how TextOutputFormat and MultipleTextOutputFormat rewrite getRecordWriter will be explained later;

def open() {
    val numfmt = NumberFormat. getInstance(Locale.US)
    numfmt.setMinimumIntegerDigits(5)
    numfmt. setGroupingUsed(false)

    val outputName = "part-" + numfmt. format(splitID)
    val path = FileOutputFormat. getOutputPath(conf. value)
    val fs: FileSystem = {
      if (path != null) {
        path. getFileSystem(conf. value)
      } else {
        FileSystem. get(conf. value)
      }
    }
    getOutputCommitter(). setupTask(getTaskContext())
    writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
  }

②writeToFile function: This is how to write a file specifically. First, you can see that only one file will be generated for each partition here, and then call the write method of the RecordWriter used by the OutputFormat you set to write the file; if you want to To define the content to write, you need to customize the RecordWriter class;

3. TextOutputFormat & MultipleTextOutputFormat

1. TextoutputFormat

This class can be copied directly, and then just change the code slightly according to your own needs. Next, I will come out from the requirements to see how to rewrite this class;

①The file encoding format is an encoding format other than UTF-8, or the newline character is not ‘\
‘: Because these two are hard-coded in the LineRecoderWriter of TextoutputFormat, you need to rewrite the TextoutputFormat class, just copy the entire class code Just do it, and then modify the place you need to change, for example:

public class MyOutput<K,V> extends FileOutputFormat<K, V> {
    protected static class LineRecordWriter<K, V>
            implements RecordWriter<K, V> {
        private static final String utf8 = "GBK";
        private static final byte[] newline;
        static {
            try {
                newline = "\r\
".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }
. . . 

The encoding format and line breaks are modified here

②Key/value separator, this can be written to death (when rewriting), or you can change the hadoopconf configuration in main

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("")
    val sc = new SparkContext(conf)
    //Modify the key/value separator of the output file
    sc.hadoopConfiguration.set("mapreduce.output.textoutputformat.separator",",")

③ Modify the file name

@Override
    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
        //Added by the rewritten class, this can be customized
        name = Integer. parseInt(name. split("-")[1]) + "";

. . .
}

④Modify the key/value writing logic: I have not modified it here, it can be modified according to the business logic;

 public synchronized void write(K key, V value)
                throws IOException {

            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey & amp; & amp; nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }
//When passed from PairRddFunction, the key/value are converted to Anyval, so here will go else
private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

4. MultipleTextOutputFormat

This is a simple custom file name provided by Hadoop, which can customize the output key/value data, but the final written file is still the LineRcorderWriter of TextOutputFormat, which means that the file encoding format and line break cannot be customized.

 //Modify the generated partition file name, the name passed in for each partition is different such as: Part-0001, the priority is lower than generateFileNameForKeyValue
  protected String generateLeafFileName(String name) {
    return name;
  }

  //key, value does not need to be explained. The name here is the name returned by generateLeafFileName. If there is no generateLeafFileName, it will be Part-0001. It should be noted that because the file is written in multiple partitions, if different partitions generate files with the same file name, it will be Overwritten, if only key is used, the same key must be in the same partition, key + name can be guaranteed not to be overwritten, but too many files may be generated
  protected String generateFileNameForKeyValue(K key, V value, String name) {
    return name;
  }

//Actually write the key
  protected K generateActualKey(K key, V value) {
    return key;
  }
  
  //actually written value
  protected V generateActualValue(K key, V value) {
    return value;
  }
  
  //This method determines the RecorderWriter that finally writes the file, which is called by the getRecordWriter method. In fact, mutipleOutputFormat, the rewritten RecordWriter (in the form of an internal class), just makes the name, key, and value customizable
  abstract protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
      JobConf job, String name, Progressable arg3) throws IOException;

Remarks: 1. If the business logic letter of guarantee needs to be divided according to the specified file size or number of items, in addition to using the foreachPartition operator, the source code of the saveAsHadoopDataset method of the PairRddFunction class must be modified for the rest;

2.spark will generate a .crc file when writing a local file, but will not generate it when writing to hdfs;