Hadoop多文件合并
文件合并
map 端合并(小表关联大表)
使用场景:
只适用于两个文件的合并,并且其中一个是大文件,另一个是小文件。
reduce 端多文件合并
实现思路
- map 端合并:在 job 对象中添加缓存文件,在 mapper 程序中读取缓存文件,在 mapper 端完成数据合并。
- reduce 端合并:
  - 在 mapper 端获取文件名以识别文件,设置文件编号,输出的 key 为连接条件,value 为 bean;
  - 在 reducer 端遍历 value 中的 bean,通过 bean 的文件编号提取对应属性,合并所有属性后输出数据,完成数据合并。
案例
数据准备
water.txt
3620300500,校医院南+,
3290100100,XXX校医院,
3620303100,车队+,
3620300100,XXX花圃+,
3210100200,XXX成教院XXX分院,
3421200300,XXX田径场厕所,
3363000100,离退休活动室,
3313800500,XXXL馆,
3370100100,XXXL楼,
3313200200,XXXS馆,
3370300100,XXXK,
3370200100,XXXK酒店,
3030100100,XXX体育馆,
3210100100,XXX干训楼,
water_info.txt
3620300500,30089,100,
3290100100,30023,50,
3620303100,30094,15,
3620300100,30090,150,
3210100200,30022,50,
3421200300,30045,50,
3363000100,30043,40,
3313800500,30041,50,
3370100100,30027,80,
3313200200,30024,80,
3370300100,30012,50,
3370200100,30011,80,
3030100100,30002,80,
3210100100,30001,80,
water_meter.txt
3620300500,416X,,,,
3290100100,416X,41601X,,,
3620303100,416X,41601X,4160101T,,
3620300100,405X,,,,
3210100200,405X,40511X,,,
3421200300,405X,40511X,4051101T,,
3363000100,405X,40509T,,,
3313800500,405X,40508T,,,
3370100100,405X,40507T,,,
3313200200,405X,40506T,,,
3370300100,405X,40504T,,,
3370200100,405X,40503T,,,
3030100100,405X,40502T,,,
3210100100,405X,40501T,,,
需求分析
water.txt
water_info.txt
water_meter.txt
三个文件的 ID 字段(水表号)相同,需要以 ID 为连接条件完成三个文件的数据合并。
代码实现
WaterBean.java
package hadoop.mr.join;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Value bean that carries the columns of all three input files (water.txt,
 * water_info.txt, water_meter.txt) plus a mark identifying which file a record came from.
 *
 * <p>Every String field starts as {@code ""} and {@code mark} starts as 0 (meaning "not set").
 * {@link DataOutput#writeUTF(String)} throws NullPointerException for null, and each mapper
 * record only fills the columns of its own file. Without these defaults, serializing a map
 * output record would fail. {@link #write(DataOutput)} additionally maps any null that a
 * setter may have stored back to {@code ""}.
 */
public class WaterBean implements WritableComparable<WaterBean> {
    /** Water meter number; the join key shared by all three files. */
    String ID = "";
    /** Water meter name (from water.txt). */
    String name = "";
    /** User number (from water_info.txt). */
    String userID = "";
    /** Caliber (from water_info.txt). */
    String caliber = "";
    /** Level-1 meter code (from water_meter.txt). */
    String primaryMeterCcode = "";
    /** Level-2 meter code (from water_meter.txt). */
    String secondaryMeterCcode = "";
    /** Level-3 meter code (from water_meter.txt). */
    String threeMeterCcode = "";
    /** Level-4 meter code (from water_meter.txt). */
    String fourMeterCcode = "";
    /** Source-file mark set by the mapper; 0 means not set. */
    Integer mark = 0;

    public String getID() {
        return ID;
    }

    public void setID(String ID) {
        this.ID = ID;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getUserID() {
        return userID;
    }

    public void setUserID(String userID) {
        this.userID = userID;
    }

    public String getCaliber() {
        return caliber;
    }

    public void setCaliber(String caliber) {
        this.caliber = caliber;
    }

    public String getPrimaryMeterCcode() {
        return primaryMeterCcode;
    }

    public void setPrimaryMeterCcode(String primaryMeterCcode) {
        this.primaryMeterCcode = primaryMeterCcode;
    }

    public String getSecondaryMeterCcode() {
        return secondaryMeterCcode;
    }

    public void setSecondaryMeterCcode(String secondaryMeterCcode) {
        this.secondaryMeterCcode = secondaryMeterCcode;
    }

    public String getThreeMeterCcode() {
        return threeMeterCcode;
    }

    public void setThreeMeterCcode(String threeMeterCcode) {
        this.threeMeterCcode = threeMeterCcode;
    }

    public String getFourMeterCcode() {
        return fourMeterCcode;
    }

    public void setFourMeterCcode(String fourMeterCcode) {
        this.fourMeterCcode = fourMeterCcode;
    }

    public Integer getMark() {
        return mark;
    }

    public void setMark(Integer mark) {
        this.mark = mark;
    }

    /**
     * Orders beans by their ID.
     *
     * @param o the bean to compare against
     * @return negative, zero or positive as this ID sorts before, equal to or after {@code o}'s
     */
    @Override
    public int compareTo(WaterBean o) {
        return this.getID().compareTo(o.getID());
    }

    /**
     * Serializes all fields in a fixed order that {@link #readFields(DataInput)} mirrors.
     * Null strings are written as {@code ""} and a null mark as 0, so serialization never throws
     * a NullPointerException.
     *
     * @param out destination stream
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        writeString(out, ID);
        writeString(out, name);
        writeString(out, userID);
        writeString(out, caliber);
        writeString(out, primaryMeterCcode);
        writeString(out, secondaryMeterCcode);
        writeString(out, threeMeterCcode);
        writeString(out, fourMeterCcode);
        out.writeInt(mark == null ? 0 : mark);
    }

    /**
     * Restores all fields in the same order {@link #write(DataOutput)} emitted them.
     *
     * @param in source stream
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        ID = in.readUTF();
        name = in.readUTF();
        userID = in.readUTF();
        caliber = in.readUTF();
        primaryMeterCcode = in.readUTF();
        secondaryMeterCcode = in.readUTF();
        threeMeterCcode = in.readUTF();
        fourMeterCcode = in.readUTF();
        mark = in.readInt();
    }

    /** Writes {@code value} with writeUTF, substituting "" for null (writeUTF rejects null). */
    private static void writeString(DataOutput out, String value) throws IOException {
        out.writeUTF(value == null ? "" : value);
    }

    /** Renders every data column (the mark is intentionally omitted); used by the text output. */
    @Override
    public String toString() {
        return "WaterBean{" +
                "ID='" + ID + '\'' +
                ", name='" + name + '\'' +
                ", userID='" + userID + '\'' +
                ", caliber='" + caliber + '\'' +
                ", primaryMeterCcode='" + primaryMeterCcode + '\'' +
                ", secondaryMeterCcode='" + secondaryMeterCcode + '\'' +
                ", threeMeterCcode='" + threeMeterCcode + '\'' +
                ", fourMeterCcode='" + fourMeterCcode + '\'' +
                '}';
    }
}
需要注意的是,Bean 的所有属性都必须进行初始化(例如初始化为空字符串),否则序列化时 DataOutput.writeUTF 遇到 null 值会抛出空指针异常。
WaterDrive.java
package hadoop.mr.join;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Iterator;
/**
 * Reduce-side join of water.txt, water_info.txt and water_meter.txt on their shared ID column.
 *
 * <p>The mapper tags each record with a mark identifying its source file and emits it keyed by
 * ID. The reducer merges all records of one ID into a single {@link WaterBean}.
 */
public class WaterDrive {

    /** Mark for records read from water.txt. */
    private static final int WATER_MARK = 1;
    /** Mark for records read from water_info.txt. */
    private static final int INFO_MARK = 2;
    /** Mark for records read from water_meter.txt (and any other input file). */
    private static final int METER_MARK = 3;

    /** Input directory used when no command-line arguments are supplied. */
    private static final String DEFAULT_INPUT = "D:\\BigData\\hadoop\\mr\\join\\left\\input";
    /** Output directory used when no command-line arguments are supplied. */
    private static final String DEFAULT_OUTPUT = "D:\\BigData\\hadoop\\mr\\join\\left\\output";

    /**
     * Returns the column at {@code index}, or "" when the line has fewer columns. String.split
     * drops trailing empty strings, so lines such as {@code "id,405X,,,,"} are shorter than the
     * file's nominal column count.
     */
    private static String field(String[] values, int index) {
        return index < values.length ? values[index] : "";
    }

    /**
     * Creates a bean whose String fields are all "" and whose mark is 0. This keeps
     * serialization free of nulls and makes missing columns print as ''.
     */
    private static WaterBean blankBean() {
        WaterBean bean = new WaterBean();
        bean.setID("");
        bean.setName("");
        bean.setUserID("");
        bean.setCaliber("");
        bean.setPrimaryMeterCcode("");
        bean.setSecondaryMeterCcode("");
        bean.setThreeMeterCcode("");
        bean.setFourMeterCcode("");
        bean.setMark(0);
        return bean;
    }

    public static class WaterMapper extends Mapper<LongWritable, Text, Text, WaterBean> {

        /** Name of the file backing this mapper's split; resolved once in setup(). */
        private String filename;
        /** Reused output key; its content is serialized on every write, so reuse is safe. */
        private final Text outKey = new Text();

        /**
         * Resolves the input file name once per split instead of once per record.
         *
         * @param context task context
         * @throws IOException if the framework fails
         * @throws InterruptedException if the task is interrupted
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            filename = fileSplit.getPath().getName();
        }

        /**
         * Tags one CSV line with its source-file mark and emits (ID, bean).
         * Lines with an empty ID (including blank lines) are skipped.
         *
         * @param key byte offset of the line (unused)
         * @param value one comma-separated line
         * @param context task context
         * @throws IOException if the framework fails
         * @throws InterruptedException if the task is interrupted
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split(",");
            String id = field(values, 0);
            if (id.trim().isEmpty()) {
                // Blank or malformed line: nothing to join on.
                return;
            }

            WaterBean waterBean = blankBean();
            waterBean.setID(id);
            if ("water.txt".equals(filename)) {
                waterBean.setName(field(values, 1));
                waterBean.setMark(WATER_MARK);
            } else if ("water_info.txt".equals(filename)) {
                waterBean.setUserID(field(values, 1));
                waterBean.setCaliber(field(values, 2));
                waterBean.setMark(INFO_MARK);
            } else {
                // Any other file is treated as water_meter data, as in the original design.
                waterBean.setPrimaryMeterCcode(field(values, 1));
                waterBean.setSecondaryMeterCcode(field(values, 2));
                waterBean.setThreeMeterCcode(field(values, 3));
                waterBean.setFourMeterCcode(field(values, 4));
                waterBean.setMark(METER_MARK);
            }

            outKey.set(id);
            context.write(outKey, waterBean);
        }
    }

    public static class WaterReducer extends Reducer<Text, WaterBean, NullWritable, WaterBean> {

        /**
         * Merges all beans that share one ID into a single output bean.
         * Each bean's mark selects which attributes it contributes. The ID is taken from the
         * key, so it is present even when water.txt has no row for this ID.
         *
         * @param key the shared ID
         * @param values tagged beans for this ID (the framework may reuse the instance)
         * @param context task context
         * @throws IOException if the framework fails
         * @throws InterruptedException if the task is interrupted
         */
        @Override
        protected void reduce(Text key, Iterable<WaterBean> values, Context context) throws IOException, InterruptedException {
            WaterBean merged = blankBean();
            merged.setID(key.toString());
            for (WaterBean part : values) {
                // Only String values are copied out, so reuse of `part` by the framework is harmless.
                switch (part.getMark()) {
                    case WATER_MARK:
                        merged.setName(part.getName());
                        break;
                    case INFO_MARK:
                        merged.setUserID(part.getUserID());
                        merged.setCaliber(part.getCaliber());
                        break;
                    default:
                        // METER_MARK
                        merged.setPrimaryMeterCcode(part.getPrimaryMeterCcode());
                        merged.setSecondaryMeterCcode(part.getSecondaryMeterCcode());
                        merged.setThreeMeterCcode(part.getThreeMeterCcode());
                        merged.setFourMeterCcode(part.getFourMeterCcode());
                        break;
                }
            }
            context.write(NullWritable.get(), merged);
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args optional {@code [inputDir, outputDir]}; when fewer than two are given, the
     *     local default directories are used
     * @throws IOException if job setup or execution fails
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Prefer command-line paths; fall back to the local defaults for IDE runs.
        boolean hasArgs = args != null && args.length >= 2;
        String inputPath = hasArgs ? args[0] : DEFAULT_INPUT;
        String outputPath = hasArgs ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(new Configuration(), WaterDrive.class.getName());
        job.setJarByClass(WaterDrive.class);

        job.setMapperClass(WaterMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(WaterBean.class);

        job.setReducerClass(WaterReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(WaterBean.class);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        System.out.println(job.waitForCompletion(true) ? "执行成功" : "执行失败");
    }
}
合并结果
WaterBean{ID='3030100100', name='XXX体育馆', userID='30002', caliber='80', primaryMeterCcode='405X', secondaryMeterCcode='40502T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3210100100', name='XXX干训楼', userID='30001', caliber='80', primaryMeterCcode='405X', secondaryMeterCcode='40501T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3210100200', name='XXX成教院XXX分院', userID='30022', caliber='50', primaryMeterCcode='405X', secondaryMeterCcode='40511X', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3290100100', name='XXX校医院', userID='30023', caliber='50', primaryMeterCcode='416X', secondaryMeterCcode='41601X', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3313200200', name='XXXS馆', userID='30024', caliber='80', primaryMeterCcode='405X', secondaryMeterCcode='40506T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3313800500', name='XXXL馆', userID='30041', caliber='50', primaryMeterCcode='405X', secondaryMeterCcode='40508T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3363000100', name='离退休活动室', userID='30043', caliber='40', primaryMeterCcode='405X', secondaryMeterCcode='40509T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3370100100', name='XXXL楼', userID='30027', caliber='80', primaryMeterCcode='405X', secondaryMeterCcode='40507T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3370200100', name='XXXK酒店', userID='30011', caliber='80', primaryMeterCcode='405X', secondaryMeterCcode='40503T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3370300100', name='XXXK', userID='30012', caliber='50', primaryMeterCcode='405X', secondaryMeterCcode='40504T', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3421200300', name='XXX田径场厕所', userID='30045', caliber='50', primaryMeterCcode='405X', secondaryMeterCcode='40511X', threeMeterCcode='4051101T', fourMeterCcode=''}
WaterBean{ID='3620300100', name='XXX花圃+', userID='30090', caliber='150', primaryMeterCcode='405X', secondaryMeterCcode='', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3620300500', name='校医院南+', userID='30089', caliber='100', primaryMeterCcode='416X', secondaryMeterCcode='', threeMeterCcode='', fourMeterCcode=''}
WaterBean{ID='3620303100', name='车队+', userID='30094', caliber='15', primaryMeterCcode='416X', secondaryMeterCcode='41601X', threeMeterCcode='4160101T', fourMeterCcode=''}
版权声明:本文为Phalaris_arundinacea原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。