大数据之Hadoop集群中MapReduce的Join操作

如下两张输入表格

order

id pid amount 1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6

pd

pid pname 01 小米 02 华为 03 格力

id pname amount 1001 小米 1 1004 小米 4 1002 华为 2 1005 华为 5 1003 格力 3 1006 格力 6

创建一个 TableBean对象,其包含两个文件的所有属性,方便在map阶段封装数据

public class TableBean implements Writable {

    private String id;
    private String pid;
    private Integer amount;
    private String pname;
    private String flag;

    public TableBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public Integer getAmount() {
        return amount;
    }

    public void setAmount(Integer amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pname);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.pid = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.pname = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return id + '\t' + pname + '\t' + amount;
    }
}

在map阶段根据文件名来区分加载对象, setup方法一个文件只会执行一次,在该方法中获取文件名称,在 map方法中根据文件名来执行不同的操作,值得注意的是属性不能为默认的 NULL

public class TableMapper extends Mapper {

    private String filename;
    private Text outK = new Text();
    private TableBean outV = new TableBean();
    @Override
    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
        // 初始化
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        filename = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        //判断是哪个文件
        if (filename.contains("order")) {
            String[] split = line.split("\t");
            // 封装k v
            outK.set(split[1]);
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setPname("");
            outV.setFlag("order");
        } else {
            String[] split = line.split("\t");
            // 封装k v
            outK.set(split[0]);
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }
        //写出
        context.write(outK, outV);
    }
}

由于使用 pidkey,两个表中相同的 pid会进入同一个 reduce,再根据 flag判断是哪个表中的数据,如果是 order将其保存到数组中,如果是 pd则获取其 pname,循环 order数组赋值。值得注意的是,由于 values并非 Java中默认的迭代器,如果只是 add(value)赋值的是地址,无法达到预期要求。

public class TableReducer extends Reducer {

    @Override
    protected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
        ArrayList orderBeans = new ArrayList<>();
        TableBean pBean = new TableBean();
        for (TableBean value : values) {
            if ("order".equals(value.getFlag())) {
                TableBean tempTableBean = new TableBean();
                try {
                    BeanUtils.copyProperties(tempTableBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(tempTableBean);
            } else {
                try {
                    BeanUtils.copyProperties(pBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        // 遍历orderBeans
        for (TableBean orderBean : orderBeans) {
            orderBean.setPname(pBean.getPname());
            context.write(orderBean, NullWritable.get());
        }

    }
}

总结:如果数据量非常大,所有的压力都会来到 reduce阶段,这样会导致数据倾斜。为了防止发生,可以将 Join操作放到 map阶段,因为 map阶段处理的数据都是块大小 128M

Map Join适用与一张十分小、一张很大的表的场景

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

采用DistributedCache的方法:

(1)在Mapper的setup阶段,将文件读取到缓存集合中

(2)在Driver驱动类中加载缓存

// 缓存普通文件到Task运行节点
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
// 如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
public class MapJoinMapper extends Mapper {

    private HashMap pdMap = new HashMap<>();
    private Text outK = new Text();

    @Override
    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
        // 获取缓存文件,并把文件内容封装到集合中 pd.txt
        URI[] cacheFiles = context.getCacheFiles();
        URI cacheFile = cacheFiles[0];
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(new Path(cacheFile));

        // 从流中读取数据
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
        String line;
        while (StringUtils.isNotEmpty(line=reader.readLine())) {
            // 切割
            String[] fields = line.split("\t");
            pdMap.put(fields[0], fields[1]);
        }
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {

        // 处理 order.txt
        String line = value.toString();
        String[] split = line.split("\t");
        String pName = pdMap.get(split[1]);

        outK.set(split[0] + "\t" + pName + "\t" + split[2]);
        context.write(outK, NullWritable.get());
    }
}
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
        Job job = Job.getInstance(new Configuration());
        job.setMapperClass(MapJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.addCacheFile(new URI("file:///D:/hadoop/input/mapjoincache/pd.txt"));
        // 不需要reduce阶段
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\input\\mapjoin"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output\\mapjoin"));

        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

Original: https://www.cnblogs.com/sw-code/p/16387043.html
Author: sw-code
Title: 大数据之Hadoop集群中MapReduce的Join操作

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/585570/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • PHP 获取数组长度

    count()函数,默认是获取一维数组,参数为:COUNT_NORMAL,添加第二个参数:COUNT_RECURSIVE,则可以获取多维关联数组的长度(意思为递归获取),例如:co…

    Linux 2023年6月7日
    0117
  • 没学习的恐惧

    已经三个月没有接触新知识,每次上线之后就有一些bug,觉得自己作为一个点点点的测试很失败。我很迷茫,我都不知道自己一天是如何过的,反正就觉得时间过的很快,而且发现什么事都没做一天就…

    Linux 2023年6月8日
    090
  • 访问权限控制

    一.编译单元 1-1 概念 一个Java源代码文件通常被称为一个编译单元,每个编译单元的后缀需是.java,并且每个编译单元中最多只能有1个public类(当然,可以为0个)。 1…

    Linux 2023年6月8日
    0107
  • jarwarSpringBoot加载包内外资源的方式,告别FileNotFoundException吧

    工作中常常会用到文件加载,然后又经常忘记,印象不深,没有系统性研究过,从最初的war包项目到现在的springboot项目,从加载外部文件到加载自身jar包内文件,也发生了许多变化…

    Linux 2023年6月6日
    098
  • LeetCode-16. 最接近的三数之和

    题目来源 题目详情 给你一个长度为 n 的整数数组 nums和 一个目标值 target。请你从 nums 中选出三个整数,使它们的和与 target 最接近。 返回这三个数的和。…

    Linux 2023年6月7日
    0100
  • Redis主从复制的配置和实现原理

    Redis的持久化功能在一定程度上保证了数据的安全性,即便是服务器宕机的情况下,也可以保证数据的丢失非常少。通常,为了避免服务的单点故障,会把数据复制到多个副本放在不同的服务器上,…

    Linux 2023年5月28日
    082
  • vue3中iview框架初始值赋值问题

    问题:今天有个小伙伴说我们前端有个页面异常奇怪,没有显示数据。 开发小伙伴进行查询之后(非专业前端),明明进行了绑定,页面死活无法渲染,归属于灵异事件了,对于这种灵异事件,我总会想…

    Linux 2023年6月7日
    086
  • 1. 文件与I/O

    内核是如何处理系统调用 每个系统调用被赋予了一个系统调用号 在i386平台上,执行一个系统调用是通过int 0X80指令完成的 eax存放系统的调用号 ebx,ecx,edx,es…

    Linux 2023年6月6日
    084
  • 操作系统实战45讲 -04 业界成熟的内核架构长什么样

    Linux 系统性能稳定且开源。在很多公司企业网络中被当作服务器来使用,这是 Linux 的一大亮点,也是它得以壮大的关键。 上图中大致分为五大重要组件,每个组件又分成许多模块从上…

    Linux 2023年6月7日
    0116
  • 统计算法_概率基础

    本次有以下函数 1、简单边际概率 2、联合概率 3、条件概率 4、随机变量期望值 5、随机变量方差 6、随机变量协方差 7、联合协方差 8、组合期望回报 9、投资组合风险 说概率前…

    Linux 2023年6月6日
    081
  • redis普通手动启动安装步骤

    redis还提供了普通安装,也就是不写入开机启动,步骤如下: 部署的是redis-4.0.10版本, 部署步骤如下: 1、上传到服务器,解压 tar zxvf redis-4.0….

    Linux 2023年5月28日
    096
  • phpcmsv9 后台统计编辑发稿数量

    直切正题: 每个人,每个栏目,发稿数量统计 SELECT a.realname AS 姓名, c.catname AS 栏目名称, count(1) AS 发稿量FROM v9_n…

    Linux 2023年6月13日
    091
  • 【深度学习】RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:0 and cpu!

    报错代码: if __name__ == ‘__main__’: model = Perception(2, 3, 2).cuda() input = torch.randn(4,…

    Linux 2023年6月13日
    088
  • 双系统设置默认启动系统

    在原有windows系统下,我们装完Ubuntu系统后,会出现Ubuntu的grub引导界面(倒计时后自动进入Ubuntu),如下图所示。 假设我们需要将倒计时后默认启动的系统改为…

    Linux 2023年5月27日
    0102
  • 面试题:Java中为什么只有值传递?

    作者:小牛呼噜噜 | https://xiaoniuhululu.com计算机内功、JAVA底层、面试相关资料等更多精彩文章在公众号「小牛呼噜噜 」 经典的问题 形参&实参…

    Linux 2023年6月6日
    0129
  • C++ 之多态总结

    前言 最近为了完成数据库系统的实验,又复习起了《C++ Primer》,上一次看这本巨著也是大二下的六月份,那时看面向对象程序编程这一章还云里雾里的,没有领会多态的奥妙,学完 Ja…

    Linux 2023年6月7日
    085
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球