Hadoop实战之温度排序

说明:输入文件为北京市2010年1月份到5月份每天每间隔3小时的温度记录,数据格式为yyyyMMddHHmm    temp,如下截图

(图中温度为华氏温度)


需求:求出每个月份温度最高的5天

解决思路:1、以月份+温度为key进行排序,月份升序,温度降序

2、每个月份单独生成一个文件,读取每个文件前5条记录,即为每个月份温度最高的5条记录(由于每天有多条记录,同一天可能出现多次;如需严格按"天"统计,还需按日期去重)

程序如下

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.dfshealth_jsp;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.jcraft.jsch.jce.SHA1;

import org.apache.hadoop.mapreduce.Job;
/*
 * 根据月份分组
 * 每月按照温度降序排列
 * 每个月单独输出一个文件
 */

public class JobRun {

	/**
	 * Configures and submits the temperature-sort job, then terminates the JVM with
	 * the job's status: 0 when the job succeeds, 1 when it fails or when setup or
	 * submission throws.
	 *
	 * <p>The input and output directories default to {@code /user/root/temp/input}
	 * and {@code /user/root/temp/output}; they can be overridden with the first and
	 * second command-line arguments respectively.
	 *
	 * @param args optional {@code [inputDir [outputDir]]}
	 */
	public static void main(String[] args) {
		String inputDir = args.length > 0 ? args[0] : "/user/root/temp/input";
		String outputDir = args.length > 1 ? args[1] : "/user/root/temp/output";

		Configuration conf = new Configuration();
		int exitCode;
		try {
			// The constructor is kept (rather than Job.getInstance) so the code still
			// builds against older Hadoop releases.
			Job job = new Job(conf);
			job.setJarByClass(JobRun.class);
			job.setMapperClass(LocalMap.class);
			job.setReducerClass(LocalReduce.class);
			job.setMapOutputKeyClass(DateKey.class);
			job.setMapOutputValueClass(Text.class);
			// LocalReduce emits Text/Text, so declare that explicitly instead of
			// relying on the framework defaults.
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			job.setSortComparatorClass(LocalSort.class); // month ascending, temperature descending
			job.setGroupingComparatorClass(LocalGroup.class); // group reduce input by month only
			job.setPartitionerClass(LocalPartitioner.class); // route each month to its own reducer
			// The data set covers 5 months; with 5 reducers and a month % 5 partitioner
			// every month is written to its own output file.
			job.setNumReduceTasks(5);
			FileInputFormat.addInputPath(job, new Path(inputDir));
			FileOutputFormat.setOutputPath(job, new Path(outputDir));
			exitCode = job.waitForCompletion(true) ? 0 : 1;
		} catch (Exception e) {
			e.printStackTrace();
			// A setup or submission failure must not be reported as success.
			exitCode = 1;
		}
		System.exit(exitCode);
	}
	/**
	 * Mapper that parses lines of the form {@code yyyyMMddHHmm<TAB>temp} and emits a
	 * {@link DateKey} (month, temperature) as key with the original line as value.
	 *
	 * <p>Lines that do not consist of exactly two tab-separated fields, whose
	 * timestamp cannot be parsed, or whose temperature is not an integer are skipped
	 * and counted under the {@code JobRun / MALFORMED_RECORDS} counter instead of
	 * failing the task or being emitted with a bogus month.
	 */
	public static class LocalMap extends Mapper<LongWritable, Text, DateKey, Text> {

		// SimpleDateFormat is not thread-safe, so it is kept per mapper instance
		// (not static) and reused across records instead of being rebuilt each time.
		private final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmm");
		private final Calendar calendar = Calendar.getInstance();

		/**
		 * Converts one input line into a (month, temperature) key and writes it with
		 * the untouched line as value.
		 *
		 * @param key     input key supplied by the input format (not used)
		 * @param value   one input line
		 * @param context sink for the map output and the malformed-record counter
		 */
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, DateKey, Text>.Context context)
				throws IOException, InterruptedException {
			String[] fields = value.toString().split("\t");
			// Check the field count BEFORE touching fields[1]; blank or tab-less
			// lines would otherwise throw ArrayIndexOutOfBoundsException.
			if (fields.length != 2) {
				context.getCounter("JobRun", "MALFORMED_RECORDS").increment(1);
				return;
			}

			int month;
			int temp;
			try {
				Date timestamp = timestampFormat.parse(fields[0]);
				calendar.setTime(timestamp);
				month = calendar.get(Calendar.MONTH) + 1; // Calendar.MONTH is zero-based
				temp = Integer.parseInt(fields[1].trim());
			} catch (ParseException | NumberFormatException e) {
				// Never emit a record with an unset month: month 0 would land in the
				// same partition as month 5 (0 % 5 == 5 % 5) and pollute its output.
				context.getCounter("JobRun", "MALFORMED_RECORDS").increment(1);
				return;
			}

			DateKey dk = new DateKey();
			dk.setMonth(month);
			dk.setTemp(temp);
			context.write(dk, value);
		}
	}
	/**
	 * Reducer that passes every value of a group straight through, pairing it with
	 * the textual form ({@code month<TAB>temp}) of the key.
	 */
	public static class LocalReduce extends Reducer<DateKey, Text, Text,Text>{

		/**
		 * Writes one output record per incoming value.
		 *
		 * @param key     the group's key
		 * @param value   the original input lines belonging to this group
		 * @param context sink for the reduce output
		 */
		@Override
		protected void reduce(DateKey key, Iterable<Text> value, Reducer<DateKey, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			for (Text line : value) {
				// The key text is rebuilt for every value rather than once per group.
				// NOTE(review): presumably because the framework updates `key` as the
				// iteration advances under the month-only grouping - confirm.
				Text outKey = new Text(key.toString());
				context.write(outKey, line);
			}
		}

	}
	/**
	 * Composite map-output key made of a month number and a temperature.
	 *
	 * <p>It is serialized as two ints (month, then temperature). Its natural order is
	 * month ascending, then temperature ascending; the job installs {@code LocalSort}
	 * as sort comparator, which orders temperatures descending instead.
	 */
	public static class DateKey implements WritableComparable<DateKey>{
		private int month;
		private int temp;

		/** @return the month number stored in this key */
		public int getMonth() {
			return month;
		}

		/** @param month the month number to store */
		public void setMonth(int month) {
			this.month = month;
		}

		/** @return the temperature stored in this key */
		public int getTemp() {
			return temp;
		}

		/** @param temp the temperature to store */
		public void setTemp(int temp) {
			this.temp = temp;
		}

		/** Reads the fields in the order {@link #write(DataOutput)} wrote them. */
		@Override
		public void readFields(DataInput input) throws IOException {
			this.month = input.readInt();
			this.temp = input.readInt();
		}

		/** Writes the month followed by the temperature, each as a 4-byte int. */
		@Override
		public void write(DataOutput out) throws IOException {
			out.writeInt(month);
			out.writeInt(temp);
		}

		/** Orders by month ascending, then by temperature ascending. */
		@Override
		public int compareTo(DateKey o) {
			int byMonth = Integer.compare(month, o.getMonth());
			if (byMonth != 0) {
				return byMonth;
			}
			return Integer.compare(temp, o.getTemp());
		}

		/**
		 * Two keys are equal when both month and temperature match, which keeps
		 * {@code equals} consistent with {@link #compareTo(DateKey)} and
		 * {@link #hashCode()}.
		 */
		@Override
		public boolean equals(Object obj) {
			if (this == obj) {
				return true;
			}
			if (!(obj instanceof DateKey)) {
				return false;
			}
			DateKey other = (DateKey) obj;
			return month == other.month && temp == other.temp;
		}

		/**
		 * Combines both fields without boxing; the multiplier stops keys such as
		 * (1, 20) and (2, 19) from colliding the way a plain sum would.
		 */
		@Override
		public int hashCode() {
			return 31 * month + temp;
		}

		/** @return {@code month<TAB>temp} */
		@Override
		public String toString() {
			return month+"\t"+temp;
		}
	}
	/**
	 * Sort comparator for {@link DateKey}: months ascending and, within the same
	 * month, temperatures descending.
	 */
	public static class LocalSort extends WritableComparator{

		/** Registers {@link DateKey} as the key type and asks the base class to create key instances. */
		public LocalSort() {
			super(DateKey.class, true);
		}

		/**
		 * @return a negative, zero or positive value when {@code a} sorts before,
		 *         together with, or after {@code b}
		 */
		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			DateKey left = (DateKey) a;
			DateKey right = (DateKey) b;
			int byMonth = Integer.compare(left.getMonth(), right.getMonth());
			// Operands are swapped for the temperature so higher values come first.
			return byMonth != 0 ? byMonth : Integer.compare(right.getTemp(), left.getTemp());
		}

	}
	/**
	 * Partitioner that routes records by month so that, given enough reducers, each
	 * month is handled by its own reducer.
	 */
	public static class LocalPartitioner extends Partitioner<DateKey, Text>{

		/**
		 * @param key   map-output key carrying the month
		 * @param value map-output value (not used)
		 * @param num   number of partitions
		 * @return {@code month % num}; with 5 partitions months 1-4 map to partitions
		 *         1-4 and month 5 maps to partition 0
		 */
		@Override
		public int getPartition(DateKey key, Text value, int num) {
			int month = key.getMonth();
			return month % num;
		}

	}
	/**
	 * Grouping comparator for {@link DateKey}. Map output is grouped by identical
	 * key by default; this job needs grouping on the month alone, so the temperature
	 * is ignored here.
	 */
	public static class LocalGroup extends WritableComparator{

		/** Registers {@link DateKey} as the key type and asks the base class to create key instances. */
		public LocalGroup(){
			super(DateKey.class, true);
		}

		/** @return the ordering of the two keys' months; 0 puts them in the same group */
		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			int leftMonth = ((DateKey) a).getMonth();
			int rightMonth = ((DateKey) b).getMonth();
			return Integer.compare(leftMonth, rightMonth);
		}

	}
}
执行结果如下图:

可以看到共生成了5个文件,打开文件可以看到温度都已经按照降序排列



版权声明:本文为super_ozman原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。