08 | How does MapReduce allow data to complete a journey?

In the last issue, we talked about the MapReduce programming model, which divides a big data calculation into two stages: Map and Reduce. Let’s review it first. In the Map stage, each data block is assigned a Map calculation task; the framework then merges all the Map output by Key, so that each Key and its corresponding Values are sent to the same Reduce task for processing. Through these two stages, engineers only need to follow the MapReduce programming model to develop complex big data computing programs.
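
To jog your memory, here is a minimal WordCount sketch written against the Hadoop 1 (org.apache.hadoop.mapred) API. The class names are illustrative and the details may differ from the code in the last issue; take it as a reference point for the framework discussion below, not as the original program.

// A minimal WordCount sketch against the Hadoop 1 (org.apache.hadoop.mapred) API.
// Class names are illustrative and may differ from the code in the last issue.
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCount {

  // Map stage: called for every line of the data block assigned to this Map task,
  // emitting <word, 1> for each word found.
  public static class WordCountMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      StringTokenizer tokens = new StringTokenizer(line.toString());
      while (tokens.hasMoreTokens()) {
        word.set(tokens.nextToken());
        output.collect(word, ONE);
      }
    }
  }

  // Reduce stage: called once per Key after shuffle; values holds all the 1s
  // collected for that word from every Map task.
  public static class WordCountReducer extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text word, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(word, new IntWritable(sum));
    }
  }
}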

So how does such a program run in a distributed cluster? How does the MapReduce program find the data it needs and perform the calculation? The answer is the MapReduce computing framework. In the last issue, I said that MapReduce is both a programming model and a computing framework. Having covered the programming model, today we will discuss how MapReduce allows data to complete its journey, that is, how the MapReduce computing framework operates.

First of all, I want to tell you that in practice, there are two key issues that need to be dealt with in this process.

How to assign a Map calculation task to each data block, that is, how the code is sent to the server where the data block is located, how it is started after being sent, and, once started, how it knows where the data it needs to compute is located in the file (what the BlockID is).

How to aggregate Map outputs with the same Key from different servers and send them to the same Reduce task for processing.

So which steps of the MapReduce calculation process do these two key questions correspond to? Going back to the diagram of the MapReduce calculation process from the last issue, you can see the two places marked in red. These two key issues correspond to the two “MapReduce framework processing” boxes in the picture: MapReduce job starting and running, and MapReduce data merging and joining.

MapReduce job starting and running mechanism

Let’s take Hadoop 1 as an example. The MapReduce operation process involves three types of key processes.

1. Big data application process. This type of process is the main entry point for starting a MapReduce program. It mainly specifies the Map and Reduce classes, the input and output file paths, and so on, and submits the job to the Hadoop cluster, that is, to the JobTracker process described below. This is a MapReduce program process started by the user, such as the WordCount program we discussed in the last issue (a minimal driver sketch follows this list).

2. JobTracker process. Based on the amount of input data to be processed, this type of process instructs the TaskTracker processes described below to start a corresponding number of Map and Reduce tasks, and it manages task scheduling and monitoring for the entire job life cycle. It is a resident process of the Hadoop cluster. It should be noted that the JobTracker process is globally unique in the entire Hadoop cluster.

3. TaskTracker process. This process is responsible for starting and managing Map and Reduce processes. Because each data block needs a corresponding map function, the TaskTracker process is usually started on the same server as the HDFS DataNode process. In other words, the vast majority of servers in a Hadoop cluster run both the DataNode process and the TaskTracker process.
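
To make the first process type concrete, here is a minimal job-submission driver, again sketched against the Hadoop 1 (org.apache.hadoop.mapred) API. It assumes the WordCount classes sketched earlier and treats the input and output paths as placeholders passed on the command line.

// Hypothetical WordCount driver: configures the job and submits it to the
// JobTracker via JobClient (Hadoop 1, org.apache.hadoop.mapred API).
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCountDriver.class);
    conf.setJobName("wordcount");

    // Which map and reduce classes to run (from the sketch above).
    conf.setMapperClass(WordCount.WordCountMapper.class);
    conf.setReducerClass(WordCount.WordCountReducer.class);

    // Output key/value types of the job.
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    // Input and output paths in HDFS (placeholders from the command line).
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // Submit the job to the JobTracker and wait for it to complete.
    JobClient.runJob(conf);
  }
}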

The JobTracker process and the TaskTracker process have a master-slave relationship. There is usually only one master server (or there may be a backup machine for high availability, but only one server provides service at any time; only one really works), while there may be hundreds or thousands of slave servers, all of which obey the control and scheduling of the master server. The master server is responsible for allocating server resources to applications and scheduling job execution, while the specific computing operations are completed on the slave servers.

Specifically, the master server of MapReduce is JobTracker, and the slave server is TaskTracker. Do you remember that HDFS also has a master-slave architecture? The master server of HDFS is NameNode and the slave server is DataNode. Yarn, Spark, etc., which will be discussed later, also have such an architecture. This one-master, multiple-slave server architecture is also the architectural solution for most big data systems.

Reusable architectural solutions are called architectural patterns. One master and multiple slaves can be said to be the most important architectural pattern in the field of big data. There is only one master server, which controls the overall situation; there are many slave servers, which are responsible for specific things. In this way, many servers can be effectively organized to present a unified and powerful computing capability to the outside world.

At this point, we have an intuitive understanding of the startup and operation mechanism of MapReduce. So what is the specific job startup and calculation process? I have drawn a picture based on what was said above; you can follow it step by step and get a feel for the whole process.

If we think of this calculation process as a small journey, the journey can be summarized as follows:

1. The application process JobClient stores the user’s job JAR package in HDFS. Later, these JAR packages will be distributed to the servers in the Hadoop cluster that perform the MapReduce calculation.

2. The application submits the job to JobTracker.

3. JobTracker creates a JobInProgress tree according to the job scheduling policy; each job has its own JobInProgress tree.

4. JobInProgress creates a corresponding number of TaskInProgress entries based on the number of input data fragments (usually the number of data blocks) and the configured number of Reduce tasks.

5. The TaskTracker process and the JobTracker process communicate regularly.

6. If a TaskTracker has free computing resources (free CPU cores), JobTracker will assign tasks to it. When assigning tasks, JobTracker matches the TaskTracker, by its server name, with computing tasks whose data blocks are stored on that same machine, so that the started computing task processes data that is already on the local machine, achieving the principle we mentioned at the beginning: “moving computation is cheaper than moving data” (see the sketch after this list).

7. After receiving a task, TaskTracker starts the corresponding Map or Reduce process according to the task type (Map or Reduce) and the task parameters (the job JAR package path, the input data file path, the starting position and offset of the data to be processed within the file, the host names of the DataNodes holding the replicas of the data block, and so on).

8. After the Map or Reduce process starts, it first checks whether the JAR package file for the task to be executed exists locally. If not, it downloads the package from HDFS, then loads the Map or Reduce code and starts execution.

9. If it is a Map process, it reads data from HDFS (usually the data block to be read is stored on the local machine); if it is a Reduce process, it writes the result data to HDFS.
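
To make step 6 a little more concrete, here is a purely illustrative sketch of the data-locality idea. This is not JobTracker code; all the names are hypothetical, and it only shows the matching of a free TaskTracker’s host name against the hosts that store each pending task’s data block.

import java.util.List;

// Purely illustrative sketch of data-local task assignment (step 6); not
// JobTracker code, just the idea of matching block hosts to TaskTrackers.
class LocalityScheduler {

  // A pending Map task remembers which servers hold replicas of its data block.
  static class MapTask {
    final String blockId;
    final List<String> blockHosts;
    MapTask(String blockId, List<String> blockHosts) {
      this.blockId = blockId;
      this.blockHosts = blockHosts;
    }
  }

  // When a TaskTracker with free slots reports in, prefer a task whose data
  // block is stored on that same server ("move computation, not data").
  static MapTask assign(String taskTrackerHost, List<MapTask> pending) {
    for (MapTask task : pending) {
      if (task.blockHosts.contains(taskTrackerHost)) {
        return task;                                   // data-local assignment
      }
    }
    return pending.isEmpty() ? null : pending.get(0);  // otherwise, any task
  }
}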

Through such a computing journey, MapReduce distributes the computing tasks of a big data job across the entire Hadoop cluster, and the data to be processed by each Map task can usually be read from the local disk. Is the process clearer to you now? You may well feel that it is not so simple after all!

In fact, all you have to do is write a map function and a reduce function. You don’t need to worry about how these two functions are distributed and started on the cluster, nor about how data blocks are allocated to computing tasks. The MapReduce computing framework does all of this for you! Isn’t that exciting? This is the power of MapReduce that we have talked about repeatedly.

MapReduce data merging and connection mechanism

Where MapReduce calculations really work their magic is in the merging and joining of data.

Let me go back to the WordCount example of the MapReduce programming model from the previous issue. We want to count the number of times each word appears across all the input data, but a single Map can only process part of the data, and a popular word will appear in almost every Map’s output, which means the same words must be merged together before counting to get the correct result.

In fact, almost all big data computing scenarios need to deal with this data-association problem. Simple ones like WordCount only need to merge by Key; for more complex ones, like database join operations, two (or more) types of data are joined based on a Key.
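
For the join case, one common pattern (sketched here with hypothetical names; it is not code from this column) is a reduce-side join: the map side tags each record with the table it came from, and after shuffle the reduce sees all records sharing a Key together and can combine them.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical reduce-side join: the map side is assumed to have tagged each
// value with its source table, e.g. "user:Alice" or "order:book#3".
public class JoinReducer extends MapReduceBase
    implements Reducer<Text, Text, Text, Text> {
  public void reduce(Text userId, Iterator<Text> taggedValues,
                     OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    String user = null;
    List<String> orders = new ArrayList<String>();
    while (taggedValues.hasNext()) {
      String v = taggedValues.next().toString();
      if (v.startsWith("user:")) {
        user = v.substring(5);
      } else if (v.startsWith("order:")) {
        orders.add(v.substring(6));
      }
    }
    // Emit one joined record per order of this user (skip if no user record).
    if (user != null) {
      for (String order : orders) {
        output.collect(userId, new Text(user + "\t" + order));
      }
    }
  }
}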

Between the output of map and the input of reduce, the MapReduce computing framework handles the merging and joining of data. This operation has a special name: shuffle. So what exactly is shuffle? What is its specific process? Please see the picture below.

The calculation results of each Map task are written to the local file system. When a Map task is about to complete, the MapReduce computing framework starts the shuffle process and calls a Partitioner interface in the Map task process to select a Reduce partition for each <Key, Value> pair it outputs, then sends the pair to the corresponding Reduce task process over HTTP. In this way, no matter which server node a Map runs on, the same Key is always sent to the same Reduce process. The Reduce task process sorts and merges the <Key, Value> pairs it receives, putting the same Keys together to form a <Key, Value collection>, which is passed to the reduce function for execution.
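
As a rough mental model of what shuffle does, here is a small plain-Java simulation. It is not framework code; it only mimics the two ideas described above: partition selection by Key hash, and grouping the same Keys together on the Reduce side.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// A plain-Java mental model of shuffle (not MapReduce framework code):
// map outputs are partitioned by Key hash, and each Reduce side then
// sorts/merges its partition so that equal Keys end up grouped together.
class ShuffleSketch {
  public static void main(String[] args) {
    int numReduceTasks = 2;
    // Pretend these <word, 1> pairs were emitted by Map tasks on different servers.
    String[] mapOutputKeys = {"a", "bear", "a", "the", "bear", "a"};

    // One input container per Reduce task; TreeMap keeps Keys sorted,
    // loosely mimicking the merge sort performed during shuffle.
    List<Map<String, List<Integer>>> reduceInputs = new ArrayList<>();
    for (int i = 0; i < numReduceTasks; i++) {
      reduceInputs.add(new TreeMap<>());
    }

    // Partition selection: same rule as the default Partitioner shown below.
    for (String key : mapOutputKeys) {
      int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      reduceInputs.get(partition).computeIfAbsent(key, k -> new ArrayList<>()).add(1);
    }

    // Each Reduce task now sees <Key, list of Values>, e.g. <"a", [1, 1, 1]>.
    for (int i = 0; i < numReduceTasks; i++) {
      System.out.println("reduce task " + i + " input: " + reduceInputs.get(i));
    }
  }
}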

The key here is deciding which Reduce process each <Key, Value> output by the map is sent to during shuffle. This is implemented by the Partitioner. The default Partitioner of the MapReduce framework takes the hash value of the Key modulo the number of Reduce tasks, so the same Key always lands on the same Reduce task ID. From an implementation point of view, such a Partitioner only requires one line of code.

/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
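
One small detail worth noting about this one line: the bitwise AND with Integer.MAX_VALUE clears the sign bit, so a Key whose hashCode() happens to be negative still maps to a valid, non-negative Reduce task number.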

Having said so much, to understand shuffle you only need to remember this: distributed computing needs to bring related data scattered across different servers together for the next step of calculation, and that is shuffle.

Shuffle is the most magical part of the big data computing process. Whether it is MapReduce or Spark, as long as it is big data batch computing, there will definitely be a shuffle process; only by correlating data can its intrinsic relationships and value be revealed. If you don’t understand shuffle, you will be confused when programming map and reduce, and you won’t know how to correctly design the output of map and the input of reduce. Shuffle is also the most difficult and most performance-consuming part of the entire MapReduce process. In the early MapReduce code, half of the code was about shuffle processing.

Summary

MapReduce programming is relatively simple, but it is not simple for the MapReduce framework to execute such a relatively simple program in parallel on a large-scale distributed server cluster. Understanding the startup and running mechanism of MapReduce jobs, as well as the role and implementation principles of the shuffle process, will be of great help in understanding the core principles of big data and in truly grasping and making good use of it.