6.824 map-reduce

I recently started a new job, so I have some free time to write down my own thoughts; my level is still average, so let's make progress together.
Following the principle of focusing on the main problem, this article skips tutorial material such as environment setup. I suspect you won't have much time to polish details after work either, so here I only cover the design and the important parts of the implementation.
There is a summary of Hadoop's design that, in my opinion, describes the architecture and principles of MapReduce clearly and concisely; the restatement below follows it:

1. Restatement of the problem

This lab implements a MapReduce (hereafter MR) framework.
The main steps of an MR job are as follows:

Work nodes fall into two categories: the master and the workers. The master is responsible for scheduling work, while the workers actually execute the map and reduce operations. An MR job is divided into M map tasks and N reduce tasks, which are delivered to the workers (either pushed or pulled) for execution.

  1. Split the input file into M roughly equal parts. The master generates M map tasks and hands them to the workers. Each map task reads its split, maps the file content into several k-v pairs using the user-supplied map function, and partitions those pairs across N intermediate files (e.g., by hash of the key). These N files are stored locally on the worker, so M*N intermediate files are generated in total.
  2. Once all map tasks have completed, the master can start scheduling reduce tasks, N in total. Each reduce task needs to read one intermediate file produced by every map task, as shown in the figure below.

This process of gathering the reads is called shuffle. In a real distributed environment the shuffle requires many remote RPC calls, which is expensive.
Let's illustrate the motivation for shuffle with an example. Different map tasks process different splits: say mapper_1 processes file_1 and mapper_2 processes file_2. Because the input was cut horizontally, file_1 and file_2 may contain the same keys, and those keys should be processed by the same reducer. But the intermediate files generated by mapper_1 and mapper_2 live on different machines, so the reducer has to fetch the content for its keys from every machine; that whole process is the shuffle. Note that "key" here does not mean only a single key but rather a set of keys, e.g., keys whose hash values share a property. We cannot dedicate one reducer to each key; in practice a reducer handles a batch of keys. For example, reducer_1 in the figure above handles the green part of the key set.
  3. On each reducer, the shuffled results are merged and handed to the user's reduce function, and the output is finally written to the file system.
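The hash-based routing described above can be sketched in Go. `ihash` mirrors the hashing helper in the lab's skeleton code; the keys and N=4 below are made up for illustration:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash maps a key to a non-negative integer. It is deterministic, so
// every mapper sends a given key to the same reduce bucket hash(key)%N.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

func main() {
	nReduce := 4 // N reduce tasks
	for _, key := range []string{"apple", "banana", "apple"} {
		fmt.Printf("%q -> reduce bucket %d\n", key, ihash(key)%nReduce)
	}
}
```

Both occurrences of "apple" land in the same bucket, which is exactly why one reducer can see all values for a key.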

2. Design ideas

General idea

This lab adopts a master-worker architecture. The master is responsible for scheduling tasks and the workers are responsible for executing tasks.
There are two types of tasks (hereafter simply "tasks"): map-task and reduce-task.
Workers pull tasks: each worker polls continuously, and if the master has an idle task it is assigned to that worker; otherwise the worker keeps polling.

worker design

  1. The main thread starts a ticker and begins polling.
  2. Send a GetTask request to the master to obtain a currently available task. This request doubles as a heartbeat, providing a keep-alive function.
    2.1 If it is a map-task, read the corresponding file and run the map function passed in by the user to obtain several kv pairs. According to hash(key)%N, write the output to N local files. The m-th map-task generates a total of N intermediate files, namely file-m-1, file-m-2, …, file-m-N.
    2.2 If it is a reduce-task, shuffle is performed first: the n-th reduce-task reads the files file-1-n, file-2-n, …, file-M-n and merges them. It then sorts by key, runs the reduce function passed in by the user, and writes the output to file output-n. There are N reduce-tasks in total, so N output files output-1 through output-N are generated.
    2.3 After GetTask returns, the map-task/reduce-task should be run asynchronously so as not to block the heartbeat.
    2.4 If GetTask returns a message saying the whole job has finished, the worker exits the polling loop and stops.
  3. When a map-task/reduce-task completes, a FinishTask request is sent to the master, and the asynchronous worker goroutine ends.
  4. To prevent a running map-task/reduce-task from being lost when its worker dies, fault tolerance is needed. The design: while polling, the worker also reports a heartbeat by carrying its currently running tasks in the GetTask request, so the master knows which tasks are being run by live workers. If a task's status is not refreshed for too long (the master maintains a timer that periodically checks whether tasks have expired, detailed in the next section), the master assumes the worker has died and reschedules the task.

master design

  1. There are several data structures that need to be maintained in the master, mainly including:
    1.1 An array of map-tasks, holding each map-task's status (IDLE, RUNNING, FINISH), its location (where its input split is stored), and its expiration time (for the worker fault-tolerance design)
    1.2 An array of reduce-tasks, holding each reduce-task's status (IDLE, RUNNING, FINISH), its location set (file-1-n, file-2-n, …, file-M-n), and its expiration time (for the worker fault-tolerance design)
    1.3 A lock to control concurrent access to the shared state
  2. After the master starts, it first generates M map-tasks for workers to consume, while waiting for the worker’s consumption results.
  3. After receiving M FinishTask requests, the master generates N reduce-tasks for the workers to consume, again waiting for the results.
  4. After receiving N more FinishTask requests, the Done flag is set to true; each worker exits automatically once it polls and sees that Done is true.
  5. To tolerate worker failures, the master maintains a background thread that periodically scans the last-modified time (last_modified_time) of every RUNNING map-task/reduce-task. If time.Now() - last_modified_time exceeds a preset threshold, the task is considered timed out and its status is reset to IDLE, waiting to be rescheduled.
  6. In a master-worker architecture there is no good fault-tolerance story if the master itself dies; the master has to be assumed reliable. Its state could be made durable by persisting it to a DB or replicating it with multi-node Raft, but I won't expand on that here.

3. Important implementation

Worker’s scheduled polling code

ticker := time.NewTicker(500 * time.Millisecond)

for {
	select {
	case <-ticker.C:
		ret := w.getTask()
		if ret != nil {
			if ret.Finished {
				return
			}
			if ret.Type == MAP {
				w.lock.Lock()
				w.runningTasks[fmt.Sprintf("%s-%d", MAP, ret.Index)] = true
				w.lock.Unlock()
				go w.handleMap(w.mapf, ret.MapLocation, ret.Index) // async
			} else if ret.Type == REDUCE {
				w.lock.Lock()
				w.runningTasks[fmt.Sprintf("%s-%d", REDUCE, ret.Index)] = true
				w.lock.Unlock()
				go w.handleReduce(w.reducef, ret.ReduceLocations, ret.Index) // async
			}
		}
	}
}

Note that I execute each map-task and reduce-task in an asynchronous goroutine. The possible problem with this approach is that if a task itself crashes (rather than the worker dying), it may never be run again. But is this really the framework's problem? I'm skeptical. If a goroutine dies, the only likely cause is an error in the user's code, and that error would recur on a rerun, so I don't think this case needs handling (one could add a retry count, but it is a bit of trouble and does not affect the main implementation, so I won't).

Master’s scheduling code

func (c *Coordinator) Schedule(args *TaskArgs, reply *TaskReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.Finished {
		reply.Finished = true
		return nil
	}

	// deal with heartbeat: refresh the timestamp of every reported task
	log.Printf("[heartBeat] running tasks are: %v\n", args.RunningTasks)
	for i := 0; i < len(args.RunningTasks); i++ {
		info := strings.Split(args.RunningTasks[i], "-")
		taskType := info[0]
		taskIndex, _ := strconv.Atoi(info[1])
		if taskType == MAP {
			c.MapTasks[taskIndex].LastModifiedTime = time.Now()
		} else if taskType == REDUCE {
			c.ReduceTasks[taskIndex].LastModifiedTime = time.Now()
		}
	}
	// schedule a map-task
	for i := 0; i < len(c.MapTasks); i++ {
		if c.MapTasks[i].State == IDLE {
			reply.Index = i
			reply.MapLocation = c.MapTasks[i].MapLocation
			reply.Type = MAP
			c.MapTasks[i].State = INPROGRESS
			c.MapTasks[i].LastModifiedTime = time.Now()
			log.Printf("[schedule] mapTask-%d\n", i)
			return nil
		}
	}
	// schedule a reduce-task
	for i := 0; i < len(c.ReduceTasks); i++ {
		if c.ReduceTasks[i].State == IDLE {
			reply.Index = i
			reply.ReduceLocations = c.ReduceTasks[i].ReduceLocations
			reply.Type = REDUCE
			c.ReduceTasks[i].State = INPROGRESS
			c.ReduceTasks[i].LastModifiedTime = time.Now()
			log.Printf("[schedule] reduceTask-%d\n", i)
			return nil
		}
	}
	return nil
}

Here I take a rather crude approach: every time a worker polls the master via GetTask, the master takes the lock and scans the map-task and reduce-task arrays for a suitable task to return. At the same time, the master refreshes the expiration time of each RUNNING task based on the statuses reported by the worker.

4. Results