go;gutter:true;collapse:false
import "fmt"
import "6.824/mr"
import "plugin"
import "os"
import "log"
import "io/ioutil"
import "sort"</p>
<p>// for sorting by key.</p>
<p>type ByKey []mr.KeyValue</p>
<p>// for sorting by key.</p>
<p>func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }</p>
<p>func main() {
if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr, "Usage: mrsequential xxx.so inputfiles...\n" )
os.Exit(1)
}</p>
<pre><code>mapf, reducef := loadPlugin(os.Args[1])
//
// read each input file,
// pass it to Map,
// accumulate the intermediate Map output.
//
intermediate := []mr.KeyValue{}
for _, filename := range os.Args[2:] {
file, err := os.Open(filename)
if err != nil {
log.Fatalf( "cannot open %v" , filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf( "cannot read %v" , filename)
}
file.Close()
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
}
//
// a big difference from real MapReduce is that all the
// intermediate data is in one place, intermediate[],
// rather than being partitioned into NxM buckets.
//
sort.Sort(ByKey(intermediate))
oname := "mr-out-0"
ofile, _ := os.Create(oname)
//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n" , intermediate[i].Key, output)
i = j
}
ofile.Close()
</code></pre>
<p>}</p>
<p>//
// load the application Map and Reduce functions
// from a plugin file, e.g. ../mrapps/wc.so
//
func loadPlugin(filename string) ( func (string, string) []mr.KeyValue, func (string, []string) string) {
p, err := plugin.Open(filename)
if err != nil {
log.Fatalf( "cannot load plugin %v" , filename)
}
xmapf, err := p.Lookup( "Map" )
if err != nil {
log.Fatalf( "cannot find Map in %v" , filename)
}
mapf := xmapf.( func (string, string) []mr.KeyValue)
xreducef, err := p.Lookup( "Reduce" )
if err != nil {
log.Fatalf( "cannot find Reduce in %v" , filename)
}
reducef := xreducef.( func (string, []string) string)</p>
<pre><code>return mapf, reducef
</code></pre>
<p>}
摘自 MIT6.824
Original: https://www.cnblogs.com/thotf/p/16413164.html
Author: thotf
Title: 单机简易版mapReduce 实现
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/582256/
转载文章受原作者版权保护。转载请注明原作者出处!