@author qcliu
@time 2015/07/29
MIT6.824 Lab1
总结。
第一个任务是单机版词频统计。入口为 wc.go
的 main()
。 统计的过程分为4个部分:
将一个大文件切分成若干个小文件。 采用逐行扫描的方式。涉及到
经过Spilt过程, kjv12.txt(file size:4834757)
已经被切分成了5个文件。
mrtmp.kjv12.txt-0 966967 mrtmp.kjv12.txt-1 966941 mrtmp.kjv12.txt-2 966974 mrtmp.kjv12.txt-3 966970 mrtmp.kjv12.txt-4 966905
之后执行DoMap()操作。
for i:=0; i<nMap; i++ { DoMap(i, mr.file, mr.NReduce, Map) }
对每一个子文件进行DoMap()。
在DoMap()中,将子文件的内容读到内存中的bype[]。对bype[]进行Map()操作。
Map()是对传入的string进行单词切分,然后建立一个HashMap,key是单词,value是单词出现的频率。这里可以采用bit方式记录词频,即1111代表4次。
这里Go提供了一个很好用的分割字符串的库。
strings.FieldsFunc 得到子文件中每个单词及其出现的频率后,将这份hashMap以 JSON
格式存入nreduce个文件中,这里nreduce为3。
对于文件 mrtmp.kjv12.txt-0 966967
,会被分为3个文件
mrtmp.kjv12.txt-0-0 mrtmp.kjv12.txt-0-1 mrtmp.kjv12.txt-0-2
GoJson
在进入Reduce阶段时,已经存在了nmap*nreduce个文件。
mrtmp.kjv12.txt-0-0 mrtmp.kjv12.txt-0-1 mrtmp.kjv12.txt-0-2 mrtmp.kjv12.txt-1-0 mrtmp.kjv12.txt-1-1 mrtmp.kjv12.txt-1-2 mrtmp.kjv12.txt-2-0 mrtmp.kjv12.txt-2-1 mrtmp.kjv12.txt-2-2 mrtmp.kjv12.txt-3-0 mrtmp.kjv12.txt-3-1 mrtmp.kjv12.txt-3-2 mrtmp.kjv12.txt-4-0 mrtmp.kjv12.txt-4-1 mrtmp.kjv12.txt-4-2
在Reduce时,进行nreduce次DoReduce(), 每一次里面都打开nmap个文件。
在一次DoReduce()中,对nmap个文件建立一个 map[string]*list.List
,用于存放nmap个文件中不同单词以及在不同文件中出现的频率,这里的频率是用字符串长度表示的。将nmap个文件中单词出现的频率进行合计,用JSON格式写入文件中。nreduce次DoReduce()总共产生nreduce个merge文件。
Merge()时已经存在nreduce个文件,此处nreduce为3.每个文件中存放着若干单词和对应的频率。
Merge()操作对3个文件的内容再一次进行合计。用一个hashMap存放3个文件中的单词和对应的频率。将最终的结果写入文件
partII是真正的分布式的MapReduce。对应于上图,test_test.go是我们的UserProgram,在setup()中开启的master,之后开启了2个Worker。
java
func TestBasic(t *testing.T) { InitLog() Logger.Printf("Test: Basic mapreduce .../n") mr := setup() for i := 0; i < 2; i++ { go RunWorker(mr.MasterAddress, port("worker"+strconv. (i)),MapFunc, ReduceFunc, -1) } // Wait until MR is done <-mr.DoneChannel check(t, mr.file) checkWorker(t, mr.stats) cleanup(mr) fmt.Printf(" ... Basic Passed/n") }
master是负责切分文件,给Worker分配任务,合并结果。在开启Master时,将MapReduce通过rpc暴露给Worker。然后等待Worker注册。此时Worker可以调用MapReduce的Register(),从而通知Master有Worker可用。
开启Worker时,也通过RPC将Worker暴露给Master,之后通过RPC调用Register(),通知Master有Worker可用。等待Master分配任务。
在mapreduce.go中,StarRegistrationServer()负责Master监听是否有Worker进行RPC调用。
java
func (mr *MapReduce) StartRegistrationServer() { rpcs := rpc.NewServer() rpcs.Register(mr)//take mr expose to client os.Remove(mr.MasterAddress) // only needed for "unix" l, e := net.Listen("unix", mr.MasterAddress) if e != nil { log.Fatal("RegstrationServer", mr.MasterAddress, " error: ", e) } mr.l = l // now that we are listening on the master address, can fork off // accepting connections to another thread. go func() { for mr.alive { conn, err := mr.l.Accept() Logger.Printf("accepted %T from worker/n", conn) if err == nil { go func() { rpcs.ServeConn(conn) conn.Close() }() } else { DPrintf("RegistrationServer: accept error", err) break } } DPrintf("RegistrationServer: done/n") }() }
其中,mr.l.Accept()会阻塞。如果Worker此时通过RPC调用Register()
java
func call(srv string, rpcname string, args interface{}, reply interface{}) bool { c, errx := rpc.Dial("unix", srv) if errx != nil { return false } defer c.Close() err := c.Call(rpcname, args, reply) if err == nil { return true } fmt.Println(err) return false }
rpc.Dial("unix", srv)
时,Master会收到这个连接请求。 conn, err := mr.l.Accept()
的阻塞结束,到 rpcs.ServeConn(conn)
时继续阻塞,等待Worker的调用。 c.Call(rpcname, args, reply)
,其中rpcname为Master的一个方法名,此处是Register。 rpcs.ServeConn(conn)
阻塞结束,调用成功。 worker failures 可能发生在RPC调用的时候。
java
func doMap(mr *MapReduce, worker chan string) { done := make(chan int) for i := 0; i<mr.nMap; i++ { go func(i int) { for avil := range worker { args := DoJobArgs_new(mr.file, Map, i, mr.nReduce) var reply DoJobReply ok := call(avil, "Worker.DoJob", args, &reply) if ok { done<-1 worker<-avil break// break the for range }else { fmt.Printf("No aviliable worker for Map %d/n", args.JobNumber) } } }(i) } for i :=0; i<mr.nMap; i++ { <-done } }
所以用 for avil := range worker
不停地从channel中取可用的worker来完成响应的工作,只有完成后才 break
。
如果call不会发生错误,那么 for avil := range worker
完全可以改为 avil:=<-worker