Hadoop MapReduce Architecture

MapReduce

4.1 MapReduce Principles

  • 4.1.1 Overview
  • 4.1.2 Main Functions of MapReduce
  • 4.1.3 The MapReduce Processing Flow

4.1.1 MapReduce Overview

MapReduce is a computing model, framework, and platform for the parallel processing of big data. The name carries the following three layers of meaning.

(1) MapReduce is a cluster-based, high-performance parallel computing platform.
(2) MapReduce is a software framework for parallel computing and execution.
(3) MapReduce is a parallel programming model and methodology.

4.1.2 Main Functions of MapReduce

1. Data partitioning and compute-task scheduling

Job and Task

Job: the data to be processed is divided into multiple data blocks (splits); see the sketch below.
Task: compute nodes are scheduled automatically to process the corresponding data blocks.
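
A minimal sketch of how this surfaces in the driver code (the input path and split sizes below are made up for illustration): one map task is launched per input split, the split size can be bounded through FileInputFormat, and the reduce-side parallelism is chosen explicitly.

package com.etc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitConfigDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "split config demo");
        job.setJarByClass(SplitConfigDemo.class);
        // One map task is launched per input split of the files under this (hypothetical) path.
        FileInputFormat.addInputPath(job, new Path("/input/data"));
        // Bound the split size so each map task handles roughly 64-128 MB of data.
        FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);
        FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);
        // The reduce-side parallelism is set explicitly.
        job.setNumReduceTasks(4);
    }
}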

2. Data/code co-location

Localized data processing: a compute node processes, as far as possible, the data stored on its own local disk, so that the code moves to the data rather than the data to the code.

3. System optimization

Intermediate results are merged before they enter a Reduce node, and the data processed by one Reduce node may come from several Map nodes.

4. Error detection and recovery

The system maintains the reliability of data storage, improving it with a multi-replica redundant storage mechanism, and can promptly detect and recover from errors; a configuration sketch follows.
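
At the job level, recovery from failed tasks is controlled through configuration. A minimal sketch, assuming the Hadoop 2.x property names (the values shown are the usual defaults), that could be added to any of the job drivers later in this chapter:

Configuration conf = new Configuration();
// A failed map or reduce task is retried on another node before the whole job is failed.
conf.setInt("mapreduce.map.maxattempts", 4);
conf.setInt("mapreduce.reduce.maxattempts", 4);
// Speculative execution re-launches unusually slow tasks elsewhere and keeps the fastest copy.
conf.setBoolean("mapreduce.map.speculative", true);
conf.setBoolean("mapreduce.reduce.speculative", true);
// The reliability of the stored data itself comes from HDFS replication (three copies by default).
conf.setInt("dfs.replication", 3);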

4.1.3 The MapReduce Processing Flow

The overall flow is Map → Shuffle → Reduce, with the Shuffle stage itself divided into a Map-side shuffle and a Reduce-side shuffle.

1. The Map and Reduce functions are user-defined; the shuffle is carried out automatically by the framework.
2. The final result is a partitioned, ordered file: key/value pairs with the same partition value are stored together and sorted by key in ascending order (see the sketch below).
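
Which partition, and therefore which Reduce task, a key/value pair goes to is decided during the shuffle. Unless a custom partitioner is configured, Hadoop's default HashPartitioner (org.apache.hadoop.mapreduce.lib.partition.HashPartitioner) does essentially the following:

package com.etc;

import org.apache.hadoop.mapreduce.Partitioner;

// Essentially the rule applied by the default HashPartitioner: keys with the same hash
// land in the same partition, and within each partition the pairs are sorted by key.
public class SimpleHashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the result is non-negative, then spread keys across reducers.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}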

4.2 MapReduce Programming Basics

4.2.1 Built-in Data Types

  • Using Hadoop's data types
    Some of the common data types are shown below.

Data type        Description
BooleanWritable  Boolean value
ByteWritable     Single-byte numeric value
DoubleWritable   Double-precision floating-point number
FloatWritable    Single-precision floating-point number
IntWritable      Integer
LongWritable     Long integer
Text             Text stored in UTF-8 format
NullWritable     Used when the key or value is null
ArrayWritable    Array

1) Using the Hadoop data types


package com.etc;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class HadoopDataType {

    // Text: UTF-8 text; getLength() returns the byte length, find() the byte offset of a substring.
    public static void testText() {
        System.out.println("testText");
        Text text = new Text("hello hadoop!");
        System.out.println(text.getLength());
        System.out.println(text.find("a"));
        System.out.println(text.toString());
    }

    // ArrayWritable: a Writable array whose elements are all of one Writable type.
    public static void testArrayWritable() {
        System.out.println("testArrayWritable");
        ArrayWritable arr = new ArrayWritable(IntWritable.class);
        IntWritable year = new IntWritable(2017);
        IntWritable month = new IntWritable(7);
        IntWritable date = new IntWritable(1);
        arr.set(new IntWritable[] { year, month, date });
        System.out.println(String.format("year=%d, month=%d, date=%d",
                ((IntWritable) arr.get()[0]).get(),
                ((IntWritable) arr.get()[1]).get(),
                ((IntWritable) arr.get()[2]).get()));
    }

    // MapWritable: a Writable map; NullWritable serves as a placeholder when a value is absent.
    public static void testMapWritable() {
        System.out.println("testMapWritable");
        MapWritable map = new MapWritable();
        Text k1 = new Text("name");
        Text v1 = new Text("tonny");
        Text k2 = new Text("password");
        map.put(k1, v1);
        map.put(k2, NullWritable.get());
        System.out.println(map.get(k1).toString());
        System.out.println(map.get(k2).toString());
    }

    public static void main(String[] args) {
        testText();
        testArrayWritable();
        testMapWritable();
    }
}

2) Running results:
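
The original post showed the console output as a screenshot. Based on the code above, the output should look roughly like the following (note that NullWritable prints as "(null)"):

testText
13
7
hello hadoop!
testArrayWritable
year=2017, month=7, date=1
testMapWritable
tonny
(null)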


4.2.2 Hadoop MapReduce Architecture

  • The Hadoop MapReduce 1.0 architecture consists of the Client, the JobTracker, the TaskTracker, and the Task components.


A core design idea of MapReduce is "moving computation to the data" rather than the traditional "moving data to the computation": shipping large volumes of data across the network is too expensive and greatly reduces processing efficiency.

4.2.3 The MapReduce Workflow

Input → Map → Sort → Combine → Partition → Reduce → Output
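
The Partition step decides which Reduce task receives each key; by default the hash rule shown in Section 4.1.3 is used. A hypothetical custom partitioner (the class name and rule are invented here for illustration) would look like the sketch below and be enabled in the driver with job.setPartitionerClass(FirstLetterPartitioner.class) together with job.setNumReduceTasks(2).

package com.etc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical rule: keys starting with a-m go to reducer 0, everything else to reducer 1.
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        if (numReduceTasks < 2 || key.getLength() == 0) {
            return 0;
        }
        char first = Character.toLowerCase(key.toString().charAt(0));
        return (first >= 'a' && first <= 'm') ? 0 : 1;
    }
}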

4.3 MapReduce Examples

4.3.1 WordCount Principles

  • Flow: the input text is split, each Map emits a <word, 1> pair for every word, the shuffle groups the pairs by word, and each Reduce sums the counts for one word.
  • Example

package com.etc;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
        new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
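
To run the example, package the class into a jar and submit it with the hadoop command; the jar name and HDFS paths below are illustrative. The word counts are written to part-r-* files under the output directory.

hadoop jar wordcount.jar com.etc.WordCount /user/hadoop/input /user/hadoop/output
hdfs dfs -cat /user/hadoop/output/part-r-00000
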
4.3.2 Student Average Score

  • Example
    Program code:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Score {

  public static class Map extends
      Mapper<LongWritable, Text, Text, IntWritable> {

    // Each input record is a line of the form "name score".
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
      while (tokenizerArticle.hasMoreElements()) {
        StringTokenizer tokenizerLine = new StringTokenizer(
            tokenizerArticle.nextToken());
        String strName = tokenizerLine.nextToken();
        String strScore = tokenizerLine.nextToken();
        Text name = new Text(strName);
        int scoreInt = Integer.parseInt(strScore);
        // Emit <name, score> so that all scores of one student meet in the same reducer.
        context.write(name, new IntWritable(scoreInt));
      }
    }
  }

  public static class Reduce extends
      Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      int sum = 0;
      int count = 0;
      Iterator<IntWritable> iterator = values.iterator();
      while (iterator.hasNext()) {
        sum += iterator.next().get();
        count++;
      }
      int average = sum / count;
      context.write(key, new IntWritable(average));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapred.job.tracker", "hadoop:9001");
    String[] ioArgs = new String[] {
        "hdfs://hadoop0:9000/input/score/*", "hdfs://hadoop0:9000/output/scoreout" };
    String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: Score Average <in> <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "Score Average");
    job.setJarByClass(Score.class);

    job.setMapperClass(Map.class);
    // Note: the Reduce class is not reused as a combiner here, because averaging
    // partial averages on the map side would give a wrong final result.
    job.setReducerClass(Reduce.class);
    // Set the output types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Split the input into splits; TextInputFormat supplies the RecordReader implementation
    job.setInputFormatClass(TextInputFormat.class);
    // TextOutputFormat supplies the RecordWriter implementation that writes the output
    job.setOutputFormatClass(TextOutputFormat.class);
    // Set the input and output paths
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
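
As an illustration of what the job computes (the names and scores below are made up), an input file with one "name score" pair per line such as:

zhang 80
zhang 90
li 70
li 75

would produce one integer average per student (li 72, zhang 85), written by TextOutputFormat as tab-separated key/value pairs.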

Original: https://www.cnblogs.com/lang12/p/15361029.html
Author: Aurora*
Title: Hadoop_MapReduce架构
