go;gutter:true;
lab1 要求按照论文实现一个mapReduce 框架</p>
<pre><code>
;gutter:true;
lab1 :https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
论文:https://zhuanlan.zhihu.com/p/122571315
在mrsequential.go文件中有个单机版mapReduce实现很简单建议阅读。
go;gutter:true;undefined
go;gutter:true;undefined
整体框架流程:
Coordinator 是协调器,负责
① 给woker分发任务
② 合并由map任务执行产生的中间文件
③ 任务超时重新分配任务
woker 是工作器,负责
①循环申请map 或reduce任务
先看woker:
worker 向 Coordinator 发送任务申请后,判断得到的是什么样类型的任务
go;gutter:true;
//申请任务
for {
args := Args{}
args.Signal = REQUEST_WORKER</p>
<p>reply := RpcCall(args)
switch reply.STATUS {
case COORDINATOR_MAP: //获得map任务
MapHandle(&reply,mapf)
case COORDINATOR_REDUCE: //获得reduce任务
ReduceHandle(&reply,reducef)</p>
<pre><code>
</code></pre>
<p>case COORDINATOR__MAP_END: //没申请到任务重新获取 continue case END: //结束 return</p>
<pre><code>
;gutter:true;
}
Recude任务 处理方式和mrsequential.go中几乎是一样的不多说了。map任务 会从Coordinator 获得文件名、任务id、Nreduce(中间文件个数)
undefined
kva是通过mapf 对文件处理得到的数据。
我开启两个任务分发器,和Nreduce 个文件写入器,进行并发处理数据。将数据写入到Nreduce个中间文件中,分发依据为ihash函数。
go;gutter:true;
kva := MapMachingFile(reply.FileName, mapf)
midFileName := "mr-out-" + reply.FileName
chanArray := make([]chan KeyValue, 10)
for i := 0; i < 10; i++ {
chanArray[i] = make(chan KeyValue, 10)
}</p>
<pre><code>//开启reduceNumber个文件写入线程
var w sync.WaitGroup
var mapW sync.WaitGroup
w.Add(reply.Neduce)
mapW.Add(2)
for i := 0; i < 10; i++ {
go GoMakeMidFile(midFileName+strconv.Itoa(i), chanArray[i], &w)
}
// 开启分发线程,分发数据到文件写入线程
lenght := len(kva)
go MapDistributeMidData(chanArray, kva[:lenght/2], &mapW)
go MapDistributeMidData(chanArray, kva[lenght/2:], &mapW)
//所有分发线程结束
mapW.Wait()
for cIndex := 0; cIndex < 10; cIndex++ {
close(chanArray[cIndex])
}
//所有文件写入线程结束
w.Wait()
</code></pre>
<pre><code>
worker结束剩下看Coordinator 。
</code></pre>
<p>1 type Coordinator struct {
2 filebit //数据分发记录
3 Nreduce int
4 midFileMergeC chan int
5 Mergefiled //已处理数据记录
6 monitorC []chan int //监听每个worker是否按时完成
7 STATUS
8 RedeceS
9 *sync.Mutex
10 End bool
11 }</p>
<pre><code>
</code></pre>
<p>Coordinator 结构记录的信息主要为三部分 2、3、4、5行记录map相关 6 为监听chan,监听任务是否超时 7位Coordinator 当前的状态,通过状态判断要分发map任务、reduce任务、结束判断worker的目的,请求任务就分发任务处理,完成map任务就将所有map产生中间数据一一对应合并到Nreduce个文件中。</p>
<pre><code>
</code></pre>
<p>//信号处理
func (c <em>Coordinator) SignalTask (args </em>Args, reply *Reply) error {
switch args.Signal {
case REQUEST_WORKER:
c.distributeTask(args,reply)</p>
<pre><code>//中间文件处理
case COMPLETE:
c.midFileMerge(args,reply)
}
return nil
</code></pre>
<p>}</p>
<pre><code>
在初始化Coordinator时,还会打开一些线程。本线程会开启10个中间文件写入线程,当每个worker处理完map任务后,会将自己处理的map文件相关信息传给 _Coordinator,_ _Coordinator通过chan将数据发给每个文件合并线程_StartMergeFile。
_举个例子_
_workerMap A产生了 1,2,3 个中间文件_
_1号文件 合并到 mr-out-m-1_
_2号文件 合并到 mr-out-m-2_
_3号文件 合并到 mr-out-m-3_
_workerMap B 又产生1、2、3个中间文件_
_1号文件 合并到 mr-out-m-1_
_2号文件 合并到 mr-out-m-2_
_3号文件 合并到 mr-out-m-3_
</code></pre>
<pre><code>
;gutter:true;
//开启Nreduce个中间文件写入线程
//返回文件写入chan 切片
func (c *Coordinator)runFileWorker () []chan int {
cLi := make([]chan int,c.Nreduce)
for i := 0 ; i < c.Nreduce ; i ++ {
cLi[i] = make(chan int,10)
}
for fid := 0 ; fid < c.Nreduce ; fid ++ {
go c.StartMergeFile(fid,cLi[fid])
}
return cLi
}
记录信息是否已处理的结构:
filebit 、ReduceS 核心是通过一个简单的bitmap实现的
go;gutter:true;
type filebit struct{
rw *sync.Mutex
bitMap
file []string
}</p>
<p>type RedeceS struct {
filebit
}</p>
<pre><code>
;gutter:true;
type bitMap struct {
bit int16
size int
}
//获取一个未使用位置
func (b *bitMap) GetOne() int {
for i := int(0) ; i < b.size ; i ++ {
if b.isZero(i) {
b.seTUsed(i)
return i
}
}
//这里超过size限制会直接报错
return -1
}
//第i位是否为0
//为0未使用
func (b *bitMap) isZero (index int) bool {
return ((1 << index) & b.bit) == 0
}
//设置index位已使用
func (b *bitMap) seTUsed (index int) {
b.bit = (1 << index) | b.bit
}
func (b *bitMap) setEnUsed (index int) {
b.bit = (0 << index) | b.bit
}
undefined
任务超时处理:
func (c *Coordinator) monitorWorker (id int) {
timer := time.NewTimer(time.Duration(time.Second*10))
select {
case c.monitorC[id]:
return
case timer.C:
//超时设置为未分配,重新分配
c.SetEnUsed(id)
}
}
每次分发一个任务出去,就会开启一个线程监听刚发送出去的任务。
当Coordinator 接收到任务完成信号,就会给任务id对应的信号监听函数发送信息,结束监听函数。
当未在规定时间内发送信号给监听函数,则将当前监听的任务id在 filebit结构中 标记在为未分发,重新轮循分发给下一个到来的worker。
如果这个未按时完成任务的worker后来完成任务并且发送信号过来,当这个任务已经还是为未分发状态则舍弃这个worker请求。
如果这个任务同时分发给了其他worker,则接收这个worker,舍弃最后来的。(这里设计的不太好)
undefined
Original: https://www.cnblogs.com/thotf/p/16458901.html
Author: thotf
Title: mit6.824 lab1 (2022)
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/516259/
转载文章受原作者版权保护。转载请注明原作者出处!