mit6.824 lab1 (2022)

go;gutter:true; lab1 要求按照论文实现一个mapReduce 框架</p> <pre><code> ;gutter:true;
lab1 :https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

论文:https://zhuanlan.zhihu.com/p/122571315

在mrsequential.go文件中有个单机版mapReduce实现很简单建议阅读。

go;gutter:true;undefined

go;gutter:true;undefined

整体框架流程:

Coordinator 是协调器,负责

① 给woker分发任务

② 合并由map任务执行产生的中间文件

③ 任务超时重新分配任务

woker 是工作器,负责

①循环申请map 或reduce任务

先看woker:

worker 向 Coordinator 发送任务申请后,判断得到的是什么样类型的任务

go;gutter:true; //申请任务 for { args := Args{} args.Signal = REQUEST_WORKER</p> <p>reply := RpcCall(args) switch reply.STATUS { case COORDINATOR_MAP: //获得map任务   MapHandle(&reply,mapf) case COORDINATOR_REDUCE: //获得reduce任务     ReduceHandle(&reply,reducef)</p> <pre><code> </code></pre> <p>case COORDINATOR__MAP_END: //没申请到任务重新获取   continue  case END: //结束   return</p> <pre><code> ;gutter:true;
}

Recude任务  处理方式和mrsequential.go中几乎是一样的不多说了。map任务  会从Coordinator 获得文件名、任务id、Nreduce(中间文件个数)

undefined

mit6.824 lab1 (2022)

kva是通过mapf 对文件处理得到的数据。

我开启两个任务分发器,和Nreduce 个文件写入器,进行并发处理数据。将数据写入到Nreduce个中间文件中,分发依据为ihash函数。

go;gutter:true; kva := MapMachingFile(reply.FileName, mapf) midFileName := "mr-out-" + reply.FileName chanArray := make([]chan KeyValue, 10) for i := 0; i < 10; i++ { chanArray[i] = make(chan KeyValue, 10) }</p> <pre><code>//开启reduceNumber个文件写入线程 var w sync.WaitGroup var mapW sync.WaitGroup w.Add(reply.Neduce) mapW.Add(2) for i := 0; i < 10; i++ { go GoMakeMidFile(midFileName+strconv.Itoa(i), chanArray[i], &w) } // 开启分发线程,分发数据到文件写入线程 lenght := len(kva) go MapDistributeMidData(chanArray, kva[:lenght/2], &mapW) go MapDistributeMidData(chanArray, kva[lenght/2:], &mapW) //所有分发线程结束 mapW.Wait() for cIndex := 0; cIndex < 10; cIndex++ { close(chanArray[cIndex]) } //所有文件写入线程结束 w.Wait() </code></pre> <pre><code> worker结束剩下看Coordinator 。 </code></pre> <p>1 type Coordinator struct { 2 filebit //数据分发记录 3 Nreduce int 4 midFileMergeC chan int 5 Mergefiled //已处理数据记录 6 monitorC []chan int //监听每个worker是否按时完成 7 STATUS 8 RedeceS 9 *sync.Mutex 10 End bool 11 }</p> <pre><code> </code></pre> <p>Coordinator  结构记录的信息主要为三部分        2、3、4、5行记录map相关        6 为监听chan,监听任务是否超时        7位Coordinator 当前的状态,通过状态判断要分发map任务、reduce任务、结束判断worker的目的,请求任务就分发任务处理,完成map任务就将所有map产生中间数据一一对应合并到Nreduce个文件中。</p> <pre><code> </code></pre> <p>//信号处理 func (c <em>Coordinator) SignalTask (args </em>Args, reply *Reply) error { switch args.Signal { case REQUEST_WORKER: c.distributeTask(args,reply)</p> <pre><code>//中间文件处理 case COMPLETE: c.midFileMerge(args,reply) } return nil </code></pre> <p>}</p> <pre><code> 在初始化Coordinator时,还会打开一些线程。本线程会开启10个中间文件写入线程,当每个worker处理完map任务后,会将自己处理的map文件相关信息传给 _Coordinator,_ _Coordinator通过chan将数据发给每个文件合并线程_StartMergeFile。 _举个例子_ _workerMap A产生了 1,2,3 个中间文件_ _1号文件 合并到 mr-out-m-1_ _2号文件 合并到 mr-out-m-2_ _3号文件 合并到 mr-out-m-3_ _workerMap B 又产生1、2、3个中间文件_ _1号文件 合并到 mr-out-m-1_ _2号文件 合并到 mr-out-m-2_ _3号文件 合并到 mr-out-m-3_ </code></pre> <pre><code> ;gutter:true;
//开启Nreduce个中间文件写入线程
//返回文件写入chan 切片
func (c *Coordinator)runFileWorker () []chan int {
cLi := make([]chan int,c.Nreduce)
for i := 0 ; i < c.Nreduce ; i ++ {
cLi[i] = make(chan int,10)
}

for fid := 0 ; fid < c.Nreduce ; fid ++ {
go c.StartMergeFile(fid,cLi[fid])
}
return cLi
}

记录信息是否已处理的结构:
filebit 、ReduceS 核心是通过一个简单的bitmap实现的

go;gutter:true; type filebit struct{ rw *sync.Mutex bitMap file []string }</p> <p>type RedeceS struct { filebit }</p> <pre><code> ;gutter:true;
type bitMap struct {
bit int16
size int
}

//获取一个未使用位置
func (b *bitMap) GetOne() int {
for i := int(0) ; i < b.size ; i ++ {
if b.isZero(i) {
b.seTUsed(i)
return i
}
}
//这里超过size限制会直接报错
return -1
}

//第i位是否为0
//为0未使用
func (b *bitMap) isZero (index int) bool {
return ((1 << index) & b.bit) == 0
}

//设置index位已使用
func (b *bitMap) seTUsed (index int) {
b.bit = (1 << index) | b.bit
}

func (b *bitMap) setEnUsed (index int) {
b.bit = (0 << index) | b.bit
}

undefined

任务超时处理:
func (c *Coordinator) monitorWorker (id int) {
    timer := time.NewTimer(time.Duration(time.Second*10))
    select {
    case c.monitorC[id]:
        return
    case timer.C:
        //超时设置为未分配,重新分配
        c.SetEnUsed(id)
    }
}

每次分发一个任务出去,就会开启一个线程监听刚发送出去的任务。

当Coordinator 接收到任务完成信号,就会给任务id对应的信号监听函数发送信息,结束监听函数。

当未在规定时间内发送信号给监听函数,则将当前监听的任务id在 filebit结构中 标记在为未分发,重新轮循分发给下一个到来的worker。

如果这个未按时完成任务的worker后来完成任务并且发送信号过来,当这个任务已经还是为未分发状态则舍弃这个worker请求。

如果这个任务同时分发给了其他worker,则接收这个worker,舍弃最后来的。(这里设计的不太好)

undefined

Original: https://www.cnblogs.com/thotf/p/16458901.html
Author: thotf
Title: mit6.824 lab1 (2022)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/516259/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Golang:将日志以Json格式输出到Kafka

    工程实践中,我们往往还需要对日志进行采集,将日志归集到一起,然后用于各种处理分析,比如生产环境上的错误分析、异常告警等等。在日志消息系统领域,Kafka久负盛名,这篇文章就以将日志…

    Go语言 2023年5月25日
    092
  • go程序添加远程调用tcpdump功能

    最近开发的telemetry采集系统上线了。听起来高大上,简单来说就是一个grpc/udp服务端,用户的机器(路由器、交换机)将它们的各种统计数据上报采集、整理后交后端的各类AI分…

    Go语言 2023年5月25日
    072
  • Go语言之函数

    函数就是一块执行特定任务的代码,在输入源的基础上通过一些算法生成预期的输出。 Go 语言中的函数声明语法如下: func 函数名(参数名 类型,参数名 类型)(返回值1类型,返回值…

    Go语言 2023年5月25日
    048
  • 【Go实战基础】数组实战,程序员的基本功

    数组实战,程序员的基本功。 实战需求: 输入一个整数数组,实现一个函数来调整该数组中数字的顺序,使得所有奇数位于数组的前半部分,所有偶数位于数组的后半部分。 实战思路: 1、先声明…

    Go语言 2023年5月25日
    050
  • 如何在 Go 中将 []byte 转换为 io.Reader?

    原文链接: 如何在 Go 中将 []byte 转换为 io.Reader? 在 stackoverflow 上看到一个问题,题主进行了一个网络请求,接口返回的是 []byte。如果…

    Go语言 2023年5月25日
    083
  • 惨,给Go提的代码被批麻了

    hello大家好,我是小楼。 不知道大家还记不记得我上次找到了一个Go的Benchmark执行会超时的Bug?就是这篇文章《我好像发现了一个Go的Bug?》。 之后我就向Go提交了…

    Go语言 2023年5月25日
    086
  • Go语言的goroutine

    Go世界里,每一个并发执行的活动成为goroutine。 通过创建goroutine,就可以实现并行运算,十分方便。 如果有函数f(),那么: f():调用函数f(),并且等待它返…

    Go语言 2023年5月29日
    078
  • Golang通脉之指针

    指针的概念 指针是存储另一个变量的内存地址的变量。 变量是一种使用方便的占位符,用于引用计算机内存地址。 一个指针变量可以指向任何一个值的内存地址。 在上面的图中,变量b的值为15…

    Go语言 2023年5月25日
    090
  • 对不起,我错了,这代码不好写

    hello,大家好呀,我是小楼。 前几天不是写了这篇文章《发现一个开源项目优化点,点进来就是你的了》嘛。 文章介绍了Sentinl的自适应缓存时间戳算法,从原理到实现都手把手解读了…

    Go语言 2023年5月25日
    077
  • Go语言实现线程安全访问队列

    这个例子用Go语言的包”container/list”实现一个线程安全访问的队列。其中不少细节耐人寻味,做出它则是花费了不少精力,找不到样例啊! Go语言的…

    Go语言 2023年5月29日
    057
  • Go语言之循环与条件判断

    Go 语言中没有 while 循环,只有一个 for 循环 for 变量初始化;条件;变量自增/自减 { 循环体内容 } 1、基本使用 for i := 0; i < 10;…

    Go语言 2023年5月25日
    071
  • Go Micro Dashboard – 实现细节(一)

    前言 Go Micro Dashboard是基于go-micro和ng-alain开发的, 它既是go-micro开发过程中的工具,也可以作为学习go-micro的实际案例。接下来…

    Go语言 2023年5月25日
    093
  • muduo源码分析之TcpServer模块

    这次我们开始 muduo源代码的实际编写,首先我们知道 muduo是 LT模式, Reactor模式,下图为 Reactor模式的流程图[来源1] 然后我们来看下 muduo的整体…

    Go语言 2023年5月25日
    060
  • 使用Go搭建并行排序处理管道笔记

    go;collapse:true;;gutter:true; package main</p> <p>import ( "bufio" …

    Go语言 2023年5月25日
    065
  • go微服务框架Kratos笔记(五)使用nacos作为服务注册和服务发现中心

    引言 上篇介绍了如何使用nacos作为配置管理中心,并使用viper来解析配置官方介绍nacos不仅可以用来做配置中心还支持服务注册、发现以及动态DNS服务功能 nacos的安装 …

    Go语言 2023年5月25日
    060
  • Go标准的目录结构(自总结)

    微服务版 &#x251C;&#x2500;&#x2500; LICENSE.md &#x251C;&#x2500;&#x2500; …

    Go语言 2023年5月25日
    058
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球