6.824 Lab1 MapReduce

I'm blogging these labs to force myself to actually finish them. The course asks students not to publish solutions, so I've base64-encoded mine; that should at least deter the honest.

Lab 1 (this post covers the 2018 version):
http://nil.csail.mit.edu/6.824/2018/labs/lab-1.html

Lab 1 (2020 version):
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

MapReduce Paper:
http://research.google.com/archive/mapreduce-osdi04.pdf

MapReduce

Highlights from the paper:
The Map + Reduce pattern exists to make processing large volumes of data easy to distribute.
Map runs a classification-like pass over the input, emitting key/value pairs. Reduce aggregates the intermediate key/value pairs that Map produced and computes further output from them.
The infrastructure in between (MR) splits the data source into, say, nMap pieces (each called a split) and launches a mapper for each, processing the splits in parallel. Each mapper sorts all of its output key/value pairs by key (the shuffle) and distributes them among the reducers; each reducer then processes its share and produces the final output.
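To make the pipeline concrete, here is a minimal in-memory sequential sketch; the names sequentialMR, mapF, and reduceF are illustrative, not the lab's API:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

type KeyValue struct {
	Key   string
	Value string
}

// mapF emits (word, "1") for every word in a split.
func mapF(split string) []KeyValue {
	var kvs []KeyValue
	for _, w := range strings.Fields(split) {
		kvs = append(kvs, KeyValue{w, "1"})
	}
	return kvs
}

// reduceF counts how many values a key received.
func reduceF(key string, values []string) string {
	return fmt.Sprint(len(values))
}

// sequentialMR runs map on every split, groups ("shuffles") by key,
// then runs reduce once per key in sorted order.
func sequentialMR(splits []string) map[string]string {
	groups := make(map[string][]string)
	for _, s := range splits {
		for _, kv := range mapF(s) {
			groups[kv.Key] = append(groups[kv.Key], kv.Value)
		}
	}
	keys := make([]string, 0, len(groups))
	for k := range groups {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out := make(map[string]string)
	for _, k := range keys {
		out[k] = reduceF(k, groups[k])
	}
	return out
}

func main() {
	fmt.Println(sequentialMR([]string{"a b a", "b a"})) // prints map[a:3 b:2]
}
```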

Architecturally, MR has a single Master doing task scheduling, plus many nodes executing Map/Reduce tasks. Nodes can fail; when one does, the Master must reschedule the same task on a healthy node.

In some cases a machine may turn out to be unexpectedly slow, so MR may launch redundant backup tasks. For example, two mappers may be started on the same split, processing identical data; whichever finishes first supplies the output, and the other task is simply killed.
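The "first one wins" race can be sketched with goroutines and a buffered channel (runWithBackup is an illustrative name, not part of the paper or the lab):

```go
package main

import (
	"fmt"
	"time"
)

// runWithBackup starts the same task twice and returns whichever
// copy finishes first; the slower copy's result is simply dropped.
func runWithBackup(task func() string) string {
	results := make(chan string, 2) // buffered: the losing copy never blocks
	for i := 0; i < 2; i++ {
		go func() { results <- task() }()
	}
	return <-results
}

func main() {
	slow := func() string {
		time.Sleep(10 * time.Millisecond)
		return "done"
	}
	fmt.Println(runWithBackup(slow)) // prints done
}
```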

In practice, e.g. in Hadoop, data locality is taken into account: whenever possible a mapper is started on the node that already holds its data, which cuts down network transfer.

Lab Implementation

Installing Go

$ wget -qO- https://dl.google.com/go/go1.13.6.linux-amd64.tar.gz | sudo tar xz -C /usr/local

Configure PATH by adding this to ~/.bashrc:

export PATH=$PATH:$(go env GOPATH)/bin
export GOPATH=$(go env GOPATH)

Go's official site has an interactive tour; walking through it gives you the basics.

Completing the Sequential MR

The first part is implementing doMap() in common_map.go and doReduce() in common_reduce.go.

The Master invokes doMap() on a worker node. doMap() reads the corresponding input file, calls the user-supplied mapper function mapF() on its contents, hashes each resulting key/value pair by key, and writes the pairs out across nReduce intermediate files.
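The bucketing step looks roughly like this. ihash here mirrors the lab's FNV-1a helper; partition is an illustrative name, a sketch of how pairs get assigned to intermediate files rather than the graded code:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type KeyValue struct{ Key, Value string }

// ihash hashes a key with FNV-1a, masking off the sign bit,
// the same shape as the lab's helper.
func ihash(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff)
}

// partition buckets key/value pairs into nReduce groups; bucket r
// corresponds to the r-th intermediate file a map task writes.
func partition(kvs []KeyValue, nReduce int) [][]KeyValue {
	buckets := make([][]KeyValue, nReduce)
	for _, kv := range kvs {
		r := ihash(kv.Key) % nReduce
		buckets[r] = append(buckets[r], kv)
	}
	return buckets
}

func main() {
	kvs := []KeyValue{{"a", "1"}, {"b", "1"}, {"a", "1"}}
	for r, b := range partition(kvs, 3) {
		fmt.Println(r, b)
	}
}
```

The important invariant: every pair with the same key lands in the same bucket, so one reducer sees all values for a key.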

doReduce() reads the intermediate files belonging to its reducer. Since every map worker writes nReduce intermediate files, doReduce() has nMap files to read. After reading them all, it must sort the data by key and call reduceF() once per key, passing in everything that key maps to.

Go's defer statement runs its call when the enclosing function returns, so right after opening a file you can defer its Close(); that way you can't forget to close it.
https://tour.golang.org/flowcontrol/12
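A minimal example of the pattern (firstByte is just a demo function):

```go
package main

import (
	"fmt"
	"os"
)

// firstByte opens a file and schedules Close immediately; the
// deferred call runs when the function returns, on every path.
func firstByte(path string) (byte, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close() // runs even on the early error return below
	buf := make([]byte, 1)
	if _, err := f.Read(buf); err != nil {
		return 0, err
	}
	return buf[0], nil
}

func main() {
	b, err := firstByte(os.Args[0]) // read the running binary as a demo
	fmt.Println(b, err)
}
```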

func doMap(
        jobName string, // the name of the MapReduce job
        mapTask int, // which map task this is
        inFile string,
        nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
        mapF func(filename string, contents string) []KeyValue,
) {
// base64-encoded below
ICAgICAgICBjb250ZW50LCBlcnIgOj0gaW91dGlsLlJlYWRGaWxlKGluRmlsZSkKICAgICAgICBj
aGVja19lcnJvcihlcnIpCgogICAgICAgIG91dGZpbGVzIDo9IG1ha2UoW10qb3MuRmlsZSwgblJl
ZHVjZSkKICAgICAgICBlbmNvZGVycyA6PSBtYWtlKFtdKmpzb24uRW5jb2RlciwgblJlZHVjZSkK
ICAgICAgICBmbXQuUHJpbnRmKCJuUmVkdWNlOiAldlxuIiwgblJlZHVjZSkKICAgICAgICBmb3Ig
aSA6PSAwOyBpIDwgblJlZHVjZTsgaSsrIHsKICAgICAgICAgICAgICAgIG91dGZpbGVzW2ldLCBl
cnIgPSBvcy5DcmVhdGUocmVkdWNlTmFtZShqb2JOYW1lLCBtYXBUYXNrLCBpKSkKICAgICAgICAg
ICAgICAgIGNoZWNrX2Vycm9yKGVycikKICAgICAgICAgICAgICAgIGRlZmVyIG91dGZpbGVzW2ld
LkNsb3NlKCkKICAgICAgICAgICAgICAgIGZtdC5QcmludGxuKCJjcmVhdGVkICIgKyAocmVkdWNl
TmFtZShqb2JOYW1lLCBtYXBUYXNrLCBpKSkpCiAgICAgICAgICAgICAgICBlbmNvZGVyc1tpXSA9
IGpzb24uTmV3RW5jb2RlcihvdXRmaWxlc1tpXSkKICAgICAgICB9CgogICAgICAgIGt2cyA6PSBt
YXBGKGluRmlsZSwgc3RyaW5nKGNvbnRlbnQpKQoKICAgICAgICB2YXIgciBpbnQKICAgICAgICBm
b3IgXywga3YgOj0gcmFuZ2Uga3ZzIHsKICAgICAgICAgICAgICAgIHIgPSBpaGFzaChrdi5LZXkp
ICUgblJlZHVjZQogICAgICAgICAgICAgICAgZXJyID0gZW5jb2RlcnNbcl0uRW5jb2RlKCZrdikK
ICAgICAgICB9Cg==
}
func doReduce(
        jobName string, // the name of the whole MapReduce job
        reduceTask int, // which reduce task this is
        outFile string, // write the output here
        nMap int, // the number of map tasks that were run ("M" in the paper)
        reduceF func(key string, values []string) string,
) {
ICAgICAgICBrZXlzIDo9IG1ha2UoW11zdHJpbmcsIDApCiAgICAgICAgY29udGVudHMgOj0gbWFr
ZShtYXBbc3RyaW5nXVtdc3RyaW5nKQogICAgICAgIHZhciBrdiBLZXlWYWx1ZQoKICAgICAgICAv
LyBSZWFkIGFsbCBpbnRlcm1lZGlhdGUgZmlsZXMgZm9yIHRoaXMgcmVkdWNlVGFzaywgcHV0IGRh
dGEgaW50byBrZXlzIGFuZCBjb250ZW50cy4KICAgICAgICBmb3IgaSA6PSAwOyBpIDwgbk1hcDsg
aSsrIHsKICAgICAgICAgICAgICAgIGYsIGVyciA6PSBvcy5PcGVuKHJlZHVjZU5hbWUoam9iTmFt
ZSwgaSwgcmVkdWNlVGFzaykpCiAgICAgICAgICAgICAgICBjaGVja19lcnJvcihlcnIpCiAgICAg
ICAgICAgICAgICBkZWMgOj0ganNvbi5OZXdEZWNvZGVyKGYpCiAgICAgICAgICAgICAgICBmb3Ig
ZXJyID09IG5pbCB7CiAgICAgICAgICAgICAgICAgICAgICAgIGVyciA9IGRlYy5EZWNvZGUoJmt2
KQogICAgICAgICAgICAgICAgICAgICAgICBfLCBrZXlFeGlzdCA6PSBjb250ZW50c1trdi5LZXld
CiAgICAgICAgICAgICAgICAgICAgICAgIGlmICFrZXlFeGlzdCB7CiAgICAgICAgICAgICAgICAg
ICAgICAgICAgICAgICAga2V5cyA9IGFwcGVuZChrZXlzLCBrdi5LZXkpCiAgICAgICAgICAgICAg
ICAgICAgICAgIH0KICAgICAgICAgICAgICAgICAgICAgICAgY29udGVudHNba3YuS2V5XSA9IGFw
cGVuZChjb250ZW50c1trdi5LZXldLCBrdi5WYWx1ZSkKICAgICAgICAgICAgICAgIH0KICAgICAg
ICAgICAgICAgIGYuQ2xvc2UoKQogICAgICAgIH0KCiAgICAgICAgLy8gU29ydCB0aGUga2V5cwog
ICAgICAgIHNvcnQuU3RyaW5ncyhrZXlzKQoKICAgICAgICAvLyBQZXJmb3JtIHRoZSByZWR1Y2Ug
ZnVuY3Rpb24gb24gZWFjaCBrZXlzLgogICAgICAgIG91dGYsIGVyciA6PSBvcy5DcmVhdGUob3V0
RmlsZSkKICAgICAgICBjaGVja19lcnJvcihlcnIpCiAgICAgICAgZGVmZXIgb3V0Zi5DbG9zZSgp
CiAgICAgICAgZW5jIDo9IGpzb24uTmV3RW5jb2RlcihvdXRmKQogICAgICAgIGZvciBfLCBrIDo9
IHJhbmdlIGtleXMgewogICAgICAgICAgICAgICAgcmVzdWx0IDo9IHJlZHVjZUYoaywgY29udGVu
dHNba10pCiAgICAgICAgICAgICAgICBlbmMuRW5jb2RlKEtleVZhbHVle2ssIHJlc3VsdH0pCiAg
ICAgICAgfQo=
}
$ cd 6.824-mr/
$ export "GOPATH=$PWD"
$ cd "$GOPATH/src/mapreduce"
$ go test -run Sequential
PASS
ok      mapreduce       2.145s

Writing a Word Count

This part is implementing mapF() and reduceF() in main/wc.go to build a word count.

mapF() tokenizes the input and emits the pair (word, "1") for every word.
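The tokenization itself fits in one call to strings.FieldsFunc, treating any run of non-letters as a separator (tokenize is an illustrative name):

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// tokenize splits contents into words: FieldsFunc treats every rune
// for which the predicate returns true as a separator, so passing
// "not a letter" yields the maximal runs of letters.
func tokenize(contents string) []string {
	notLetter := func(c rune) bool { return !unicode.IsLetter(c) }
	return strings.FieldsFunc(contents, notLetter)
}

func main() {
	fmt.Println(tokenize("It's 2020, isn't it?")) // prints [It s isn t it]
}
```

Note that digits and apostrophes split words too; that matches the lab's expected word-count output.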

func mapF(filename string, contents string) []mapreduce.KeyValue {
ICAgICAgICBmIDo9IGZ1bmMoYyBydW5lKSBib29sIHsKICAgICAgICAgICAgICAgIHJldHVybiAh
dW5pY29kZS5Jc0xldHRlcihjKQogICAgICAgIH0KICAgICAgICB3b3JkcyA6PSBzdHJpbmdzLkZp
ZWxkc0Z1bmMoY29udGVudHMsIGYpCiAgICAgICAgcmVzdWx0IDo9IG1ha2UoW11tYXByZWR1Y2Uu
S2V5VmFsdWUsIDApCgogICAgICAgIGZvciBfLCB3b3JkIDo9IHJhbmdlIHdvcmRzIHsKICAgICAg
ICAgICAgICAgIGt2IDo9IG1hcHJlZHVjZS5LZXlWYWx1ZXt3b3JkLCAiMSJ9CiAgICAgICAgICAg
ICAgICByZXN1bHQgPSBhcHBlbmQocmVzdWx0LCBrdikKICAgICAgICB9CiAgICAgICAgcmV0dXJu
IHJlc3VsdAo=
}

reduceF() only needs to count how many values its key received.
func reduceF(key string, values []string) string {
ICAgICAgICAvLyBKdXN0IHJldHVybiB0aGUgc2l6ZSBvZiB0aGUgdmFsdWVzLgogICAgICAgIHJl
dHVybiBzdHJjb252Lkl0b2EobGVuKHZhbHVlcykpCg==
}
$ cd "$GOPATH/src/main"
$ time go run wc.go master sequential pg-*.txt
master: Map/Reduce task completed

Distributed Execution of MapReduce Tasks

This part is implementing schedule() in schedule.go, which simulates distributed scheduling of MapReduce tasks.

schedule() receives worker RPC addresses from registerChan. Workers may start before or after schedule() is called, so registerChan should be expected to keep delivering entries. Ignoring failures, it's enough to hand one task to each worker as it arrives. But workers can fail, so if a task gets no result from one worker, it must be handed to another available worker and retried.
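The retry loop can be sketched like this; scheduleSketch and callWorker are illustrative stand-ins (the lab's real call() makes an RPC). One wrinkle worth noting: the finished worker is handed back to registerChan on a fresh goroutine, so the send cannot deadlock the last task when nobody is left to receive:

```go
package main

import (
	"fmt"
	"sync"
)

// scheduleSketch hands each of ntasks tasks to some worker from
// registerChan and retries on another worker until it succeeds.
func scheduleSketch(ntasks int, registerChan chan string,
	callWorker func(worker string, task int) bool) {
	var wg sync.WaitGroup
	for i := 0; i < ntasks; i++ {
		wg.Add(1)
		i := i // capture the loop variable for the goroutine
		go func() {
			defer wg.Done()
			for {
				worker := <-registerChan // take an idle worker
				ok := callWorker(worker, i)
				// Return the worker asynchronously: after the final
				// task completes there may be no receiver left, so
				// this send must not block the task goroutine.
				go func() { registerChan <- worker }()
				if ok {
					return
				}
			}
		}()
	}
	wg.Wait()
	fmt.Println("all tasks done")
}

func main() {
	reg := make(chan string, 2)
	reg <- "worker0"
	reg <- "worker1"
	scheduleSketch(5, reg, func(w string, t int) bool { return true })
}
```

Because a failed call simply loops back to `<-registerChan`, a crashed worker's task migrates to whichever worker registers next, which is exactly what the Failure tests exercise.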

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
ICAgICAgICB2YXIgbnRhc2tzIGludAogICAgICAgIHZhciBuX290aGVyIGludCAvLyBudW1iZXIg
b2YgaW5wdXRzIChmb3IgcmVkdWNlKSBvciBvdXRwdXRzIChmb3IgbWFwKQogICAgICAgIHN3aXRj
aCBwaGFzZSB7CiAgICAgICAgY2FzZSBtYXBQaGFzZToKICAgICAgICAgICAgICAgIG50YXNrcyA9
IGxlbihtYXBGaWxlcykKICAgICAgICAgICAgICAgIG5fb3RoZXIgPSBuUmVkdWNlCiAgICAgICAg
Y2FzZSByZWR1Y2VQaGFzZToKICAgICAgICAgICAgICAgIG50YXNrcyA9IG5SZWR1Y2UKICAgICAg
ICAgICAgICAgIG5fb3RoZXIgPSBsZW4obWFwRmlsZXMpCiAgICAgICAgfQoKICAgICAgICBmbXQu
UHJpbnRmKCJTY2hlZHVsZTogJXYgJXYgdGFza3MgKCVkIEkvT3MpXG4iLCBudGFza3MsIHBoYXNl
LCBuX290aGVyKQoKICAgICAgICBmbXQuUHJpbnRmKCJERUJVRyAgbWFwRmlsZXMgJXYgXG4iLCBt
YXBGaWxlcykKCiAgICAgICAgLy8gQWxsIG50YXNrcyB0YXNrcyBoYXZlIHRvIGJlIHNjaGVkdWxl
ZCBvbiB3b3JrZXJzLiBPbmNlIGFsbCB0YXNrcwogICAgICAgIC8vIGhhdmUgY29tcGxldGVkIHN1
Y2Nlc3NmdWxseSwgc2NoZWR1bGUoKSBzaG91bGQgcmV0dXJuLgogICAgICAgIC8vCiAgICAgICAg
Ly8gWW91ciBjb2RlIGhlcmUgKFBhcnQgSUlJLCBQYXJ0IElWKS4KICAgICAgICAvLwoKICAgICAg
ICB2YXIgd2cgc3luYy5XYWl0R3JvdXAKICAgICAgICBmb3IgaSA6PSAwOyBpIDwgbnRhc2tzOyBp
KysgewogICAgICAgICAgICAgICAgd2cuQWRkKDEpCiAgICAgICAgICAgICAgICBpIDo9IGkgLy8g
VGhlIGkgdmFyaWFibGUgaW4gdGhlIGZvciBsb29wIGlzIHNoYXJlZCwgbWFrZSBpdCB2YWxpZCBp
bnNpZGUgdGhlIGxvb3AuCiAgICAgICAgICAgICAgICAvLyBmbXQuUHJpbnRmKCJ3b3JrZXI6ICVz
ICAgdGFzazogJXZcbiIsIHdvcmtlciwgaSkKICAgICAgICAgICAgICAgIGdvIGZ1bmMoKSB7CiAg
ICAgICAgICAgICAgICAgICAgICAgIHJlc3VsdCA6PSBmYWxzZQogICAgICAgICAgICAgICAgICAg
ICAgICB2YXIgd29ya2VyIHN0cmluZwogICAgICAgICAgICAgICAgICAgICAgICBmb3IgIXJlc3Vs
dCB7CiAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgd29ya2VyID0gPC1yZWdpc3RlckNo
YW4KICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAvL2ZtdC5QcmludGYoIkRFQlVHICBD
YWxpaW5nIGRvdGFzay4gdGFzaz0ldiAgbnRhc2s9JXYgXG4iLCBpLCBudGFza3MpCiAgICAgICAg
ICAgICAgICAgICAgICAgICAgICAgICAgcmVzdWx0ID0gY2FsbCh3b3JrZXIsICJXb3JrZXIuRG9U
YXNrIiwgRG9UYXNrQXJnc3tqb2JOYW1lLCBtYXBGaWxlc1tpXSwgcGhhc2UsIGksIG5fb3RoZXJ9
LCBuaWwpCiAgICAgICAgICAgICAgICAgICAgICAgIH0KICAgICAgICAgICAgICAgICAgICAgICAg
d2cuRG9uZSgpCiAgICAgICAgICAgICAgICAgICAgICAgIHJlZ2lzdGVyQ2hhbiA8LSB3b3JrZXIg
Ly9GSVhNRTogRm9yIHRoZSBsYXN0IHRhc2sgIzE5LCB0aGVyZSB3b3VsZCBiZSBubyByZWNlaXZl
ciBhcyB0aGUgc2NoZWR1bGUgZnVuY3Rpb24gaXMgcmV0dXJuZWQuCiAgICAgICAgICAgICAgICB9
KCkKICAgICAgICB9CiAgICAgICAgd2cuV2FpdCgpCiAgICAgICAgZm10LlByaW50ZigiU2NoZWR1
bGU6ICV2IGRvbmVcbiIsIHBoYXNlKQo=
}
$ cd $GOPATH/src/mapreduce
$ go test -run TestParallel
$ go test -run Failure

Generating an Inverted Index

This part is writing an MR job that builds an inverted index; edit main/ii.go.
mapF() emits each word as the key with its source document as the value. reduceF() just needs to deduplicate and sort all the documents for each key.
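The deduplicate-and-sort step is a map used as a set plus sort.Strings; uniqueSorted is an illustrative helper, and the "count doc1,doc2,..." output format is what the ii.go tests expect:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// uniqueSorted deduplicates the document list for one word and
// returns it sorted, formatted as "count doc1,doc2,...".
func uniqueSorted(docs []string) string {
	seen := make(map[string]bool)
	var out []string
	for _, d := range docs {
		if !seen[d] {
			seen[d] = true
			out = append(out, d)
		}
	}
	sort.Strings(out)
	return fmt.Sprintf("%d %s", len(out), strings.Join(out, ","))
}

func main() {
	fmt.Println(uniqueSorted([]string{"b.txt", "a.txt", "b.txt"})) // prints 2 a.txt,b.txt
}
```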

func mapF(document string, value string) (res []mapreduce.KeyValue) {

ICAgICAgICBmIDo9IGZ1bmMoYyBydW5lKSBib29sIHsKICAgICAgICAgICAgICAgIHJldHVybiAh
dW5pY29kZS5Jc0xldHRlcihjKQogICAgICAgIH0KICAgICAgICB3b3JkcyA6PSBzdHJpbmdzLkZp
ZWxkc0Z1bmModmFsdWUsIGYpCiAgICAgICAgcmVzdWx0IDo9IG1ha2UoW11tYXByZWR1Y2UuS2V5
VmFsdWUsIDApCgogICAgICAgIGZvciBfLCB3b3JkIDo9IHJhbmdlIHdvcmRzIHsKICAgICAgICAg
ICAgICAgIGt2IDo9IG1hcHJlZHVjZS5LZXlWYWx1ZXt3b3JkLCBkb2N1bWVudH0KICAgICAgICAg
ICAgICAgIHJlc3VsdCA9IGFwcGVuZChyZXN1bHQsIGt2KQogICAgICAgIH0KICAgICAgICByZXR1
cm4gcmVzdWx0Cg==
}

func reduceF(key string, values []string) string {

ICAgICAgICBzcmNBbHJlYWR5RXhpc3QgOj0gbWFrZShtYXBbc3RyaW5nXWJvb2wpCiAgICAgICAg
cmVzdWx0IDo9IG1ha2UoW11zdHJpbmcsIDApCiAgICAgICAgZm9yIF8sIHNyYyA6PSByYW5nZSB2
YWx1ZXMgewogICAgICAgICAgICAgICAgaWYgIXNyY0FscmVhZHlFeGlzdFtzcmNdIHsKICAgICAg
ICAgICAgICAgICAgICAgICAgc3JjQWxyZWFkeUV4aXN0W3NyY10gPSB0cnVlCiAgICAgICAgICAg
ICAgICAgICAgICAgIHJlc3VsdCA9IGFwcGVuZChyZXN1bHQsIHNyYykKICAgICAgICAgICAgICAg
IH0KICAgICAgICB9CiAgICAgICAgc29ydC5TdHJpbmdzKHJlc3VsdCkKICAgICAgICByZXR1cm4g
c3RyY29udi5JdG9hKGxlbihyZXN1bHQpKSArICIgIiArIHN0cmluZ3MuSm9pbihyZXN1bHQsICIs
IikK
}