Contents

MIT 6.824 2020 (1) lab1 MapReduce实现笔记

MIT 6.824 2020 (1) lab1 MapReduce实现笔记

花了几天时间做了这个lab, 算是入了分布式的门。一开始没啥头绪,但想明白Master需要维护哪些元信息,Master和Worker何时需要进行的rpc通信这些基本问题后,思路就渐渐明朗。得益于Go语言的简洁、方便的gorountine、GC、自带的RPC框架(虽然不知道为什么*Arg和*Reply不能传nil指针,传nil指针会导致RPC调用的结果有问题,关键是不提示有问题),实现起来也非常轻松,几乎没有心智负担。

以下是我的一些笔记,这门课不允许在网上公开代码,但是为了便于描述,我还是将一部分核心代码贴了出来。

Master和Worker的定义

命名方式随意了点:)

看变量名大概就能知道什么意思,需要补充的是,TaskStatus的Start表示未被分配,Processing表示有worker正在处理,而Done表示该任务已经完成。

另外,Master会维持一个全局的global_phase变量,用来指示现在所处的阶段,lifetime是 MapPhase -> ReducePhase -> CompletedPhase。并通过维护done_cnt计数器来知晓现在完成的Map/Reduce Task的数量,在全部完成时,进行Phase的转换。

type Phase int8
type TaskType int8
type TaskStatus int8

// global phase
const (
	MapPhase Phase = iota
	ReducePhase
	CompletedPhase
)

// TaskType
const (
	Map TaskType = iota
	Reduce
	Wait
	Exit
)

// TaskStatus
const (
	Start TaskStatus = iota
	Processing
	Done
)

type MapTask struct {
	// machine_id int
	NReduce   int
	TaskId    int
	InputFile string
	Status    TaskStatus
	// iter_files []string
	Time time.Time
}

type ReduceTask struct {
	TaskId     int
	InputFiles []string
	Status     TaskStatus
	Time       time.Time
}

type Task struct {
	Task_type   TaskType
	Map_task    *MapTask
	Reduce_task *ReduceTask
}

type Master struct {
	// lifetime: MapPhase -> ReducePhase -> CompletedPhase
	global_phase Phase


	// how many task has completed correctly
	done_cnt int

	mutex sync.Mutex

	// map task num & reduce task num
	map_task_num int
	n_reduce int
	
	// map task array
	map_tasks    []MapTask

	// reduce task array
	reduce_tasks []ReduceTask
}

Master实现

Master主要就是被动接收一些rpc调用,维护状态,最核心的是GetTask rpc调用,worker调用这个来获取Task。

逻辑也很简单,看代码就行:

func (m *Master) GetTask(args *ExampleArgs, task *Task) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	switch m.global_phase {
	case MapPhase:
		for i := range m.map_tasks {
			map_task := &m.map_tasks[i]
			if map_task.Status == Start {
				// if there is available task to be done
				map_task.Status = Processing
				map_task.Time = time.Now()
				task.Task_type = Map
				task.Map_task = map_task
				// fmt.Printf("%+v \n", task.Map_task)
				return nil
			}
		}
		// if all tasks are done or are processing , just let it wait
		task.Task_type = Wait
		return nil
	case ReducePhase:
		// fmt.Printf("%+v ", m.reduce_tasks)
		for i := range m.reduce_tasks {
			reduce_task := &m.reduce_tasks[i]
			if reduce_task.Status == Start {
				reduce_task.Status = Processing
				task.Task_type = Reduce
				task.Reduce_task = reduce_task
				// fmt.Printf("%+v ", task.Reduce_task)
				return nil
			}
		}
		task.Task_type = Wait
		return nil
	case CompletedPhase:
		task.Task_type = Exit
		return nil
	}
	panic("Unreachable")
	return nil
}

另外TaskDone(更新done_cnt, 判断是否需要进行global_phase的阶段切换)、UpdateTime(接收心跳信息,更新对应task的Time字段)。

值得注意的是,这里我在设计上,将心跳信号与任务进行了绑定(而不是worker),且只有处于Processing阶段的任务才有心跳。语义上可以理解为属于正在被处理的任务的心跳。检测心跳的代码也贴出来吧,这里我设置的每5s检测一次,判定超时时间也为5s(worker每2s发送一次心跳,所以这里允许有3s的延迟)。当然,这些参数都是我随意设的。

go func() {
   for {
      time.Sleep(5 * time.Second)
      m.mutex.Lock()
      if m.global_phase == MapPhase {
         for i := range m.map_tasks {
            map_task := &m.map_tasks[i]
            // fmt.Printf("time check:%d %d %s\n", i, map_task.Status, time.Now().Sub(map_task.Time).String())
            if map_task.Status == Processing && time.Now().Sub(map_task.Time) > 5*time.Second {
               // if timeout, reset status to Start
               map_task.Status = Start
            }
         }
      } else if m.global_phase == ReducePhase {
         for i := range m.reduce_tasks {
            reduce_task := &m.reduce_tasks[i]
            if reduce_task.Status == Processing && time.Now().Sub(reduce_task.Time) > 5*time.Second {
               reduce_task.Status = Start
            }
         }
      } else {
         m.mutex.Unlock()
         return
      }
      m.mutex.Unlock()
   }
}()

Worker主循环

Worker的主循环如下,其中fetch task是用rpc从master获取任务task, 并根据task的类型进行分类处理。

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// main loop
	for {
		task := fetch_task()
		// fmt.Printf("%+v \n", task)
		switch task.Task_type {
		case Map:
			// start send heartbeat
			quit := send_heart_beat(task.Map_task.TaskId, Map)
			// do map work
			err := handle_map_task(task.Map_task, mapf)
			if err == nil {
				mes := DoneMessage{
					task.Map_task.TaskId,
					Map,
				}
				call("Master.TaskDone", mes, &ExampleReply{})
				fmt.Printf("Map %d done by %d\n", task.Map_task.TaskId, os.Getpid())
			}
			// stop send heartbeat
			quit <- true
		case Reduce:
			quit := send_heart_beat(task.Reduce_task.TaskId, Reduce)
			// do reduce work
			err := handle_reduce_task(task.Reduce_task, reducef)
			if err == nil {
				mes := DoneMessage{
					task.Reduce_task.TaskId,
					Reduce,
				}
				call("Master.TaskDone", mes, &ExampleReply{})
				fmt.Printf("Reduce %d done by %d\n", task.Reduce_task.TaskId, os.Getpid())
			}
			quit <- true
		case Wait:
			time.Sleep(1 * time.Second)
		case Exit:
			return
		}
	}
}

对于Map和Reduce这两种工作,在接收到任务时,需要开始向master提供心跳,我具体设计的是2s发送一次;在Map/Reduce任务完成或者异常退出时,需要终止心跳的发送。我是将发送心跳的过程用goroutine来实现的,通过channel来判断什么时候退出。具体函数如下:

func send_heart_beat(task_id int, task_type TaskType) chan<- bool {
	quit := make(chan bool)
	go func() {
		for {
			select {
			case <-quit:
				return
			default:
				// Do other stuff
				mes := HeartBeatMessage{task_id, task_type}
				reply := ExampleReply{}
				call("Master.UpdateTime", &mes, &reply)
				time.Sleep(2 * time.Second)
			}
		}
	}()
	return quit
}

另外这里除了Map和Reduce两种主要的task类型,还有Sleep(在MapPhase/ReducePhase阶段,没有空闲任务时,worker会fetch到wait类型的任务,相当于等待其他正在进行的Map/Reduce任务完成)。而在CompletedPhase阶段,即所有的工作都完成后,worker会从master fetch到一个Exit类型的task,直接退出进程。

用到的资料

http://nil.csail.mit.edu/6.824/2020/labs/lab-mr.html

一篇教会你写90%的shell脚本 - 知乎

MapReduce论文翻译