实验项目由纯 Go 语言开发。代码框架已经搭好并提供了各种条件的测试用例来模仿分布式场景下的意外事件。最终目的是在指定修改的地方 coding 并通过测试用例,同时需要关注时间消耗,一定程度上性能做到最优。

熟悉项目

master_rpc.go 里面启动 master 提供 RPC 服务用来和 workers 之间用通信。每个 worker 都会在 master.go 中的 Register() 的注册服务。

MapReduce 会从 master.go 中的 run() 启动。这个方法的签名为

1
2
3
4
5
6
7
func (mr *Master) run(
jobName string,
files []string,
nreduce int,
schedule func(phase jobPhase),
finish func(),
) {}

整个实验分成了 sequential 和 parallel 两种模式。最大的区别在于调度函数的实现方式,实验在sequential 已经实现了串行的调度,而后面将要自己实现并行的调度器,也就是 run 方法中的 schedule 需要自己写更快的。

Part I: Map/Reduce input and output

MapReduce 的基本输入输出

  1. 给 map task 的输出进行分片
    doMap 方法中的 mapF 会产生一个 key-value 数组。需要做的就是把这个数组按照里面 key 转换成比较均匀的的 nReduce 个由 key-value 组成的小数组并写到文件当中。这里有个要点,如果采用直接按顺序划分。比如说如果数组的长度是 m,第一组是 0..m/nReduce-1, 第二组是 m/nReduce..m/nReduce*2-1 直到划分完。这样做有个问题就是需要把相同 key 放到一组便于后面的 Reduce 操作。所以目前看来做 Hash 然后 mod nReduce 这样既可以比较均匀划分文件,同时又可以让相同 key 的在一个文件里面。

  2. 给 reduce task 组装所有的输入
    读取 1 产生的中间文件。相同的 key 可以把对应的 value 组合起来产生多个 key-values 的数据结构。随后对这样的数据结构进行按照 key 排序并写到 reduce 的输出文件当中。

通过 1,2 可以确定最终会产生 nMap * nReduce 个中间文件。

Part II: Single-worker word count

单机版的 word count

相对于 PartI 其中的 mapF 和 reduceF 需要按照当前用户逻辑来实现。mapF 需要做的就是把出现过的单词都转换成 word-count 这样的 key-value 结构。reduceF 只需要对出现过的内容做一个获取长度就结束了。

Part III: Distributing MapReduce tasks

分布式版的 word count

在完成了 Part I 和 Part II 之后应该就对单机版的 word count 如何实现比较清晰了。在分布式场景下,workers 到 master 之间会用 rpc 的方式通信。 workers 会充分利用计算机的多核实现并发。按照要求是需要实现调度器 schedule.go 中的 schedule() 方法。按照 sequential 的模式,调度器会被传入当前的阶段是 map 还是 reduce,并且只会被传入一次。按照题目前面给的要求,输入文件的个数决定了 map worker 的个数。随后 map worker 执行的同时,reduce worker 也在同时执行。那么其实就是改造成了用 goroutine 来 rpc 调用对应的用户方法, 并用 waitGroup 来记录完成情况。

Part IV: Handling worker failures

容错
测试代码会让部分 worker 出现失效,这个失效可以理解为不论当前工作完成多少,即使是完成了也有可能返回 false。这也就意味着可能出现多个 workers 做了同样的任务。所以原则上,需要保证任务 idempotent,从函数式的角度而言,本身作为一个函数就应该是相同输入相同输出的状态。为实现容错,还需要实现重试机制。在多个 goroutine 同时跑的时候,最简单的方法实现重试就是附加一个 retryChan 用来存放记录重试任务。

源码