说明:输入文件为北京市2010年1月份到5月份每天每间隔3小时的温度记录,每行数据格式为“yyyyMMddHHmm<制表符>temp”(时间与温度之间以制表符\t分隔),如下截图
(图中温度为华氏温度)
需求:求出每个月份温度最高的5天
解决思路:1、以月份+温度为key进行排序,月份升序,温度降序
2、每个月份单独生成一个文件,读取每个文件前5条记录,即为每个月份温度最高的5条记录(注意:数据每3小时一条,同一天可能占据多条;若要求5个不同的日期,还需按日期去重)
程序如下
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.dfshealth_jsp;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.jcraft.jsch.jce.SHA1;
import org.apache.hadoop.mapreduce.Job;
/*
* 根据月份分组
* 每月按照温度降序排列
* 每个月单独输出一个文件
*/
public class JobRun {
/**
 * Configures and submits the "temperature by month" job, then exits with the job status.
 *
 * @param args optional overrides: {@code args[0]} is the input path and {@code args[1]} the
 *             output path; when absent the original hard-coded HDFS locations are used
 */
public static void main(String[] args) {
    // Command-line overrides are optional; the defaults keep the original behavior.
    String inputPath = args.length > 0 ? args[0] : "/user/root/temp/input";
    String outputPath = args.length > 1 ? args[1] : "/user/root/temp/output";

    Configuration conf = new Configuration();
    boolean succeeded = false;
    try {
        Job job = new Job(conf);
        job.setJarByClass(JobRun.class);
        job.setMapperClass(LocalMap.class);
        job.setReducerClass(LocalReduce.class);
        job.setMapOutputKeyClass(DateKey.class);
        job.setMapOutputValueClass(Text.class);
        // The reducer emits Text/Text pairs; declare that explicitly instead of relying on defaults.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setSortComparatorClass(LocalSort.class);          // custom sort: month asc, temp desc
        job.setGroupingComparatorClass(LocalGroup.class);     // custom grouping: by month only
        job.setPartitionerClass(LocalPartitioner.class);      // custom partitioning: month % reducers
        // 5 reducers because the input covers 5 months; each month ends up in its own output file.
        job.setNumReduceTasks(5);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        succeeded = job.waitForCompletion(true);
    } catch (Exception e) {
        // Report the failure; the non-zero exit below makes it visible to callers and schedulers.
        e.printStackTrace();
    }
    System.exit(succeeded ? 0 : 1);
}
/**
 * Mapper: parses lines of the form {@code yyyyMMddHHmm<TAB>temp} and emits
 * {@code (DateKey(month, temp), original line)}.
 *
 * Lines that do not have exactly two tab-separated fields, or whose timestamp or temperature
 * cannot be parsed, are skipped and counted under the "JobRun" / "MALFORMED_RECORDS" counter.
 */
public static class LocalMap extends Mapper<LongWritable, Text, DateKey, Text> {
    // SimpleDateFormat is not thread-safe, so it is an instance field rather than a static one.
    private final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmm");
    // Reused for every record; setTime() overwrites its previous state.
    private final Calendar calendar = Calendar.getInstance();

    /**
     * @param key     input key supplied by the input format (unused)
     * @param value   one input line
     * @param context sink for the (DateKey, line) pair
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, DateKey, Text>.Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        // Check the field count BEFORE touching fields[1]; a line without a tab has only one field.
        if (fields.length != 2) {
            context.getCounter("JobRun", "MALFORMED_RECORDS").increment(1);
            return;
        }

        int month;
        int temp;
        try {
            Date timestamp = timestampFormat.parse(fields[0]);
            calendar.setTime(timestamp);
            month = calendar.get(Calendar.MONTH) + 1; // Calendar.MONTH is zero-based
            temp = Integer.parseInt(fields[1].trim());
        } catch (ParseException | NumberFormatException e) {
            // Skip the record instead of emitting it with a bogus month of 0 or failing the task.
            context.getCounter("JobRun", "MALFORMED_RECORDS").increment(1);
            return;
        }

        DateKey dk = new DateKey();
        dk.setMonth(month);
        dk.setTemp(temp);
        context.write(dk, value);
    }
}
/**
 * Reducer: for every value in the group, writes the key's text form ("month<TAB>temp")
 * followed by the value itself. No filtering or aggregation is done here.
 */
public static class LocalReduce extends Reducer<DateKey, Text, Text, Text> {
    @Override
    protected void reduce(DateKey key, Iterable<Text> value, Reducer<DateKey, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        java.util.Iterator<Text> records = value.iterator();
        while (records.hasNext()) {
            Text record = records.next();
            // key.toString() is deliberately evaluated once per record, after advancing the iterator.
            // NOTE(review): presumably the framework refreshes `key` while the values are iterated,
            // so each line shows its own temperature — confirm.
            Text outputKey = new Text(key.toString());
            context.write(outputKey, record);
        }
    }
}
/**
 * Composite map-output key holding a month number and a temperature.
 *
 * Serialized as two ints (month, then temp). The natural ordering defined by
 * {@link #compareTo(DateKey)} is ascending on month and then ascending on temp;
 * {@link #equals(Object)} and {@link #hashCode()} are consistent with it.
 */
public static class DateKey implements WritableComparable<DateKey> {
    private int month;
    private int temp;

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getTemp() {
        return temp;
    }

    public void setTemp(int temp) {
        this.temp = temp;
    }

    /** Reads the fields in the same order {@link #write(DataOutput)} emits them. */
    @Override
    public void readFields(DataInput input) throws IOException {
        this.month = input.readInt();
        this.temp = input.readInt();
    }

    /** Writes month followed by temp, each as a 4-byte int. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(month);
        out.writeInt(temp);
    }

    /** Orders by month ascending, then by temp ascending. */
    @Override
    public int compareTo(DateKey o) {
        int res = Integer.compare(month, o.getMonth());
        if (res != 0) {
            return res;
        }
        return Integer.compare(temp, o.getTemp());
    }

    /** Two keys are equal when both month and temp match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof DateKey)) {
            return false;
        }
        DateKey other = (DateKey) obj;
        return month == other.month && temp == other.temp;
    }

    /**
     * Hash over both fields. The previous {@code month + temp} collided for every pair with the
     * same sum, and was overridden without a matching {@code equals}.
     */
    @Override
    public int hashCode() {
        return 31 * month + temp;
    }

    /** @return {@code month<TAB>temp} */
    @Override
    public String toString() {
        return month + "\t" + temp;
    }
}
/**
 * Sort comparator for DateKey: month ascending; within the same month, temperature descending.
 */
public static class LocalSort extends WritableComparator {
    public LocalSort() {
        // Register DateKey as the key class and ask the base class to create key instances.
        super(DateKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        DateKey left = (DateKey) a;
        DateKey right = (DateKey) b;
        int byMonth = Integer.compare(left.getMonth(), right.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        // Operands swapped on purpose: higher temperatures sort first.
        return Integer.compare(right.getTemp(), left.getTemp());
    }
}
/**
 * Partitioner that routes a record by its month: partition = month modulo the partition count.
 */
public static class LocalPartitioner extends Partitioner<DateKey, Text> {
    @Override
    public int getPartition(DateKey key, Text value, int num) {
        int month = key.getMonth();
        return month % num;
    }
}
/**
 * Grouping comparator: keys are treated as the same group when their months are equal,
 * regardless of temperature.
 */
public static class LocalGroup extends WritableComparator {
    public LocalGroup() {
        // Register DateKey as the key class and ask the base class to create key instances.
        super(DateKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        int leftMonth = ((DateKey) a).getMonth();
        int rightMonth = ((DateKey) b).getMonth();
        return Integer.compare(leftMonth, rightMonth);
    }
}
}
执行结果如下图:可以看到共生成了5个文件,打开文件可以看到温度都已经按照降序排列
版权声明:本文为super_ozman原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。