单机简易版mapReduce 实现

go;gutter:true;collapse:false import "fmt" import "6.824/mr" import "plugin" import "os" import "log" import "io/ioutil" import "sort"</p> <p>// for sorting by key.</p> <p>type ByKey []mr.KeyValue</p> <p>// for sorting by key.</p> <p>func (a ByKey) Len() int           { return len(a) } func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] } func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }</p> <p>func main() { if len(os.Args) < 3 { fmt.Fprintf(os.Stderr, "Usage: mrsequential xxx.so inputfiles...\n" ) os.Exit(1) }</p> <pre><code>mapf, reducef := loadPlugin(os.Args[1]) // // read each input file, // pass it to Map, // accumulate the intermediate Map output. // intermediate := []mr.KeyValue{} for _, filename := range os.Args[2:] { file, err := os.Open(filename) if err != nil { log.Fatalf( "cannot open %v" , filename) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf( "cannot read %v" , filename) } file.Close() kva := mapf(filename, string(content)) intermediate = append(intermediate, kva...) } // // a big difference from real MapReduce is that all the // intermediate data is in one place, intermediate[], // rather than being partitioned into NxM buckets. // sort.Sort(ByKey(intermediate)) oname := "mr-out-0" ofile, _ := os.Create(oname) // // call Reduce on each distinct key in intermediate[], // and print the result to mr-out-0. // i := 0 for i < len(intermediate) { j := i + 1 for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { j++ } values := []string{} for k := i; k < j; k++ { values = append(values, intermediate[k].Value) } output := reducef(intermediate[i].Key, values) // this is the correct format for each line of Reduce output. fmt.Fprintf(ofile, "%v %v\n" , intermediate[i].Key, output) i = j } ofile.Close() </code></pre> <p>}</p> <p>// // load the application Map and Reduce functions // from a plugin file, e.g. ../mrapps/wc.so // func loadPlugin(filename string) ( func (string, string) []mr.KeyValue, func (string, []string) string) { p, err := plugin.Open(filename) if err != nil { log.Fatalf( "cannot load plugin %v" , filename) } xmapf, err := p.Lookup( "Map" ) if err != nil { log.Fatalf( "cannot find Map in %v" , filename) } mapf := xmapf.( func (string, string) []mr.KeyValue) xreducef, err := p.Lookup( "Reduce" ) if err != nil { log.Fatalf( "cannot find Reduce in %v" , filename) } reducef := xreducef.( func (string, []string) string)</p> <pre><code>return mapf, reducef </code></pre> <p>}

摘自 MIT6.824

