转载

MapReduce

MapReduce

@author qcliu

@time 2015/07/29

Abstract

MIT6.824 Lab1

总结。

Part I: Word count

第一个任务是单机版词频统计。入口为 wc.gomain() 。 统计的过程分为4个部分:

  • Split
  • Map
  • Reduce
  • Merge

Split

将一个大文件切分成若干个小文件。 采用逐行扫描的方式。涉及到

  • bufio.Write
  • bufio.Scanner

Map

经过Spilt过程, kjv12.txt(file size:4834757) 已经被切分成了5个文件。

mrtmp.kjv12.txt-0 966967 mrtmp.kjv12.txt-1 966941 mrtmp.kjv12.txt-2 966974 mrtmp.kjv12.txt-3 966970 mrtmp.kjv12.txt-4 966905 

之后执行DoMap()操作。

for i:=0; i<nMap; i++ {     DoMap(i, mr.file, mr.NReduce, Map) } 

对每一个子文件进行DoMap()。

在DoMap()中,将子文件的内容读到内存中的bype[]。对bype[]进行Map()操作。

Map()是对传入的string进行单词切分,然后建立一个HashMap,key是单词,value是单词出现的频率。这里可以采用bit方式记录词频,即1111代表4次。

这里Go提供了一个很好用的分割字符串的库。

strings.FieldsFunc

得到子文件中每个单词及其出现的频率后,将这份hashMap以 JSON 格式存入nreduce个文件中,这里nreduce为3。

对于文件 mrtmp.kjv12.txt-0 966967 ,会被分为3个文件

mrtmp.kjv12.txt-0-0 mrtmp.kjv12.txt-0-1 mrtmp.kjv12.txt-0-2 

GoJson

Reduce

在进入Reduce阶段时,已经存在了nmap*nreduce个文件。

mrtmp.kjv12.txt-0-0 mrtmp.kjv12.txt-0-1 mrtmp.kjv12.txt-0-2 mrtmp.kjv12.txt-1-0 mrtmp.kjv12.txt-1-1 mrtmp.kjv12.txt-1-2 mrtmp.kjv12.txt-2-0 mrtmp.kjv12.txt-2-1 mrtmp.kjv12.txt-2-2 mrtmp.kjv12.txt-3-0 mrtmp.kjv12.txt-3-1 mrtmp.kjv12.txt-3-2 mrtmp.kjv12.txt-4-0 mrtmp.kjv12.txt-4-1 mrtmp.kjv12.txt-4-2 

在Reduce时,进行nreduce次DoReduce(), 每一次里面都打开nmap个文件。

在一次DoReduce()中,对nmap个文件建立一个 map[string]*list.List ,用于存放nmap个文件中不同单词以及在不同文件中出现的频率,这里的频率是用字符串长度表示的。将nmap个文件中单词出现的频率进行合计,用JSON格式写入文件中。nreduce次DoReduce()总共产生nreduce个merge文件。

Merge

Merge()时已经存在nreduce个文件,此处nreduce为3.每个文件中存放着若干单词和对应的频率。

Merge()操作对3个文件的内容再一次进行合计。用一个hashMap存放3个文件中的单词和对应的频率。将最终的结果写入文件

Part II: MapReduce

MapReduce partII是真正的分布式的MapReduce。对应于上图,test_test.go是我们的UserProgram,在setup()中开启的master,之后开启了2个Worker。

javafunc TestBasic(t *testing.T) {  InitLog()  Logger.Printf("Test: Basic mapreduce .../n")  mr := setup()  for i := 0; i < 2; i++ {   go RunWorker(mr.MasterAddress, port("worker"+strconv.   (i)),MapFunc, ReduceFunc, -1)  }  // Wait until MR is done  <-mr.DoneChannel  check(t, mr.file)  checkWorker(t, mr.stats)  cleanup(mr)  fmt.Printf("  ... Basic Passed/n") }  

Master

master是负责切分文件,给Worker分配任务,合并结果。在开启Master时,将MapReduce通过rpc暴露给Worker。然后等待Worker注册。此时Worker可以调用MapReduce的Register(),从而通知Master有Worker可用。

Worker

开启Worker时,也通过RPC将Worker暴露给Master,之后通过RPC调用Register(),通知Master有Worker可用。等待Master分配任务。

Example

在mapreduce.go中,StarRegistrationServer()负责Master监听是否有Worker进行RPC调用。

javafunc (mr *MapReduce) StartRegistrationServer() {  rpcs := rpc.NewServer()  rpcs.Register(mr)//take mr expose to client  os.Remove(mr.MasterAddress) // only needed for "unix"  l, e := net.Listen("unix", mr.MasterAddress)  if e != nil {   log.Fatal("RegstrationServer", mr.MasterAddress, " error: ", e)  }  mr.l = l  // now that we are listening on the master address, can fork off  // accepting connections to another thread.  go func() {   for mr.alive {    conn, err := mr.l.Accept()    Logger.Printf("accepted %T from worker/n", conn)    if err == nil {     go func() {      rpcs.ServeConn(conn)      conn.Close()     }()    } else {     DPrintf("RegistrationServer: accept error", err)     break    }   }   DPrintf("RegistrationServer: done/n")  }() }  

其中,mr.l.Accept()会阻塞。如果Worker此时通过RPC调用Register()

javafunc call(srv string, rpcname string, args interface{}, reply interface{}) bool {  c, errx := rpc.Dial("unix", srv)  if errx != nil {   return false  }  defer c.Close()  err := c.Call(rpcname, args, reply)  if err == nil {   return true  }  fmt.Println(err)  return false }  
  1. srv为Master的地址。当Worker执行 rpc.Dial("unix", srv) 时,Master会收到这个连接请求。
  2. 于是 conn, err := mr.l.Accept() 的阻塞结束,到 rpcs.ServeConn(conn) 时继续阻塞,等待Worker的调用。
  3. 当Worker执行 c.Call(rpcname, args, reply) ,其中rpcname为Master的一个方法名,此处是Register。
  4. 此时Master的 rpcs.ServeConn(conn) 阻塞结束,调用成功。

Part III: Handling worker failures

worker failures 可能发生在RPC调用的时候。

javafunc doMap(mr *MapReduce, worker chan string) {  done := make(chan int)  for i := 0; i<mr.nMap; i++ {   go func(i int) {    for avil := range worker {     args := DoJobArgs_new(mr.file, Map, i, mr.nReduce)     var reply DoJobReply     ok := call(avil, "Worker.DoJob", args, &reply)     if ok {      done<-1      worker<-avil      break// break the for range     }else {      fmt.Printf("No aviliable worker for Map %d/n", args.JobNumber)     }    }   }(i)  }  for i :=0; i<mr.nMap; i++ {   <-done  } }  

所以用 for avil := range worker 不停地从channel中取可用的worker来完成响应的工作,只有完成后才 break

如果call不会发生错误,那么 for avil := range worker 完全可以改为 avil:=<-worker

正文到此结束
Loading...