mit6.824 lab1 (2022)

go;gutter:true; lab1 要求按照论文实现一个mapReduce 框架</p> <pre><code> ;gutter:true;
lab1 :https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

论文:https://zhuanlan.zhihu.com/p/122571315

在mrsequential.go文件中有个单机版mapReduce实现很简单建议阅读。

go;gutter:true;undefined

go;gutter:true;undefined

整体框架流程:

Coordinator 是协调器,负责

① 给woker分发任务

② 合并由map任务执行产生的中间文件

③ 任务超时重新分配任务

woker 是工作器,负责

①循环申请map 或reduce任务

先看woker:

worker 向 Coordinator 发送任务申请后,判断得到的是什么样类型的任务

go;gutter:true; //申请任务 for { args := Args{} args.Signal = REQUEST_WORKER</p> <p>reply := RpcCall(args) switch reply.STATUS { case COORDINATOR_MAP: //获得map任务   MapHandle(&reply,mapf) case COORDINATOR_REDUCE: //获得reduce任务     ReduceHandle(&reply,reducef)</p> <pre><code> </code></pre> <p>case COORDINATOR__MAP_END: //没申请到任务重新获取   continue  case END: //结束   return</p> <pre><code> ;gutter:true;
}

Recude任务  处理方式和mrsequential.go中几乎是一样的不多说了。map任务  会从Coordinator 获得文件名、任务id、Nreduce(中间文件个数)

undefined

mit6.824 lab1 (2022)

kva是通过mapf 对文件处理得到的数据。

我开启两个任务分发器,和Nreduce 个文件写入器,进行并发处理数据。将数据写入到Nreduce个中间文件中,分发依据为ihash函数。

go;gutter:true; kva := MapMachingFile(reply.FileName, mapf) midFileName := "mr-out-" + reply.FileName chanArray := make([]chan KeyValue, 10) for i := 0; i < 10; i++ { chanArray[i] = make(chan KeyValue, 10) }</p> <pre><code>//开启reduceNumber个文件写入线程 var w sync.WaitGroup var mapW sync.WaitGroup w.Add(reply.Neduce) mapW.Add(2) for i := 0; i < 10; i++ { go GoMakeMidFile(midFileName+strconv.Itoa(i), chanArray[i], &w) } // 开启分发线程,分发数据到文件写入线程 lenght := len(kva) go MapDistributeMidData(chanArray, kva[:lenght/2], &mapW) go MapDistributeMidData(chanArray, kva[lenght/2:], &mapW) //所有分发线程结束 mapW.Wait() for cIndex := 0; cIndex < 10; cIndex++ { close(chanArray[cIndex]) } //所有文件写入线程结束 w.Wait() </code></pre> <pre><code> worker结束剩下看Coordinator 。 </code></pre> <p>1 type Coordinator struct { 2 filebit //数据分发记录 3 Nreduce int 4 midFileMergeC chan int 5 Mergefiled //已处理数据记录 6 monitorC []chan int //监听每个worker是否按时完成 7 STATUS 8 RedeceS 9 *sync.Mutex 10 End bool 11 }</p> <pre><code> </code></pre> <p>Coordinator  结构记录的信息主要为三部分        2、3、4、5行记录map相关        6 为监听chan,监听任务是否超时        7位Coordinator 当前的状态,通过状态判断要分发map任务、reduce任务、结束判断worker的目的,请求任务就分发任务处理,完成map任务就将所有map产生中间数据一一对应合并到Nreduce个文件中。</p> <pre><code> </code></pre> <p>//信号处理 func (c <em>Coordinator) SignalTask (args </em>Args, reply *Reply) error { switch args.Signal { case REQUEST_WORKER: c.distributeTask(args,reply)</p> <pre><code>//中间文件处理 case COMPLETE: c.midFileMerge(args,reply) } return nil </code></pre> <p>}</p> <pre><code> 在初始化Coordinator时,还会打开一些线程。本线程会开启10个中间文件写入线程,当每个worker处理完map任务后,会将自己处理的map文件相关信息传给 _Coordinator,_ _Coordinator通过chan将数据发给每个文件合并线程_StartMergeFile。 _举个例子_ _workerMap A产生了 1,2,3 个中间文件_ _1号文件 合并到 mr-out-m-1_ _2号文件 合并到 mr-out-m-2_ _3号文件 合并到 mr-out-m-3_ _workerMap B 又产生1、2、3个中间文件_ _1号文件 合并到 mr-out-m-1_ _2号文件 合并到 mr-out-m-2_ _3号文件 合并到 mr-out-m-3_ </code></pre> <pre><code> ;gutter:true;
//开启Nreduce个中间文件写入线程
//返回文件写入chan 切片
func (c *Coordinator)runFileWorker () []chan int {
cLi := make([]chan int,c.Nreduce)
for i := 0 ; i < c.Nreduce ; i ++ {
cLi[i] = make(chan int,10)
}

for fid := 0 ; fid < c.Nreduce ; fid ++ {
go c.StartMergeFile(fid,cLi[fid])
}
return cLi
}

记录信息是否已处理的结构:
filebit 、ReduceS 核心是通过一个简单的bitmap实现的

go;gutter:true; type filebit struct{ rw *sync.Mutex bitMap file []string }</p> <p>type RedeceS struct { filebit }</p> <pre><code> ;gutter:true;
type bitMap struct {
bit int16
size int
}

//获取一个未使用位置
func (b *bitMap) GetOne() int {
for i := int(0) ; i < b.size ; i ++ {
if b.isZero(i) {
b.seTUsed(i)
return i
}
}
//这里超过size限制会直接报错
return -1
}

//第i位是否为0
//为0未使用
func (b *bitMap) isZero (index int) bool {
return ((1 << index) & b.bit) == 0
}

//设置index位已使用
func (b *bitMap) seTUsed (index int) {
b.bit = (1 << index) | b.bit
}

func (b *bitMap) setEnUsed (index int) {
b.bit = (0 << index) | b.bit
}

undefined

任务超时处理:
func (c *Coordinator) monitorWorker (id int) {
    timer := time.NewTimer(time.Duration(time.Second*10))
    select {
    case c.monitorC[id]:
        return
    case timer.C:
        //超时设置为未分配,重新分配
        c.SetEnUsed(id)
    }
}

每次分发一个任务出去,就会开启一个线程监听刚发送出去的任务。

当Coordinator 接收到任务完成信号,就会给任务id对应的信号监听函数发送信息,结束监听函数。

当未在规定时间内发送信号给监听函数,则将当前监听的任务id在 filebit结构中 标记在为未分发,重新轮循分发给下一个到来的worker。

如果这个未按时完成任务的worker后来完成任务并且发送信号过来,当这个任务已经还是为未分发状态则舍弃这个worker请求。

如果这个任务同时分发给了其他worker,则接收这个worker,舍弃最后来的。(这里设计的不太好)

undefined

Original: https://www.cnblogs.com/thotf/p/16458901.html
Author: thotf
Title: mit6.824 lab1 (2022)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/516259/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球