How Hadoop calculates and adjusts the number of mappers under various InputFormats

  1. FileInputFormat

  1. Calculate splitSize
  • goalSize: total input size divided by the requested number of map tasks (totalSize / numSplits)

  • minSize: configuration mapreduce.input.fileinputformat.split.minsize, default value 1

  • blockSize: configuration dfs.block.size, default value 128 MB

computeSplitSize

protected long computeSplitSize(long goalSize, long minSize,
                                     long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}
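
For intuition, here is a quick worked example of that formula; the numbers are made up for illustration:

// Worked example (illustrative values only): 1 GB of input, 4 requested maps,
// 128 MB HDFS blocks, default minSize of 1.
public class SplitSizeExample {
  static long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  public static void main(String[] args) {
    long minSize   = 1L;                        // mapreduce.input.fileinputformat.split.minsize
    long blockSize = 128L * 1024 * 1024;        // dfs.block.size
    long goalSize  = (1024L * 1024 * 1024) / 4; // totalSize / numSplits = 256 MB
    // min(256 MB, 128 MB) = 128 MB; max(1, 128 MB) = 128 MB -> splits are block-sized,
    // so the 1 GB file yields 8 maps.
    System.out.println(computeSplitSize(goalSize, minSize, blockSize)); // 134217728
  }
}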
  2. Calculate the number of MapTasks
job.setLong(NUM_INPUT_FILES, files.length);
long totalSize = 0; //compute total size

for (FileStatus file: files) { // check we have valid files
  if (file.isDirectory()) {
    throw new IOException("Not a file: " + file.getPath());
  }
  totalSize += file.getLen();
}

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong("mapred.min.split.size", 1), minSplitSize);

for (FileStatus file: files) {
  Path path = file.getPath();
  long length = file.getLen();
  if (length != 0) { // The file is not empty
    FileSystem fs = path.getFileSystem(job);
    BlockLocation[] blkLocations;
    if (file instanceof LocatedFileStatus) {
      blkLocations = ((LocatedFileStatus) file).getBlockLocations();
    } else {
      blkLocations = fs.getFileBlockLocations(file, 0, length);
    }
    if (isSplitable(fs, path)) { // Can be divided into chunks
      long blockSize = file.getBlockSize();
      long splitSize = computeSplitSize(goalSize, minSize, blockSize);

      long bytesRemaining = length;
      // bytesRemaining/splitSize > 1.1 Then split into chunks
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
            length-bytesRemaining, splitSize, clusterMap);
        splits.add(makeSplit(path, length-bytesRemaining, splitSize,
            splitHosts[0], splitHosts[1]));
        bytesRemaining -= splitSize; //The remaining file size after splitting into blocks
      }

      if (bytesRemaining != 0) {
        String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
            - bytesRemaining, bytesRemaining, clusterMap);
        splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
            splitHosts[0], splitHosts[1]));
      }
    } else { // Cannot be divided into blocks. The number of maps is 1
      String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
      splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
    }
  } else {
    //Create empty hosts array for zero length files
    splits.add(makeSplit(path, 0, length, new String[0]));
  }
}
  • Loop through all files

    • Check whether the file is splittable (isSplitable)

      • If the file is not splittable, the whole file becomes a single split, i.e. one map.

      • If the file is splittable: while the remaining size / splitSize > 1.1 (SPLIT_SLOP), carve off a split of splitSize; the remainder (if non-zero) becomes one final split.

  3. Adjust parameters
  • To increase the number of maps, set mapred.map.tasks to a larger value; a larger numSplits lowers goalSize and therefore splitSize, so more splits are produced.

  • To reduce the number of maps, set mapred.min.split.size to a larger value; a larger minSize wins in computeSplitSize, producing fewer, larger splits.
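
As a minimal sketch on the classic mapred API (property names as above; the values are only examples):

import org.apache.hadoop.mapred.JobConf;

public class MapCountTuning {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Ask for more maps: raises numSplits, which lowers goalSize and splitSize.
    conf.setNumMapTasks(32);                                    // mapred.map.tasks
    // Or force fewer, larger maps: a bigger minSize wins in computeSplitSize.
    conf.setLong("mapred.min.split.size", 256L * 1024 * 1024);  // 256 MB
  }
}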

  4. Expansion: compression and splittability

Whether a compressed file can be split comes down to whether the compression codec can seek to an arbitrary position in the data stream and continue reading from there.

Here are three points of explanation:

  • Consider, for example, an uncompressed 1 GB file stored in HDFS. With an HDFS block size of 128 MB, the file is stored as 8 blocks. A MapReduce/Spark job that uses this file as input will create 8 map tasks, one per block.

  • Now suppose the same file is gzip-compressed and is still 1 GB. As before, HDFS stores it in 8 blocks. However, each map task can no longer process its block independently of the others: the data was cut into blocks when written to HDFS, and the gzip codec cannot start reading from an arbitrary position.

  • Put plainly, each block stored in HDFS is not a complete file. Think of a complete file as having a header and a trailer: after splitting, some blocks contain the header, some contain the trailer, and some contain neither, so multiple tasks cannot process the file in parallel. For such a non-splittable file, all of its HDFS blocks must be fed to a single map task. Most of those blocks do not live on that task's node, so they must be transferred across nodes, and because there is no parallelism the job can run for a very long time.
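
As an illustration of how splittability is decided in practice, here is a sketch modeled on what TextInputFormat does (not the verbatim Hadoop source): a file is splittable when it is uncompressed, or when its codec implements SplittableCompressionCodec (e.g. bzip2); gzip does not.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplittabilityCheck {
  // Splittable if uncompressed, or if the codec supports seeking into the stream.
  static boolean isSplitable(Configuration conf, Path file) {
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
    return codec == null || codec instanceof SplittableCompressionCodec;
  }
}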

  2. CombineFileInputFormat

In MapReduce practice there are often many small files. Generating one mapper per file wastes resources, and if there is no reduce stage downstream the job itself writes out many small files, so the file count balloons and hurts subsequent Hive jobs.

We therefore want multiple files combined into a single split as mapper input, and CombineFileInputFormat meets this need.
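
A minimal usage sketch with the new-API subclass CombineTextInputFormat (assuming plain-text input; the 256 MB cap is just an example):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class CombineInputExample {
  public static void configure(Job job) {
    // Pack many small files into each split instead of one mapper per file.
    job.setInputFormatClass(CombineTextInputFormat.class);
    // Cap the combined split size (mapreduce.input.fileinputformat.split.maxsize).
    CombineTextInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
  }
}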

  1. Split method

First pass: form splits from blocks that reside on the same DataNode (DN); a simplified code sketch follows after the three passes. Procedure:

  • Loop over nodeToBlocks to find which blocks live on each DN

  • Loop through that DN's block list

  • Remove each block from blockToNodes so the same block cannot end up in more than one split

  • Add the block to validBlocks; this list records which blocks have been removed from blockToNodes so they can be put back later if needed

  • Add the block size to the running total curSplitSize

  • Check whether curSplitSize has exceeded the configured maxSize

    • If it has, create a split from validBlocks and add it, then reset curSplitSize and validBlocks

    • If not, continue with the next block in the list (back to step 2)

  • Once the current DN's block list has been fully traversed, check whether the leftover blocks may still form a split (i.e. whether their total size exceeds the per-node minimum split size)

    • If so, create a split from them and add it

    • If not, return these remaining blocks to blockToNodes

  • Reset curSplitSize and validBlocks

  • Move on to the next DN (back to step 1)

Second pass: combine the leftover blocks that do not share a DN but do share a rack (only blocks left over from the first pass).

Final pass: combine the blocks that share neither a DN nor a rack (whatever remains after the first two passes).
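
To make the first pass concrete, here is a simplified sketch; Block, nodeToBlocks and blockToNodes are stand-ins for CombineFileInputFormat's internal structures (OneBlockInfo and its maps), so this illustrates the procedure above rather than the actual Hadoop source.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified sketch of the first (per-DN) pass described above; not the real
// CombineFileInputFormat code.
public class CombinePassSketch {
  static class Block { long length; }

  static List<List<Block>> combinePerNode(Map<String, List<Block>> nodeToBlocks,
                                          Set<Block> blockToNodes,  // blocks not yet assigned to a split
                                          long maxSize, long minSizeNode) {
    List<List<Block>> splits = new ArrayList<>();
    for (List<Block> blocksOnNode : nodeToBlocks.values()) {        // step 1: take one DN
      List<Block> validBlocks = new ArrayList<>();
      long curSplitSize = 0;
      for (Block b : blocksOnNode) {                                // step 2: walk its block list
        if (!blockToNodes.remove(b)) {
          continue;                                                 // block already used elsewhere
        }
        validBlocks.add(b);
        curSplitSize += b.length;
        if (maxSize != 0 && curSplitSize >= maxSize) {              // hit the cap: emit a split
          splits.add(new ArrayList<>(validBlocks));
          validBlocks.clear();
          curSplitSize = 0;
        }
      }
      // Leftovers on this DN: emit only if they reach the per-node minimum,
      // otherwise hand them back for the rack-level and final passes.
      if (!validBlocks.isEmpty() && curSplitSize >= minSizeNode) {
        splits.add(new ArrayList<>(validBlocks));
      } else {
        blockToNodes.addAll(validBlocks);
      }
    }
    return splits;
  }
}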

  2. Adjust parameters

The following parameters can be adjusted to affect the number of maps:

  • mapreduce.input.fileinputformat.split.minsize.per.node (same as mapred.min.split.size.per.node) Default value 1

  • mapreduce.input.fileinputformat.split.minsize.per.rack (same as mapred.min.split.size.per.rack) Default value 1

  • mapreduce.input.fileinputformat.split.maxsize (same as mapred.max.split.size), default value 256000000 (256 MB)

The most useful parameter to adjust is mapreduce.input.fileinputformat.split.maxsize, which caps the size of each combined split.
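
For example (illustrative values; the property names are the ones listed above), the knobs can be set directly on the job configuration:

import org.apache.hadoop.conf.Configuration;

public class CombineSplitTuning {
  public static void tune(Configuration conf) {
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);           // cap per combined split
    conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 128L * 1024 * 1024);  // per-DN leftover threshold
    conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 128L * 1024 * 1024);  // per-rack leftover threshold
  }
}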

  3. DBInputFormat

statement = connection.createStatement();

// Returns the query for getting the total number of rows
results = statement.executeQuery(getCountQuery());
results.next();

long count = results.getLong(1);
int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);

// size of each block
long chunkSize = (count / chunks);

results.close();
statement.close();

List<InputSplit> splits = new ArrayList<InputSplit>();

// Split the rows into n-number of chunks and adjust the last chunk
// accordingly
for (int i = 0; i < chunks; i++) {
  DBInputSplit split;

  if ((i + 1) == chunks)
    split = new DBInputSplit(i * chunkSize, count);
  else
    split = new DBInputSplit(i * chunkSize, (i * chunkSize)
         + chunkSize);

  splits.add(split);
}

connection.commit();
return splits;
  1. Split method
  • Run the COUNT query to get the total number of rows, and read the configured number of maps (mapreduce.job.maps)

  • Compute the chunk size: chunkSize = total_number_of_rows / map_num

  • Carve the row range into chunks of chunkSize rows, one split per chunk; the last split absorbs any remainder

  2. Adjust parameters
  • mapreduce.job.maps

Set mapreduce.job.maps to the desired number of maps; DBInputFormat creates one split (and thus one map) per chunk of rows.
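
For instance (the value is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class DBSplitTuning {
  public static void tune(Configuration conf) {
    // mapreduce.job.maps (MRJobConfig.NUM_MAPS) is the chunk count used above.
    conf.setInt(MRJobConfig.NUM_MAPS, 8);  // 8 maps -> 8 row ranges of roughly count/8 rows each
  }
}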
