Hadoop多文件合并

Hadoop多文件合并

文件合并

  • map的小表联大表合并

    • 使用场景:

      只适用于两个文件,并且一个大文件另一个小文件

  • reducer 多文件合并

实现思路

  • map
    1. job对象添加缓存文件,在mapper程序中读取缓存文件,在mapper端完成数据合并
  • reduce
    1. mapper端获取文件名识别文件,设置文件编号,输出key为连接条件,value为bean;
    2. reducer端遍历value的bean,通过bean的文件编号提取属性,合并所有属性,输出数据,完成数据合并。

案例

数据准备

water.txt
3620300500,校医院南+,
3290100100,XXX校医院,
3620303100,车队+,
3620300100,XXX花圃+,
3210100200,XXX成教院XXX分院,
3421200300,XXX田径场厕所,
3363000100,离退休活动室,
3313800500,XXXL馆,
3370100100,XXXL楼,
3313200200,XXXS馆,
3370300100,XXXK,
3370200100,XXXK酒店,
3030100100,XXX体育馆,
3210100100,XXX干训楼,

water_info.txt
3620300500,30089,100,
3290100100,30023,50,
3620303100,30094,15,
3620300100,30090,150,
3210100200,30022,50,
3421200300,30045,50,
3363000100,30043,40,
3313800500,30041,50,
3370100100,30027,80,
3313200200,30024,80,
3370300100,30012,50,
3370200100,30011,80,
3030100100,30002,80,
3210100100,30001,80,

water_meter.txt
3620300500,416X,,,,
3290100100,416X,41601X,,,
3620303100,416X,41601X,4160101T,,
3620300100,405X,,,,
3210100200,405X,40511X,,,
3421200300,405X,40511X,4051101T,,
3363000100,405X,40509T,,,
3313800500,405X,40508T,,,
3370100100,405X,40507T,,,
3313200200,405X,40506T,,,
3370300100,405X,40504T,,,
3370200100,405X,40503T,,,
3030100100,405X,40502T,,,
3210100100,405X,40501T,,,

需求分析

water.txt、water_info.txt、water_meter.txt 三个文件的第一列都是相同的水表 ID 字段,需要以 ID 为连接条件,把三个文件中同一 ID 的数据合并为一条记录。

代码实现

WaterBean.java

package hadoop.mr.join;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Value object carrying the attributes of one water meter through the MapReduce join.
 *
 * <p>Each mapper record fills in only the attributes found in its own input file and tags
 * itself with {@code mark}; the reducer then copies the attributes of all records sharing
 * an ID into a single bean.
 *
 * <p>Every field has a non-null default. {@link #write(DataOutput)} serialises all fields
 * unconditionally and {@code writeUTF} cannot serialise {@code null}, so a partially
 * populated bean would otherwise fail with a {@link NullPointerException}.
 */
public class WaterBean implements WritableComparable<WaterBean> {

    // Meter number (the join key shared by the input files)
    String ID = "";
    // Meter name
    String name = "";
    // User number
    String userID = "";
    // Caliber (pipe diameter)
    String caliber = "";
    // Level-1 meter code
    String primaryMeterCcode = "";
    // Level-2 meter code
    String secondaryMeterCcode = "";
    // Level-3 meter code
    String threeMeterCcode = "";
    // Level-4 meter code
    String fourMeterCcode = "";
    // Source-file marker; 0 means "not set"
    Integer mark = 0;


    public String getID() {
        return ID;
    }

    public void setID(String ID) {
        this.ID = ID;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getUserID() {
        return userID;
    }

    public void setUserID(String userID) {
        this.userID = userID;
    }

    public String getCaliber() {
        return caliber;
    }

    public void setCaliber(String caliber) {
        this.caliber = caliber;
    }

    public String getPrimaryMeterCcode() {
        return primaryMeterCcode;
    }

    public void setPrimaryMeterCcode(String primaryMeterCcode) {
        this.primaryMeterCcode = primaryMeterCcode;
    }

    public String getSecondaryMeterCcode() {
        return secondaryMeterCcode;
    }

    public void setSecondaryMeterCcode(String secondaryMeterCcode) {
        this.secondaryMeterCcode = secondaryMeterCcode;
    }

    public String getThreeMeterCcode() {
        return threeMeterCcode;
    }

    public void setThreeMeterCcode(String threeMeterCcode) {
        this.threeMeterCcode = threeMeterCcode;
    }

    public String getFourMeterCcode() {
        return fourMeterCcode;
    }

    public void setFourMeterCcode(String fourMeterCcode) {
        this.fourMeterCcode = fourMeterCcode;
    }

    public Integer getMark() {
        return mark;
    }

    public void setMark(Integer mark) {
        this.mark = mark;
    }

    /**
     * Orders beans by meter ID; a {@code null} ID is treated as the empty string.
     *
     * @param o the bean to compare against
     * @return the result of comparing the two IDs as strings
     */
    @Override
    public int compareTo(WaterBean o) {
        return nullToEmpty(this.getID()).compareTo(nullToEmpty(o.getID()));
    }

    /**
     * Serialises all fields in a fixed order that {@link #readFields(DataInput)} mirrors.
     * A field that was explicitly set to {@code null} is written as an empty string
     * (or 0 for {@code mark}) instead of failing.
     *
     * @param out the stream to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(nullToEmpty(ID));
        out.writeUTF(nullToEmpty(name));
        out.writeUTF(nullToEmpty(userID));
        out.writeUTF(nullToEmpty(caliber));
        out.writeUTF(nullToEmpty(primaryMeterCcode));
        out.writeUTF(nullToEmpty(secondaryMeterCcode));
        out.writeUTF(nullToEmpty(threeMeterCcode));
        out.writeUTF(nullToEmpty(fourMeterCcode));
        out.writeInt(mark == null ? 0 : mark);
    }

    /**
     * Restores all fields in exactly the order {@link #write(DataOutput)} produced them.
     *
     * @param in the stream to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        ID = in.readUTF();
        name = in.readUTF();
        userID = in.readUTF();
        caliber = in.readUTF();
        primaryMeterCcode = in.readUTF();
        secondaryMeterCcode = in.readUTF();
        threeMeterCcode = in.readUTF();
        fourMeterCcode = in.readUTF();
        mark = in.readInt();
    }

    /**
     * Renders every attribute except {@code mark}; this text is what ends up in the
     * job's output when the bean is written as an output value.
     */
    @Override
    public String toString() {
        return "WaterBean{" +
                "ID='" + ID + '\'' +
                ", name='" + name + '\'' +
                ", userID='" + userID + '\'' +
                ", caliber='" + caliber + '\'' +
                ", primaryMeterCcode='" + primaryMeterCcode + '\'' +
                ", secondaryMeterCcode='" + secondaryMeterCcode + '\'' +
                ", threeMeterCcode='" + threeMeterCcode + '\'' +
                ", fourMeterCcode='" + fourMeterCcode + '\'' +
                '}';
    }

    // Maps null to "" so that serialisation and comparison never dereference null.
    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }
}

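需要注意的是:Bean 的所有属性必须初始化为非 null 的默认值(例如字符串初始化为空串 ""、mark 初始化为 0)。因为 mapper 端每条记录只会给部分属性赋值,而 write() 方法会序列化全部属性,对 null 调用 writeUTF 会抛出空指针异常。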

WaterDrive.java

package hadoop.mr.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Iterator;

/**
 * Reduce-side join: merges several input files that share an ID in their first column
 * into one {@link WaterBean} per ID.
 *
 * <p>The mapper tags every record with the file it came from and emits it under its ID;
 * the reducer copies the attributes of all records with the same ID into a single bean.
 */
public class WaterDrive {

    // Paths used when none are supplied on the command line.
    private static final String DEFAULT_INPUT = "D:\\BigData\\hadoop\\mr\\join\\left\\input";
    private static final String DEFAULT_OUTPUT = "D:\\BigData\\hadoop\\mr\\join\\left\\output";

    public static class WaterMapper extends Mapper<LongWritable, Text, Text, WaterBean> {

        // Reused for every output record to avoid allocating a new key per input line.
        private final Text outKey = new Text();

        /**
         * Parses one comma-separated line, tags it with its source file and emits it
         * keyed by the ID in the first column.
         *
         * <p>Blank lines and lines without an ID are skipped. Columns missing from the
         * end of a line are treated as empty strings, because {@code String.split}
         * drops trailing empty fields.
         *
         * @param key     the input key supplied by the input format (not used)
         * @param value   one line of an input file
         * @param context task context used to find the source file and emit output
         * @throws IOException          if emitting the record fails
         * @throws InterruptedException if the task is interrupted while emitting
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (line.trim().isEmpty()) {
                return;
            }
            String[] values = line.split(",");
            // A line such as "," splits into an empty array; without an ID there is nothing to join on.
            if (values.length == 0 || values[0].isEmpty()) {
                return;
            }

            // The name of the file this split belongs to decides how the columns are interpreted.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String filename = fileSplit.getPath().getName();

            WaterBean waterBean = new WaterBean();
            waterBean.setID(values[0]);
            if ("water.txt".equals(filename)) {
                waterBean.setName(fieldAt(values, 1));
                waterBean.setMark(1);
            } else if ("water_info.txt".equals(filename)) {
                waterBean.setUserID(fieldAt(values, 1));
                waterBean.setCaliber(fieldAt(values, 2));
                waterBean.setMark(2);
            } else {
                // Any other file name is treated as water_meter data.
                waterBean.setPrimaryMeterCcode(fieldAt(values, 1));
                waterBean.setSecondaryMeterCcode(fieldAt(values, 2));
                waterBean.setThreeMeterCcode(fieldAt(values, 3));
                waterBean.setFourMeterCcode(fieldAt(values, 4));
                waterBean.setMark(3);
            }
            outKey.set(values[0]);
            context.write(outKey, waterBean);
        }

        // Returns the column at the given index, or "" when the line has fewer columns.
        private static String fieldAt(String[] fields, int index) {
            return index < fields.length ? fields[index] : "";
        }
    }

    public static class WaterReducer extends Reducer<Text, WaterBean, NullWritable, WaterBean> {
        /**
         * Merges all records that share a key into one output bean, using each
         * record's mark to decide which attributes to copy from it.
         *
         * <p>The ID is taken from the key, so it is present even when no record with
         * mark 1 exists for this key. If several records carry the same mark, the one
         * iterated last wins.
         *
         * @param key     the ID shared by all records in {@code values}
         * @param values  the tagged partial records for this ID
         * @param context task context used to emit the merged bean
         * @throws IOException          if emitting the record fails
         * @throws InterruptedException if the task is interrupted while emitting
         */
        @Override
        protected void reduce(Text key, Iterable<WaterBean> values, Context context) throws IOException, InterruptedException {
            WaterBean bean = new WaterBean();
            bean.setID(key.toString());
            for (WaterBean part : values) {
                int mark = part.getMark();
                if (mark == 1) {
                    bean.setName(part.getName());
                } else if (mark == 2) {
                    bean.setUserID(part.getUserID());
                    bean.setCaliber(part.getCaliber());
                } else {
                    bean.setPrimaryMeterCcode(part.getPrimaryMeterCcode());
                    bean.setSecondaryMeterCcode(part.getSecondaryMeterCcode());
                    bean.setThreeMeterCcode(part.getThreeMeterCcode());
                    bean.setFourMeterCcode(part.getFourMeterCcode());
                }
            }
            context.write(NullWritable.get(), bean);
        }
    }

    /**
     * Configures and runs the join job, then prints whether it succeeded.
     *
     * @param args {@code args[0]} is the input directory and {@code args[1]} the output
     *             directory; when fewer than two arguments are given, the built-in
     *             default paths are used instead
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        boolean pathsGiven = args.length >= 2;
        String inputDir = pathsGiven ? args[0] : DEFAULT_INPUT;
        String outputDir = pathsGiven ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(new Configuration(), WaterDrive.class.getName());
        job.setJarByClass(WaterDrive.class);

        // Map phase: ID -> tagged partial record.
        job.setMapperClass(WaterMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(WaterBean.class);

        // Reduce phase: one merged bean per ID, written without a key.
        job.setReducerClass(WaterReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(WaterBean.class);

        FileInputFormat.setInputPaths(job, new Path(inputDir));
        FileOutputFormat.setOutputPath(job, new Path(outputDir));

        System.out.println(job.waitForCompletion(true) ? "执行成功" : "执行失败");
    }
}

合并结果

WaterBean{ID='3030100100', name='XXX体育馆', userID='30002', caliber='80', primaryMeterCcode='405X', secondaryMeterCcode='40502T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3210100100', name='XXX干训楼', userID='30001', caliber='80', primaryMeterCcode='405X', secondaryMeterCcode='40501T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3210100200', name='XXX成教院XXX分院', userID='30022', caliber='50', primaryMeterCcode='405X', secondaryMeterCcode='40511X', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3290100100', name='XXX校医院', userID='30023', caliber='50', primaryMeterCcode='416X', secondaryMeterCcode='41601X', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3313200200', name='XXXS馆', userID='30024', caliber='80', primaryMeterCcode='405X', secondaryMeterCcode='40506T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3313800500', name='XXXL馆', userID='30041', caliber='50', primaryMeterCcode='405X', secondaryMeterCcode='40508T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3363000100', name='离退休活动室', userID='30043', caliber='40', primaryMeterCcode='405X', secondaryMeterCcode='40509T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3370100100', name='XXXL楼', userID='30027', caliber='80', primaryMeterCcode='405X', secondaryMeterCcode='40507T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3370200100', name='XXXK酒店', userID='30011', caliber='80', primaryMeterCcode='405X', secondaryMeterCcode='40503T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3370300100', name='XXXK', userID='30012', caliber='50', primaryMeterCcode='405X', secondaryMeterCcode='40504T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3421200300', name='XXX田径场厕所', userID='30045', caliber='50', primaryMeterCcode='405X', secondaryMeterCcode='40511X', threeMeterCcode='4051101T', fourMeterCcode=''}
WaterBean{ID='3620300100', name='XXX花圃+', userID='30090', caliber='150', primaryMeterCcode='405X', secondaryMeterCcode='', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3620300500', name='校医院南+', userID='30089', caliber='100', primaryMeterCcode='416X', secondaryMeterCcode='', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3620303100', name='车队+', userID='30094', caliber='15', primaryMeterCcode='416X', secondaryMeterCcode='41601X', threeMeterCcode='4160101T', fourMeterCcode=''}


版权声明:本文为Phalaris_arundinacea原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。