如下两张输入表格
order
表
id pid amount 1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6
pd
表
pid pname 01 小米 02 华为 03 格力
id pname amount 1001 小米 1 1004 小米 4 1002 华为 2 1005 华为 5 1003 格力 3 1006 格力 6
创建一个 TableBean
对象,其包含两个文件的所有属性,方便在map阶段封装数据
public class TableBean implements Writable {
private String id;
private String pid;
private Integer amount;
private String pname;
private String flag;
public TableBean() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public Integer getAmount() {
return amount;
}
public void setAmount(Integer amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(id);
dataOutput.writeUTF(pid);
dataOutput.writeInt(amount);
dataOutput.writeUTF(pname);
dataOutput.writeUTF(flag);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.id = dataInput.readUTF();
this.pid = dataInput.readUTF();
this.amount = dataInput.readInt();
this.pname = dataInput.readUTF();
this.flag = dataInput.readUTF();
}
@Override
public String toString() {
return id + '\t' + pname + '\t' + amount;
}
}
在map阶段根据文件名来区分加载对象, setup
方法一个文件只会执行一次,在该方法中获取文件名称,在 map
方法中根据文件名来执行不同的操作,值得注意的是属性不能为默认的 NULL
。
public class TableMapper extends Mapper {
private String filename;
private Text outK = new Text();
private TableBean outV = new TableBean();
@Override
protected void setup(Mapper.Context context) throws IOException, InterruptedException {
// 初始化
FileSplit inputSplit = (FileSplit) context.getInputSplit();
filename = inputSplit.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
String line = value.toString();
//判断是哪个文件
if (filename.contains("order")) {
String[] split = line.split("\t");
// 封装k v
outK.set(split[1]);
outV.setId(split[0]);
outV.setPid(split[1]);
outV.setAmount(Integer.parseInt(split[2]));
outV.setPname("");
outV.setFlag("order");
} else {
String[] split = line.split("\t");
// 封装k v
outK.set(split[0]);
outV.setId("");
outV.setPid(split[0]);
outV.setAmount(0);
outV.setPname(split[1]);
outV.setFlag("pd");
}
//写出
context.write(outK, outV);
}
}
由于使用 pid
为 key
,两个表中相同的 pid
会进入同一个 reduce
,再根据 flag
判断是哪个表中的数据,如果是 order
将其保存到数组中,如果是 pd
则获取其 pname
,循环 order
数组赋值。值得注意的是,由于 values
并非 Java
中默认的迭代器,如果只是 add(value)
赋值的是地址,无法达到预期要求。
public class TableReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
ArrayList orderBeans = new ArrayList<>();
TableBean pBean = new TableBean();
for (TableBean value : values) {
if ("order".equals(value.getFlag())) {
TableBean tempTableBean = new TableBean();
try {
BeanUtils.copyProperties(tempTableBean, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
orderBeans.add(tempTableBean);
} else {
try {
BeanUtils.copyProperties(pBean, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
// 遍历orderBeans
for (TableBean orderBean : orderBeans) {
orderBean.setPname(pBean.getPname());
context.write(orderBean, NullWritable.get());
}
}
}
总结:如果数据量非常大,所有的压力都会来到 reduce
阶段,这样会导致数据倾斜。为了防止发生,可以将 Join
操作放到 map
阶段,因为 map
阶段处理的数据都是块大小 128M
。
Map Join适用与一张十分小、一张很大的表的场景
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
采用DistributedCache的方法:
(1)在Mapper的setup阶段,将文件读取到缓存集合中
(2)在Driver驱动类中加载缓存
// 缓存普通文件到Task运行节点
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
// 如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
public class MapJoinMapper extends Mapper {
private HashMap pdMap = new HashMap<>();
private Text outK = new Text();
@Override
protected void setup(Mapper.Context context) throws IOException, InterruptedException {
// 获取缓存文件,并把文件内容封装到集合中 pd.txt
URI[] cacheFiles = context.getCacheFiles();
URI cacheFile = cacheFiles[0];
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream fis = fs.open(new Path(cacheFile));
// 从流中读取数据
BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
String line;
while (StringUtils.isNotEmpty(line=reader.readLine())) {
// 切割
String[] fields = line.split("\t");
pdMap.put(fields[0], fields[1]);
}
IOUtils.closeStream(reader);
}
@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
// 处理 order.txt
String line = value.toString();
String[] split = line.split("\t");
String pName = pdMap.get(split[1]);
outK.set(split[0] + "\t" + pName + "\t" + split[2]);
context.write(outK, NullWritable.get());
}
}
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(new Configuration());
job.setMapperClass(MapJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.addCacheFile(new URI("file:///D:/hadoop/input/mapjoincache/pd.txt"));
// 不需要reduce阶段
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\input\\mapjoin"));
FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output\\mapjoin"));
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
Original: https://www.cnblogs.com/sw-code/p/16387043.html
Author: sw-code
Title: 大数据之Hadoop集群中MapReduce的Join操作
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/585570/
转载文章受原作者版权保护。转载请注明原作者出处!