MapReduce
4.1 MapReduce原理
- 4.1.1 概述
- 4.1.2MapReduce 的主要功能
- 4.1.3MapReduce 的处理流程
4.1.1 MapReduce概述
MapReduce是面向大数据并行处理的计算模型、框架和平台,它包含以下三层含义。
(1)MapReduce是一个基于集群的高性能并行计算平台。
(2)MapReduce是一个并行计算与运行软件框架。
(3)MapReduce是一个并行程序设计模型与方法。
4.1.2 MapReduce的主要功能
1.数据划分和计算任务调度
Job 和 Task
Job待处理的数据划分为多个数据块
Task自动调度计算节点来处理相应的数据块
2.数据/代码互定位
本地化数据处理:一个计算节点尽可能处理其本地磁盘上所分布存储的数据,实现代码向数据的迁移。
3.系统优化
中间结果进入Reduce节点前会进行合并处理,一个Reduce节点所处理的数据可能来自多个Map节点。
4.出错检测和恢复
系统将维护数据存储的可靠性,用多备份冗余存储机制提高数据存储的可靠性,并能及时检测和恢复出错数据。
4.1.3 MapReduce的处理流程
Map
Shuffle
Reduce
MapShuffle
ReduceShuffle
1.Map和Reduce函数用户自定义,shuffle是由系统自动实现。
2.最终得到一个分区有序文件(具有Partition值的键值对存储在一起,并按key值进行升序排序)
4.2 Mapreduce 的编程基础
4.2.1内置数据类型
- [Hadoops数据类型的使用]
下面展示一些数据类型
。
数据类型ValueBooleanWritable布尔型ByteWritable单字节数值DoubleWritable双字节数FlowWritable浮点IntWritable整形LongWritable长整型TextUTF-8格式存储的文本NullWritable当中的key或value为NULL时使用ArrayWritable数组
1) Hadoop数据类型的使用
package com.etc;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
public class HadoopDataType {
public static void testText() {
System.out.println("testText");
Text text = new Text ("hello hadoop!");
System.out.println (text .getLength());
System.out.println(text .find("a"));
System. out.println (text.toString());
}
public static void testArrayWritable() {
System. out . println ("testArrayWritable");
ArrayWritable arr = new ArrayWritable (IntWritable.class);
IntWritable year = new IntWritable (2017) ;
IntWritable month = new IntWritable(07) ;
IntWritable date = new IntWritable(01) ;
arr.set (new IntWritable[] { year, month, date });
System.out.println (String . format ("year=%d, month=%d, date=%d",
((IntWritable) arr .get()[0]).get(),
((IntWritable) arr.get()[1]).get(),
((IntWritable)arr.get() [2]) .get()));
}
public static void testMapWritable() {
System.out.println("testMapWritable");
MapWritable map = new MapWritable();
Text k1 = new Text("name");
Text v1 = new Text ("tonny");
Text k2 = new Text ("password");
map.put(k1, v1);
map.put(k2, NullWritable.get());
System.out.println(map.get (k1).toString());
System.out.println(map.get(k2) .toString());
}
public static void main(String[] args) {
testText();
testArrayWritable();
testMapWritable();
}
}
2) 运行结果:

4.2.2 Hadoop MapReduce 架构
- Hadoop MapReduce1.0体系架构 由 Client(客户端)、JobTracker(作业跟踪器)、TaskTracker (任务跟踪器)、Task(任务)组成。

MapReduce 设计的一个核心理念就是”计算向数据靠拢”,而不是传统计算模式的”数据向计算靠拢”。这是因为移动大量数据需要的网络传输开销太大,同时也大大降低了数据处理的效率。
; 4.2.3 MapReduce的工作流程
Input
Map
Sort
Combine
Partition
Reduce
Output
4.3MapReduce示例
4.3.1 WordCont原理

- *流程

- 示例
package com.etc;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount [...] ");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4.3.2 学生平均成绩
- 示例
*程序代码
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text;
import org.apache.hadoopmapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib。output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericoptionsParser;
public class Score {
public static class Map extends
Mapper<LongWritable,Text,Text,IntWritable>
{
public void map(LongWritable key,Text value,Context context)
throws IoException, InterruptedExcepti {
string line = value . toString();
StringTokenizer tokenizerArticle =new StringTokenizer(tokenizerArticle.nextToken());
while (tokenizerArticle.hasMoreElements ()){
StringTokenizer tokenizerLine = new StringTokenizer(
TokenizerArticle.nextToken());
String strNmae = tokenizerLine.nextToken();
String strScore = tokenizerLine.nextToken();
Text name = new Text (strName);
int scoreInt = Integer.parseInt(strScore);
context.write (name, new Intwritable (scoreInt));
}
}
}
public static class Reduce extends
Reducer<Text, Intwritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException{
int sum= 0;
int count = 0;
Iterator<Intwritable> iterator = values.iterator();
while (iterator.hasNext()) {
Sum += iterator.next().get();
count++;
}
int average- (int) sum/count;
context .write(key, new IntWritable(average));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Contguration();
conf.set ("mapred.job.tracker","hadoop:9001");
String[] ioArgs = new String[]
{"hdfs://hadoop0:9000/input/score/*","hdfs://hadoop0:9000/output/scoreout"};
String[] otherArgs = new GenericOptionsParset(conf, ioArgs).getRemainingArgs();
if (otherArgs.length !=2) {
System.err.println("Usage: Score Average ");
System.exit(2);
Job job = new Job(conf, "Score Average");
job.setJarByClass(Score.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class) ;
job.setReducerClass(Reduce.class) ;
//设置输出类型
job.setOutputKeyClass(Text.class);
job.setOutputvalueClass (Intwritable.class);
//将输入的数据集分割成小数据块splites,提供一个RecordReder的实现
job.setInputFormatClass (TextInputFormat.class) ;
//提供一个RecordWriter的实现,负责数据输出
job.setOutputFormatClass (TextOutputFormat.class) ;
//设置输入和输出目录
FileInputFormat.addInputPath(job, new Path(otherArgs[0]);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Original: https://www.cnblogs.com/lang12/p/15361029.html
Author: Aurora*
Title: Hadoop_MapReduce架构
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/565853/
转载文章受原作者版权保护。转载请注明原作者出处!