Hadoop multi-file-format input

Version:

CDH 5.0.0 (hdfs 2.3, mapreduce 2.3, yarn 2.3)

For multi-file-format input in Hadoop, the MultipleInputs class can generally be used to specify a different input path and input format for each input.

For example, suppose there is the following requirement.

Two datasets are available:

phone:

123,good number
124,common number
125,bad number

user:

zhangsan,123
lisi,124
wangwu,125

Now user and phone need to be joined on the phone number, producing the following result:

zhangsan,123,good number
lisi,124,common number
wangwu,125,bad number

MultipleInputs can then be used, with user and phone uploaded to the HDFS paths /multiple/user/user and /multiple/phone/phone respectively.
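As a concrete sketch of that upload step (the local file names user and phone are assumptions; the equivalent hdfs dfs -put commands would do the same):

package multiple.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadSamples {
    public static void main(String[] args) throws Exception {
        // assumes fs.defaultFS in the client configuration points at the cluster
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        fs.copyFromLocalFile(new Path("user"), new Path("/multiple/user/user"));
        fs.copyFromLocalFile(new Path("phone"), new Path("/multiple/phone/phone"));
        fs.close();
    }
}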

The MultipleDriver is designed as follows:

package multiple.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
/**
 * input1(/multiple/user/user):
 * username,user_phone
 *
 * input2(/multiple/phone/phone):
 *  user_phone,description
 *
 * output: username,user_phone,description
 *
 * @author fansy
 *
 */
public class MultipleDriver extends Configured implements Tool{
//  private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);

    private String input1=null;
    private String input2=null;
    private String output=null;
    private String delimiter=null;

    public static void main(String[] args) throws Exception {
        Configuration conf=new Configuration();
//      conf.set("fs.defaultFS", "hdfs://node33:8020");
//        conf.set("mapreduce.framework.name", "yarn");
//        conf.set("yarn.resourcemanager.address", "node33:8032");

        ToolRunner.run(conf, new MultipleDriver(), args);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        configureArgs(arg0);
        checkArgs();

        Configuration conf= getConf();
        conf.set("delimiter", delimiter);
         @SuppressWarnings("deprecation")
        Job job = new Job(conf, "merge user and phone information ");
        job.setJarByClass(MultipleDriver.class);

        job.setReducerClass(MultipleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagStringDataType.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setNumReduceTasks(1);
        MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, Multiple1Mapper.class);
        MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, Multiple2Mapper.class);
        FileOutputFormat.setOutputPath(job, new Path(output));

        int res = job.waitForCompletion(true) ? 0 : 1;
        return res;
    }

    /**
     * check the args
     */
    private void checkArgs() {
        if(input1==null||"".equals(input1)){
            System.out.println("no user input...");
            printUsage();
            System.exit(-1);
        }
        if(input2==null||"".equals(input2)){
            System.out.println("no phone input...");
            printUsage();
            System.exit(-1);
        }
        if(output==null||"".equals(output)){
            System.out.println("no output...");
            printUsage();
            System.exit(-1);
        }
        if(delimiter==null||"".equals(delimiter)){
            System.out.println("no delimiter...");
            printUsage();
            System.exit(-1);
        }

    }

    /**
     * configure the args
     * @param args
     */
    private void configureArgs(String[] args) {
        // argument parsing (the flag names -i1/-i2/-o/-d are assumptions;
        // the original loop body was truncated in the source)
        for(int i=0;i<args.length;i++){
            if("-i1".equals(args[i])){
                input1=args[++i];
            }else if("-i2".equals(args[i])){
                input2=args[++i];
            }else if("-o".equals(args[i])){
                output=args[++i];
            }else if("-d".equals(args[i])){
                delimiter=args[++i];
            }
        }
    }

    private void printUsage() {
        System.err.println("Usage: MultipleDriver -i1 <user input> -i2 <phone input> -o <output> -d <delimiter>");
    }
}

Two mappers and one reducer are specified here; the two mappers handle the user and phone data respectively, as follows.

Mapper 1 (processes the user data):

package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * input :
 * username,phone
 *
 * output:
 *   <username,phone> --> <phone,[0,username]>
 * @author fansy
 *
 */
public class Multiple1Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
    private  Logger log = LoggerFactory.getLogger(Multiple1Mapper.class);
    private String delimiter=null; // default is comma
    @Override
    public void setup(Context cxt){
        delimiter= cxt.getConfiguration().get("delimiter", ",");
        log.info("This is the begin of Multiple1Mapper");
    }

    @Override
    public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
        // only the first getLength() bytes of the Text backing array are valid
        String info= new String(value.getBytes(), 0, value.getLength(), "UTF-8");
        String[] values = info.split(delimiter);
        if(values.length!=2){
            return;
        }
        log.info("key-->"+values[1]+"=========value-->"+"[0,"+values[0]+"]");
        cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));
    }
}
Mapper 2 (processes the phone data):

package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * input :
 * phone,description
 *
 * output:
 *   <phone,description> --> <phone,[1,description]>
 * @author fansy
 *
 */
public class Multiple2Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
    private  Logger log = LoggerFactory.getLogger(Multiple2Mapper.class);
    private String delimiter=null; // default is comma
    @Override
    public void setup(Context cxt){
        delimiter= cxt.getConfiguration().get("delimiter", ",");
        log.info("This is the begin of Multiple2Mapper");
    }

    @Override
    public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
        String[] values= value.toString().split(delimiter);
        if(values.length!=2){
            return;
        }
        log.info("key-->"+values[0]+"=========value-->"+"[1,"+values[1]+"]");
        cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));
    }
}
The FlagStringDataType here is a custom type:

package multiple.input;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.primitives.Ints;

public class FlagStringDataType implements WritableComparable<FlagStringDataType> {
    private  Logger log = LoggerFactory.getLogger(FlagStringDataType.class);
  private String value;
  private int flag;
  public FlagStringDataType() {
  }

  public FlagStringDataType(int flag,String value) {
    this.value = value;
    this.flag=flag;
  }

  public String get() {
    return value;
  }

  public void set(String value) {
    this.value = value;
  }

  @Override
  public boolean equals(Object other) {
    return other != null && getClass().equals(other.getClass())
            // compare strings with equals(), not ==
            && value.equals(((FlagStringDataType) other).get())
            && ((FlagStringDataType) other).getFlag() == flag;
  }

  @Override
  public int hashCode() {
    return Ints.hashCode(flag)+value.hashCode();
  }

  @Override
  public int compareTo(FlagStringDataType other) {
    // order by flag first, then by value
    if (flag != other.flag) {
      return flag > other.flag ? 1 : -1;
    }
    return value.compareTo(other.value);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    log.info("in write()::"+"flag:"+flag+",vlaue:"+value);
    out.writeInt(flag);
    out.writeUTF(value);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
      log.info("in read()::"+"flag:"+flag+",vlaue:"+value);
      flag=in.readInt();
      value = in.readUTF();
      log.info("in read()::"+"flag:"+flag+",vlaue:"+value);
  }

public int getFlag() {
    return flag;
}

public void setFlag(int flag) {
    this.flag = flag;
}

public String toString(){
    return flag+":"+value;
}

}
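As a quick sanity check of the Writable contract, a minimal round-trip sketch (FlagStringDataTypeCheck is our own test class, not part of the job):

package multiple.input;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlagStringDataTypeCheck {
    public static void main(String[] args) throws Exception {
        FlagStringDataType original = new FlagStringDataType(0, "zhangsan");
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));

        // deserialize into a fresh instance and confirm flag and value survive
        FlagStringDataType copy = new FlagStringDataType();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(copy); // prints 0:zhangsan
    }
}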
This class uses a flag to mark which dataset a value came from, while value holds the corresponding field. The advantage is that on the reduce side the output position of each value can be inferred from its flag; this kind of design is a great help when integrating multiple inputs, as can be seen in Mahout.

The reducer (merges and outputs the data):

package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultipleReducer extends Reducer<Text,FlagStringDataType,Text,NullWritable>{
    private  Logger log = LoggerFactory.getLogger(MultipleReducer.class);
    private String delimiter=null; // default is comma
    @Override
    public void setup(Context cxt){
        delimiter= cxt.getConfiguration().get("delimiter", ",");
    }
    @Override
    public void reduce(Text key, Iterable<FlagStringDataType> values,Context cxt) throws IOException,InterruptedException{
        log.info("================");
        log.info("         =======");
        log.info("              ==");
        String[] value= new String[3];
        value[2]=key.toString();
        for(FlagStringDataType v:values){
            int index= v.getFlag();
            log.info("index:"+index+"-->value:"+v.get());
            value[index]= v.get();
        }
        log.info("              ==");
        log.info("         =======");
        log.info("================");
        cxt.write(new Text(value[2]+delimiter+value[0]+delimiter+value[1]),NullWritable.get());
    }
}
The advantage of this design is that different processing logic can be applied to different inputs, and the inputs may even be in different formats such as sequence files. For comparison, here is an alternative approach. It is somewhat weaker, but worth knowing as a reference. First the driver:

package multiple.input;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
/**
 * input1(/multiple/user/user):
 * username,user_phone
 *
 * input2(/multiple/phone/phone):
 *  user_phone,description
 *
 * output: username,user_phone,description
 *
 * @author fansy
 *
 */
public class MultipleDriver2 extends Configured implements Tool{
//  private  Logger log = LoggerFactory.getLogger(MultipleDriver.class);

    private String input1=null;
    private String input2=null;
    private String output=null;
    private String delimiter=null;

    public static void main(String[] args) throws Exception {
        Configuration conf=new Configuration();
//      conf.set("fs.defaultFS", "hdfs://node33:8020");
//        conf.set("mapreduce.framework.name", "yarn");
//        conf.set("yarn.resourcemanager.address", "node33:8032");

        ToolRunner.run(conf, new MultipleDriver2(), args);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        configureArgs(arg0);
        checkArgs();

        Configuration conf= getConf();
        conf.set("delimiter", delimiter);
         @SuppressWarnings("deprecation")
        Job job = new Job(conf, "merge user and phone information ");
        job.setJarByClass(MultipleDriver2.class);
        job.setMapperClass(MultipleMapper.class);
        job.setReducerClass(MultipleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagStringDataType.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(input1));
        FileInputFormat.addInputPath(job, new Path(input2));
        FileOutputFormat.setOutputPath(job, new Path(output));

        int res = job.waitForCompletion(true) ? 0 : 1;
        return res;
    }

    /**
     * check the args
     */
    private void checkArgs() {
        if(input1==null||"".equals(input1)){
            System.out.println("no user input...");
            printUsage();
            System.exit(-1);
        }
        if(input2==null||"".equals(input2)){
            System.out.println("no phone input...");
            printUsage();
            System.exit(-1);
        }
        if(output==null||"".equals(output)){
            System.out.println("no output...");
            printUsage();
            System.exit(-1);
        }
        if(delimiter==null||"".equals(delimiter)){
            System.out.println("no delimiter...");
            printUsage();
            System.exit(-1);
        }

    }

    /**
     * configure the args
     * @param args
     */
    private void configureArgs(String[] args) {
        // argument parsing (the flag names -i1/-i2/-o/-d are assumptions;
        // the original loop body was truncated in the source)
        for(int i=0;i<args.length;i++){
            if("-i1".equals(args[i])){
                input1=args[++i];
            }else if("-i2".equals(args[i])){
                input2=args[++i];
            }else if("-o".equals(args[i])){
                output=args[++i];
            }else if("-d".equals(args[i])){
                delimiter=args[++i];
            }
        }
    }

    private void printUsage() {
        System.err.println("Usage: MultipleDriver2 -i1 <user input> -i2 <phone input> -o <output> -d <delimiter>");
    }
}

Here the input paths are added directly through FileInputFormat. The per-dataset logic therefore moves into the mapper, which first determines which dataset the current split belongs to and then branches on its path:

package multiple.input;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * input1 :
 * username,phone
 *
 * input2
 * phone,description
 *
 * output:
 *   <username,phone> --> <phone,[0,username]>
 *   <phone,description> --> <phone,[1,description]>
 * @author fansy
 *
 */
public class MultipleMapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{

    private String delimiter=null; // default is comma
    private boolean flag=false;
    @Override
    public void setup(Context cxt){
        delimiter= cxt.getConfiguration().get("delimiter", ",");
        InputSplit input=cxt.getInputSplit();
        // the parent directory name ("user" or "phone") identifies the dataset for this split
        String dir=((FileSplit) input).getPath().getParent().getName();
        if("user".equals(dir)){
            flag=true;
        }
    }

    @Override
    public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
        String[] values= value.toString().split(delimiter);
        if(values.length!=2){
            return;
        }
        if(flag){
            cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));
        }else{
            cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));
        }
    }
}
Overall, this second approach is actually not as good as the first: the dataset check has to run in every map() call, which means many more operations than the first approach, and it cannot handle inputs in different formats at all (for example, sequence files with different key and value types). So for multi-file-format input, the first approach is the better choice.
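As a hedged illustration of that last point, this fragment shows how the first approach would mix a text input with a sequence-file input in one job (PhoneSeqMapper is hypothetical; it would be a mapper whose input key/value types match the sequence file):

// fragment for MultipleDriver.run(): each input keeps its own InputFormat
// and mapper class, so differing key/value types are not a problem
// (requires: import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;)
MultipleInputs.addInputPath(job, new Path(input1),
        TextInputFormat.class, Multiple1Mapper.class);
MultipleInputs.addInputPath(job, new Path(input2),
        SequenceFileInputFormat.class, PhoneSeqMapper.class); // PhoneSeqMapper is hypothetical

Share, grow, be happy.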

Original: https://www.cnblogs.com/zsychanpin/p/7403569.html
Author: zsychanpin
Title: hadoop多文件格式输入
