mit6.824 lab1 (2022)

go;gutter:true; lab1 要求按照论文实现一个mapReduce 框架</p> <pre><code> ;gutter:true;
lab1 :https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

论文:https://zhuanlan.zhihu.com/p/122571315

在mrsequential.go文件中有个单机版mapReduce实现很简单建议阅读。

go;gutter:true;undefined

go;gutter:true;undefined

整体框架流程:

Coordinator 是协调器,负责

① 给woker分发任务

② 合并由map任务执行产生的中间文件

③ 任务超时重新分配任务

woker 是工作器,负责

①循环申请map 或reduce任务

先看woker:

worker 向 Coordinator 发送任务申请后,判断得到的是什么样类型的任务

go;gutter:true; //申请任务 for { args := Args{} args.Signal = REQUEST_WORKER</p> <p>reply := RpcCall(args) switch reply.STATUS { case COORDINATOR_MAP: //获得map任务   MapHandle(&reply,mapf) case COORDINATOR_REDUCE: //获得reduce任务     ReduceHandle(&reply,reducef)</p> <pre><code> </code></pre> <p>case COORDINATOR__MAP_END: //没申请到任务重新获取   continue  case END: //结束   return</p> <pre><code> ;gutter:true;
}

Recude任务  处理方式和mrsequential.go中几乎是一样的不多说了。map任务  会从Coordinator 获得文件名、任务id、Nreduce(中间文件个数)

undefined

mit6.824 lab1 (2022)

kva是通过mapf 对文件处理得到的数据。

我开启两个任务分发器,和Nreduce 个文件写入器,进行并发处理数据。将数据写入到Nreduce个中间文件中,分发依据为ihash函数。

go;gutter:true; kva := MapMachingFile(reply.FileName, mapf) midFileName := "mr-out-" + reply.FileName chanArray := make([]chan KeyValue, 10) for i := 0; i < 10; i++ { chanArray[i] = make(chan KeyValue, 10) }</p> <pre><code>//开启reduceNumber个文件写入线程 var w sync.WaitGroup var mapW sync.WaitGroup w.Add(reply.Neduce) mapW.Add(2) for i := 0; i < 10; i++ { go GoMakeMidFile(midFileName+strconv.Itoa(i), chanArray[i], &w) } // 开启分发线程,分发数据到文件写入线程 lenght := len(kva) go MapDistributeMidData(chanArray, kva[:lenght/2], &mapW) go MapDistributeMidData(chanArray, kva[lenght/2:], &mapW) //所有分发线程结束 mapW.Wait() for cIndex := 0; cIndex < 10; cIndex++ { close(chanArray[cIndex]) } //所有文件写入线程结束 w.Wait() </code></pre> <pre><code> worker结束剩下看Coordinator 。 </code></pre> <p>1 type Coordinator struct { 2 filebit //数据分发记录 3 Nreduce int 4 midFileMergeC chan int 5 Mergefiled //已处理数据记录 6 monitorC []chan int //监听每个worker是否按时完成 7 STATUS 8 RedeceS 9 *sync.Mutex 10 End bool 11 }</p> <pre><code> </code></pre> <p>Coordinator  结构记录的信息主要为三部分        2、3、4、5行记录map相关        6 为监听chan,监听任务是否超时        7位Coordinator 当前的状态,通过状态判断要分发map任务、reduce任务、结束判断worker的目的,请求任务就分发任务处理,完成map任务就将所有map产生中间数据一一对应合并到Nreduce个文件中。</p> <pre><code> </code></pre> <p>//信号处理 func (c <em>Coordinator) SignalTask (args </em>Args, reply *Reply) error { switch args.Signal { case REQUEST_WORKER: c.distributeTask(args,reply)</p> <pre><code>//中间文件处理 case COMPLETE: c.midFileMerge(args,reply) } return nil </code></pre> <p>}</p> <pre><code> 在初始化Coordinator时,还会打开一些线程。本线程会开启10个中间文件写入线程,当每个worker处理完map任务后,会将自己处理的map文件相关信息传给 _Coordinator,_ _Coordinator通过chan将数据发给每个文件合并线程_StartMergeFile。 _举个例子_ _workerMap A产生了 1,2,3 个中间文件_ _1号文件 合并到 mr-out-m-1_ _2号文件 合并到 mr-out-m-2_ _3号文件 合并到 mr-out-m-3_ _workerMap B 又产生1、2、3个中间文件_ _1号文件 合并到 mr-out-m-1_ _2号文件 合并到 mr-out-m-2_ _3号文件 合并到 mr-out-m-3_ </code></pre> <pre><code> ;gutter:true;
//开启Nreduce个中间文件写入线程
//返回文件写入chan 切片
func (c *Coordinator)runFileWorker () []chan int {
cLi := make([]chan int,c.Nreduce)
for i := 0 ; i < c.Nreduce ; i ++ {
cLi[i] = make(chan int,10)
}

for fid := 0 ; fid < c.Nreduce ; fid ++ {
go c.StartMergeFile(fid,cLi[fid])
}
return cLi
}

记录信息是否已处理的结构:
filebit 、ReduceS 核心是通过一个简单的bitmap实现的

go;gutter:true; type filebit struct{ rw *sync.Mutex bitMap file []string }</p> <p>type RedeceS struct { filebit }</p> <pre><code> ;gutter:true;
type bitMap struct {
bit int16
size int
}

//获取一个未使用位置
func (b *bitMap) GetOne() int {
for i := int(0) ; i < b.size ; i ++ {
if b.isZero(i) {
b.seTUsed(i)
return i
}
}
//这里超过size限制会直接报错
return -1
}

//第i位是否为0
//为0未使用
func (b *bitMap) isZero (index int) bool {
return ((1 << index) & b.bit) == 0
}

//设置index位已使用
func (b *bitMap) seTUsed (index int) {
b.bit = (1 << index) | b.bit
}

func (b *bitMap) setEnUsed (index int) {
b.bit = (0 << index) | b.bit
}

undefined

任务超时处理:
func (c *Coordinator) monitorWorker (id int) {
    timer := time.NewTimer(time.Duration(time.Second*10))
    select {
    case c.monitorC[id]:
        return
    case timer.C:
        //超时设置为未分配,重新分配
        c.SetEnUsed(id)
    }
}

每次分发一个任务出去,就会开启一个线程监听刚发送出去的任务。

当Coordinator 接收到任务完成信号,就会给任务id对应的信号监听函数发送信息,结束监听函数。

当未在规定时间内发送信号给监听函数,则将当前监听的任务id在 filebit结构中 标记在为未分发,重新轮循分发给下一个到来的worker。

如果这个未按时完成任务的worker后来完成任务并且发送信号过来,当这个任务已经还是为未分发状态则舍弃这个worker请求。

如果这个任务同时分发给了其他worker,则接收这个worker,舍弃最后来的。(这里设计的不太好)

undefined

Original: https://www.cnblogs.com/thotf/p/16458901.html
Author: thotf
Title: mit6.824 lab1 (2022)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/516259/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Golang项目的配置管理——Viper简易入门配置

    Golang项目的配置管理——Viper简易入门配置 What is Viper? From:https://github.com/spf13/viperViper is a co…

    Go语言 2023年5月25日
    057
  • 使用Go搭建并行排序处理管道笔记

    go;collapse:true;;gutter:true; package main</p> <p>import ( "bufio" …

    Go语言 2023年5月25日
    059
  • 为开源项目 go-gin-api 增加后台任务模块

    任务管理界面 (WEB) 任务调度器 任务执行器 小结 推荐阅读 任务管理界面 (WEB) 支持在 WEB &#x754C;&#x9762; 中对任务进行管理,例如…

    Go语言 2023年5月25日
    0112
  • 使用go语言遇到的一些问题记录

    一、参数校验问题 使用go做web服务时,经常需要对请求参数进行校验,有些必填参数需要校验是否为空。 经常会遇到参数a为int类型,但是其值取值范围为0-xxx。0也是有意义的。 …

    Go语言 2023年5月29日
    065
  • 系统调用跟踪——ls功能实现(二)

    在上篇文章中我们跟踪 ls命令看到了其所使用的这么几个系统调用: stat、openat、fstat、getdents、close、write等,这里再简单介绍下这几个系统调用的功…

    Go语言 2023年5月25日
    067
  • 十分钟学会Golang开发gRPC服务

    gRPC是Google发起的一个开源RPC框架,使用HTTP/2传输协议,使用Protocol Buffers编码协议,相比RESTful框架的程序性能提高不少,而且当前流行的编程…

    Go语言 2023年5月25日
    077
  • Go语言学习笔记1

    1.Go语言环境搭建及基础知识 Go语言官方网站(http://golang.org)代码包文档网站(http://godoc.org)Go语言中文网(http://studygo…

    Go语言 2023年5月29日
    068
  • Go语言之高级篇beego框架之view

    1、基本语法 go统一使用了{{ 和 }}作为左右标签,没有其它的标签符号。 如果你想要修改为其它符号,可以修改配置文件。 使用.来访问当前位置的上下文 使用$来引用当前模板根级的…

    Go语言 2023年5月29日
    070
  • Maglev : A Fast and Reliable Software Network Load Balancer (using Consistent Hashing)

    前言(为什么想读这一篇论文) 这一篇论文吸引我注意的原因是,Consistent Hashing ;本来的特性就是作为分布式缓存之用。谷歌将他们的负载均衡器(代号:Maglev)发…

    Go语言 2023年5月25日
    063
  • Go内存逃逸分析

    Go的内存逃逸及逃逸分析 Go的内存逃逸 分析内存逃逸之前要搞清楚一件事 我们编写的程序中的 函数和 局部变量默认是存放在栈上的(补充一点堆上存储的数据的指针 是存放在栈上的 因为…

    Go语言 2023年5月25日
    051
  • muduo源码分析之回调模块

    这次我们主要来说说 muduo库中大量使用的回调机制。 muduo主要使用的是利用 Callback的方式来实现回调,首先我们在自己的 EchoServer构造函数中有这样几行代码…

    Go语言 2023年5月25日
    044
  • GO后端开发+VUE实列

    因为我是从java转到go,代码结构跟我之前用java的很像 在这里只浅显的实战运用,没有过多理论讲解 工作环境:IDE:Goland , Go 1.17.7 框架 Gin+Gor…

    Go语言 2023年5月25日
    066
  • 流量管制-令牌桶与漏桶

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    Go语言 2023年5月25日
    062
  • 支持首次触发的 Go Ticker

    促使我写这篇文章主要是在写一个关于虚拟货币账户监控的项目时使用 Ticker 的问题。 Ticker 的问题 如果用过 Ticker 的朋友会知道,创建 Ticker 后并不会马上…

    Go语言 2023年5月25日
    075
  • HTTP 尝试获取 Client IP

    HTTP 中获取 Client IP 相关策略需求, 在当下网络环境中多数只能提供建议作用. 更多的是 通过其它唯一标识来挖掘更多潜在价值. 本文主要就一个内容, 如何最大可能尝试…

    Go语言 2023年5月25日
    063
  • 开始读 Go 源码了

    学完 Go 的基础知识已经有一段时间了,那么接下来应该学什么呢?有几个方向可以考虑,比如说 Web 开发,网络编程等。 在写项目的过程中,发现一个问题。实现功能是没问题的,但不知道…

    Go语言 2023年5月25日
    048
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球