MapReduce Weather Case Study

Requirements:

1949-10-01 14:21:02	34c
1949-10-02 14:01:02	36c
1950-01-01 11:21:02	32c
1950-10-01 12:21:02	37c
1951-12-01 12:21:02	23c
1950-10-02 12:21:02	41c
1950-10-03 12:21:02	27c
1951-07-01 12:21:02	45c
1951-07-02 12:21:02	46c
1951-07-03 12:21:03	47c

Find the top three temperatures in each month of each year, and write each year's results to its own output file.

Requirements analysis:

Grouping will also need a comparator here. We have to group by year and month, so the custom key gets fairly complex: it must contain the year, the month, and the temperature.

Implementation:

In the MR process, keys are used for grouping and sorting. When Hadoop's built-in key types cannot satisfy the requirements, you have to define your own key, so let's start by defining one.

1. Custom Key

The key we have met most often in our earlier Mapper classes is Text. For reference, its declaration is: public class Text extends BinaryComparable implements WritableComparable&lt;BinaryComparable&gt;. So it implements the WritableComparable interface, whose Javadoc gives the following example:

Example:

public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
  // Some data
  private int counter;
  private long timestamp;

  public void write(DataOutput out) throws IOException {
    out.writeInt(counter);
    out.writeLong(timestamp);
  }

  public void readFields(DataInput in) throws IOException {
    counter = in.readInt();
    timestamp = in.readLong();
  }

  public int compareTo(MyWritableComparable o) {
    int thisValue = this.counter;
    int thatValue = o.counter;
    return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
  }

  public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + counter;
    result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
    return result;
  }
}

Two interfaces matter here: Writable and WritableComparable. In MR, a class that implements Writable can be used as a value, while a class that implements WritableComparable can be used as a key or as a value. We can define our own MyWritableComparable to implement this interface. WritableComparable itself extends two interfaces: Writable defines serialization and deserialization, and Comparable is responsible for comparison.
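For contrast, a type meant only to be a value would implement just Writable. A minimal sketch (this TemperatureRecord class is purely illustrative and not part of the weather job):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

//a value-only type: serializable, but with no ordering, so it cannot serve as a key
public class TemperatureRecord implements Writable {
	private double temperature;

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeDouble(temperature);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.temperature = in.readDouble();
	}
}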

During spill to disk, the quicksort invokes a comparator: a user-defined sort comparator is called first, and only when none is registered does the key's own comparator get used. Given the requirement, it makes sense to put the temperature into the key so it can participate in the comparison. We will also group for aggregation: the grouping has to be by year-month, and the data is aggregated within each group.
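This job lets the compareTo of MyKey (defined just below) drive the sort. To illustrate the priority rule, here is a hedged sketch of what a user-defined sort comparator would look like; MySortComparator is hypothetical and not used in this case:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

//hypothetical sort comparator; once registered it takes priority over MyKey.compareTo
public class MySortComparator extends WritableComparator {
	public MySortComparator() {
		super(MyKey.class, true);//create MyKey instances for comparison
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		MyKey k1 = (MyKey) a;
		MyKey k2 = (MyKey) b;
		int r = Integer.compare(k1.getYear(), k2.getYear());
		if (r == 0) {
			r = Integer.compare(k1.getMonth(), k2.getMonth());
			if (r == 0) {
				return -Double.compare(k1.getTemperature(), k2.getTemperature());//descending temperature
			}
		}
		return r;
	}
}

It would be registered with job.setSortComparatorClass(MySortComparator.class); when no such class is set, the framework falls back to the key's own compareTo.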

package com.husky.hadoop.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyKey implements WritableComparable<MyKey>{
	private int year;
	private int month;
	private double temperature;
	

	public MyKey() {
		super();
	}

	public MyKey(int year, int month, double temperature) {
		super();
		this.year = year;
		this.month = month;
		this.temperature = temperature;
	}

	public int getYear() {
		return year;
	}

	public void setYear(int year) {
		this.year = year;
	}

	public int getMonth() {
		return month;
	}

	public void setMonth(int month) {
		this.month = month;
	}

	public double getTemperature() {
		return temperature;
	}

	public void setTemperature(double temperature) {
		this.temperature = temperature;
	}

	/**
	 * Write the object to the output stream (serialization)
	 * */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(year);
		out.writeInt(month);
		out.writeDouble(temperature);
		
	}

	/**
	 * Read the object back from the input stream (deserialization); the read order must match the write order
	 * */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.year=in.readInt();
		this.month=in.readInt();
		this.temperature=in.readDouble();
		
	}

	/**
	 * The key's comparison method, called during sorting; returns 0, a positive, or a negative number.
	 * We must not compare only the temperature: it is compared only when year and month are equal.
	 * */
	@Override
	public int compareTo(MyKey o) {
		int r1 = Integer.compare(this.getYear(), o.getYear());
		if (r1==0) {
			int r2 = Integer.compare(this.getMonth(), o.getMonth());
			if (r2==0) {
				//descending order of temperature
				return -Double.compare(this.getTemperature(), o.getTemperature());
			}
			return r2;
		}
		return r1;
	}

}
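A quick way to sanity-check the key is a serialization round trip through plain Java streams, mirroring what the framework does during spill; a minimal sketch (the MyKeyRoundTrip class is just a local test, not part of the job):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MyKeyRoundTrip {
	public static void main(String[] args) throws IOException {
		MyKey original = new MyKey(1950, 10, 41.0);

		//serialize: write(DataOutput) is what the framework calls when spilling
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		original.write(new DataOutputStream(bytes));

		//deserialize: readFields(DataInput) must restore the fields in the same order
		MyKey copy = new MyKey();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

		System.out.println(copy.getYear() + "-" + copy.getMonth() + "\t" + copy.getTemperature());
		System.out.println("compareTo: " + original.compareTo(copy));//expect 0
	}
}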

2. Mapper Class

map first reads one line of data: 1949-10-01 14:21:02	34c -> MyKey(1949, 10, 34) : Text

The default K-V is the byte offset and the line just read (TextInputFormat, a FileInputFormat subclass), but we can replace it. We use KeyValueTextInputFormat instead, which relies on KeyValueLineRecordReader:

public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

  public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
      TaskAttemptContext context) throws IOException {
    
    context.setStatus(genericSplit.toString());
    return new KeyValueLineRecordReader(context.getConfiguration());
  }

}

Take a look at setKeyValue, a key method of KeyValueLineRecordReader:

 public static void setKeyValue(Text key, Text value, byte[] line,
      int lineLen, int pos) {
    if (pos == -1) {
      key.set(line, 0, lineLen);
      value.set("");
    } else {
      key.set(line, 0, pos);
      value.set(line, pos + 1, lineLen - pos - 1);
    }
  }

pos == -1 means no separator (a tab by default) was found, so the whole line becomes the key and the value is set to the empty string. Otherwise, everything before the separator becomes the key (1949-10-01 14:21:02) and everything after it the value (34c). So KEYIN is Text.
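The separator itself is configurable. Assuming the Hadoop 2.x property name mapreduce.input.keyvaluelinerecordreader.key.value.separator, it could be set on the configuration in the client before the job is submitted (tab is already the default, so this is only needed for other separators):

Configuration conf = new Configuration(true);
//tab is the default separator; shown here only to make the setting explicit
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");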

package com.husky.hadoop.weather;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeatherMapper extends Mapper<Text, Text, MyKey, Text>{
	static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd");//date format matching the input lines
	@Override
	protected void map(Text key, Text value, Context context)
			throws IOException, InterruptedException {
			//KEY: 1949-10-01 14:21:02	; VALUE: 34c
		try {
			//parse the date, then extract the year and month from it
			Date date = sdf.parse(key.toString());
			Calendar c = Calendar.getInstance();
			c.setTime(date);
			int year = c.get(Calendar.YEAR);
			int month = c.get(Calendar.MONTH)+1;//Calendar.MONTH is zero-based (January == 0)
			//strip the trailing "c" from "34c" to get 34
			double temperature = Double.parseDouble(value.toString().substring(0, value.toString().length()-1));
			
			MyKey outkey = new MyKey(year,month,temperature);
			Text outvalue = new Text(key+"\t"+value);
			context.write(outkey, outvalue);
		} catch (ParseException e) {
			e.printStackTrace();//skip lines whose date fails to parse
		}
	}

}
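One subtlety worth verifying is that Calendar.MONTH is zero-based, which is why the mapper adds 1. A tiny standalone check (the MonthCheck class is illustrative only):

import java.text.SimpleDateFormat;
import java.util.Calendar;

public class MonthCheck {
	public static void main(String[] args) throws Exception {
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
		Calendar c = Calendar.getInstance();
		c.setTime(sdf.parse("1949-10-01 14:21:02"));//parse() stops once the pattern is consumed
		System.out.println(c.get(Calendar.MONTH));//prints 9: Calendar.MONTH counts January as 0
		System.out.println(c.get(Calendar.MONTH) + 1);//prints 10, the human-readable month
	}
}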

3. Custom Partitioner

After a record passes through map, every K-V pair is tagged with a partition number that determines which reducer it will go to. Clearly the partition number cannot be derived from the value here; it has to come from the year. For example, with three reducers, 1949 % 3 = 2, 1950 % 3 = 0, and 1951 % 3 = 1, so each year lands in its own partition:

package com.husky.hadoop.weather;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
//the data type coming out of the Mapper is <MyKey,Text>
public class MyPartitioner extends Partitioner<MyKey, Text>{

	@Override
	public int getPartition(MyKey key, Text value, int numPartitions) {
		return key.getYear()%numPartitions;//the number of partitions equals the number of reduce tasks, 3 here
	}

}

4. Custom Grouping Comparator

We group by year and month. The data is already sorted at this point, so the temperature no longer needs to be considered; only the year and month matter for grouping. A custom grouping comparator must have a constructor (to register the key class) and provide a compare method:

package com.husky.hadoop.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroupCompareTo extends WritableComparator{
	
	//constructor: register the key class with the superclass
	public MyGroupCompareTo(){
		super(MyKey.class,true);//true tells WritableComparator to create MyKey instances for comparison
	}
	@Override
	public int compare(WritableComparable a,WritableComparable b){
		//cast down to the concrete key type
		MyKey k1 = (MyKey)a;
		MyKey k2 = (MyKey)b;
		
		//compare the year first, then the month, and return the result directly
		int r1 = Integer.compare(k1.getYear(), k2.getYear());
		if (r1==0) {
			return Integer.compare(k1.getMonth(), k2.getMonth());//0 means the same group
		}
		return r1;//non-zero means different groups
		
	}

}
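To see how sorting and grouping interact, here is a hedged sketch that sorts a few MyKey instances with compareTo and then marks group boundaries with MyGroupCompareTo, outside the MapReduce framework (the GroupingDemo class is illustrative only):

import java.util.Arrays;

public class GroupingDemo {
	public static void main(String[] args) {
		MyKey[] keys = {
			new MyKey(1950, 10, 37), new MyKey(1950, 1, 32),
			new MyKey(1950, 10, 41), new MyKey(1950, 10, 27)
		};
		Arrays.sort(keys);//uses MyKey.compareTo: year asc, month asc, temperature desc

		MyGroupCompareTo grouper = new MyGroupCompareTo();
		for (int i = 0; i < keys.length; i++) {
			//a non-zero comparison with the previous key starts a new reduce group
			if (i == 0 || grouper.compare(keys[i - 1], keys[i]) != 0) {
				System.out.println("-- new group --");
			}
			System.out.println(keys[i].getYear() + "-" + keys[i].getMonth() + "\t" + keys[i].getTemperature());
		}
	}
}

Running it prints two groups: 1950-1 with its single record, and 1950-10 with temperatures 41.0, 37.0, 27.0 in descending order, exactly the shape the reducer will see.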

5. Custom Reducer

Once the data has been grouped, it flows into the Reducer. The Reducer's input is the Mapper's output, so KEYIN is MyKey and VALUEIN is Text. The Reducer's output, by contrast, is up to us: KEYOUT can be Text or NullWritable, and the same goes for VALUEOUT. What we pass into the Reducer are whole records such as "1949-10-01 14:21:02	34c".

package com.husky.hadoop.weather;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WeatherReducer extends Reducer<MyKey, Text, Text, NullWritable>{

	@Override
	protected void reduce(MyKey key, Iterable<Text> iter, Context context)
			throws IOException, InterruptedException {
		int num = 0;//records emitted so far for this year-month group
		for(Text value : iter){
			if (num>=3) {
				break;//values arrive sorted by temperature descending, so the first three are the top three
			}
			//e.g. "1950-10-02 12:21:02	41c" as the key, NullWritable as the value (the order could be reversed)
			context.write(value, NullWritable.get());//write one record per line
			num++;
		}
	}
}

6. Client Class

Before submitting the job, the grouping comparator, the input format class, and the partitioner all need to be configured:

package com.husky.hadoop.weather;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {
	public static void main(String[] args) {
			//the client automatically reads the configuration files and loads them into the conf object
			Configuration conf = new Configuration(true);
			
			try {
				//job
				Job job = Job.getInstance(conf);
				FileSystem fs = FileSystem.get(conf);
				
				//required: the entry-point class
				job.setJarByClass(RunJob.class);
				//set the job name
				job.setJobName("weather");
				
				//set the Mapper and Reducer
				job.setMapperClass(WeatherMapper.class);
				job.setReducerClass(WeatherReducer.class);
				
				//set the grouping comparator
				job.setGroupingComparatorClass(MyGroupCompareTo.class);
				//replace the default TextInputFormat with KeyValueTextInputFormat
				job.setInputFormatClass(KeyValueTextInputFormat.class);
				
				//specify the custom partitioner
				job.setPartitionerClass(MyPartitioner.class);
				
				//set the map output K-V types (they differ from the final Text/NullWritable output)
				job.setMapOutputKeyClass(MyKey.class);
				job.setMapOutputValueClass(Text.class);
		
				//set the number of reduce tasks (default 1); 3 here, one partition per year
				job.setNumReduceTasks(3);
				
				//set the input data path on HDFS
				FileInputFormat.addInputPath(job, new Path("/input/weather"));
				//set the output directory; the final results are written into it
				Path outPath = new Path("/output/weather/");//the directory must not already exist, or the job fails
				if (fs.exists(outPath)) { //delete the directory if it already exists
					fs.delete(outPath,true);
				}
				
				FileOutputFormat.setOutputPath(job, outPath);
				
				//submit the job and wait for completion
				boolean f = job.waitForCompletion(true);
				if (f) {
					System.out.println("MapReduce job completed successfully!");
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
	}

}
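To run the job, package the classes into a jar and submit it with something like hadoop jar weather.jar com.husky.hadoop.weather.RunJob (the jar name here is just an assumption). With three reduce tasks, /output/weather/ ends up containing three part files, one per year.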

Execution Results

The partitions were defined by year; one of the output files contains the following. Note that 1950-01 has only one record, so its "top three" is a single line, followed by 1950-10's top three in descending order:

1950-01-01 11:21:02	32c
1950-10-02 12:21:02	41c
1950-10-01 12:21:02	37c
1950-10-03 12:21:02	27c
