MIT 6.824 2020 (1) lab1 MapReduce实现笔记
MIT 6.824 2020 (1) lab1 MapReduce实现笔记
花了几天时间做了这个lab, 算是入了分布式的门。一开始没啥头绪,但想明白Master需要维护哪些元信息,Master和Worker何时需要进行的rpc通信这些基本问题后,思路就渐渐明朗。得益于Go语言的简洁、方便的gorountine、GC、自带的RPC框架(虽然不知道为什么*Arg和*Reply不能传nil指针,传nil指针会导致RPC调用的结果有问题,关键是不提示有问题),实现起来也非常轻松,几乎没有心智负担。
以下是我的一些笔记,这门课不允许在网上公开代码,但是为了便于描述,我还是将一部分核心代码贴了出来。
Master和Worker的定义
命名方式随意了点:)
看变量名大概就能知道什么意思,需要补充的是,TaskStatus的Start表示未被分配,Processing表示有worker正在处理,而Done表示该任务已经完成。
另外,Master会维持一个全局的global_phase变量,用来指示现在所处的阶段,lifetime是 MapPhase -> ReducePhase -> CompletedPhase。并通过维护done_cnt计数器来知晓现在完成的Map/Reduce Task的数量,在全部完成时,进行Phase的转换。
type Phase int8
type TaskType int8
type TaskStatus int8
// global phase
const (
MapPhase Phase = iota
ReducePhase
CompletedPhase
)
// TaskType
const (
Map TaskType = iota
Reduce
Wait
Exit
)
// TaskStatus
const (
Start TaskStatus = iota
Processing
Done
)
type MapTask struct {
// machine_id int
NReduce int
TaskId int
InputFile string
Status TaskStatus
// iter_files []string
Time time.Time
}
type ReduceTask struct {
TaskId int
InputFiles []string
Status TaskStatus
Time time.Time
}
type Task struct {
Task_type TaskType
Map_task *MapTask
Reduce_task *ReduceTask
}
type Master struct {
// lifetime: MapPhase -> ReducePhase -> CompletedPhase
global_phase Phase
// how many task has completed correctly
done_cnt int
mutex sync.Mutex
// map task num & reduce task num
map_task_num int
n_reduce int
// map task array
map_tasks []MapTask
// reduce task array
reduce_tasks []ReduceTask
}
Master实现
Master主要就是被动接收一些rpc调用,维护状态,最核心的是GetTask rpc调用,worker调用这个来获取Task。
逻辑也很简单,看代码就行:
func (m *Master) GetTask(args *ExampleArgs, task *Task) error {
m.mutex.Lock()
defer m.mutex.Unlock()
switch m.global_phase {
case MapPhase:
for i := range m.map_tasks {
map_task := &m.map_tasks[i]
if map_task.Status == Start {
// if there is available task to be done
map_task.Status = Processing
map_task.Time = time.Now()
task.Task_type = Map
task.Map_task = map_task
// fmt.Printf("%+v \n", task.Map_task)
return nil
}
}
// if all tasks are done or are processing , just let it wait
task.Task_type = Wait
return nil
case ReducePhase:
// fmt.Printf("%+v ", m.reduce_tasks)
for i := range m.reduce_tasks {
reduce_task := &m.reduce_tasks[i]
if reduce_task.Status == Start {
reduce_task.Status = Processing
task.Task_type = Reduce
task.Reduce_task = reduce_task
// fmt.Printf("%+v ", task.Reduce_task)
return nil
}
}
task.Task_type = Wait
return nil
case CompletedPhase:
task.Task_type = Exit
return nil
}
panic("Unreachable")
return nil
}
另外TaskDone(更新done_cnt, 判断是否需要进行global_phase的阶段切换)、UpdateTime(接收心跳信息,更新对应task的Time字段)。
值得注意的是,这里我在设计上,将心跳信号与任务进行了绑定(而不是worker),且只有处于Processing阶段的任务才有心跳。语义上可以理解为属于正在被处理的任务的心跳。检测心跳的代码也贴出来吧,这里我设置的每5s检测一次,判定超时时间也为5s(worker每2s发送一次心跳,所以这里允许有3s的延迟)。当然,这些参数都是我随意设的。
go func() {
for {
time.Sleep(5 * time.Second)
m.mutex.Lock()
if m.global_phase == MapPhase {
for i := range m.map_tasks {
map_task := &m.map_tasks[i]
// fmt.Printf("time check:%d %d %s\n", i, map_task.Status, time.Now().Sub(map_task.Time).String())
if map_task.Status == Processing && time.Now().Sub(map_task.Time) > 5*time.Second {
// if timeout, reset status to Start
map_task.Status = Start
}
}
} else if m.global_phase == ReducePhase {
for i := range m.reduce_tasks {
reduce_task := &m.reduce_tasks[i]
if reduce_task.Status == Processing && time.Now().Sub(reduce_task.Time) > 5*time.Second {
reduce_task.Status = Start
}
}
} else {
m.mutex.Unlock()
return
}
m.mutex.Unlock()
}
}()
Worker主循环
Worker的主循环如下,其中fetch task是用rpc从master获取任务task, 并根据task的类型进行分类处理。
//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// main loop
for {
task := fetch_task()
// fmt.Printf("%+v \n", task)
switch task.Task_type {
case Map:
// start send heartbeat
quit := send_heart_beat(task.Map_task.TaskId, Map)
// do map work
err := handle_map_task(task.Map_task, mapf)
if err == nil {
mes := DoneMessage{
task.Map_task.TaskId,
Map,
}
call("Master.TaskDone", mes, &ExampleReply{})
fmt.Printf("Map %d done by %d\n", task.Map_task.TaskId, os.Getpid())
}
// stop send heartbeat
quit <- true
case Reduce:
quit := send_heart_beat(task.Reduce_task.TaskId, Reduce)
// do reduce work
err := handle_reduce_task(task.Reduce_task, reducef)
if err == nil {
mes := DoneMessage{
task.Reduce_task.TaskId,
Reduce,
}
call("Master.TaskDone", mes, &ExampleReply{})
fmt.Printf("Reduce %d done by %d\n", task.Reduce_task.TaskId, os.Getpid())
}
quit <- true
case Wait:
time.Sleep(1 * time.Second)
case Exit:
return
}
}
}
对于Map和Reduce这两种工作,在接收到任务时,需要开始向master提供心跳,我具体设计的是2s发送一次;在Map/Reduce任务完成或者异常退出时,需要终止心跳的发送。我是将发送心跳的过程用goroutine来实现的,通过channel来判断什么时候退出。具体函数如下:
func send_heart_beat(task_id int, task_type TaskType) chan<- bool {
quit := make(chan bool)
go func() {
for {
select {
case <-quit:
return
default:
// Do other stuff
mes := HeartBeatMessage{task_id, task_type}
reply := ExampleReply{}
call("Master.UpdateTime", &mes, &reply)
time.Sleep(2 * time.Second)
}
}
}()
return quit
}
另外这里除了Map和Reduce两种主要的task类型,还有Sleep(在MapPhase/ReducePhase阶段,没有空闲任务时,worker会fetch到wait类型的任务,相当于等待其他正在进行的Map/Reduce任务完成)。而在CompletedPhase阶段,即所有的工作都完成后,worker会从master fetch到一个Exit类型的task,直接退出进程。