需求:
数据格式如下:
1950-10-02 12:21:02 41℃
。。。。。。
。。。。。。
。。。。。。
。。。。。。
要求:
1、计算1949-1955年,每年温度最高的时间
2、计算1949-1955年,每年温度最高前十天的温度
思路:
1、按照年份升序排序,同时每一年温度降序排序。
2、按照年份分组,每一年对应一个reduce任务。
目的:
自定义排序、自定义分组、自定义分区
key的封装:
package com.ceaser.analysis.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class KeyPair implements WritableComparable<KeyPair> {
private int year;
private int hot;
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getHot() {
return hot;
}
public void setHot(int hot) {
this.hot = hot;
}
public void readFields(DataInput in) throws IOException {
this.year = in.readInt();
this.hot = in.readInt();
}
public void write(DataOutput out) throws IOException {
out.writeInt(year);
out.writeInt(hot);
}
public int compareTo(KeyPair o) {
int result = Integer.compare(year, o.getYear());
if(0!=result){
return result;
}
return Integer.compare(hot, o.getHot());
}
public String toString() {
return year+"/t"+hot;
}
public int hashCode() {
return new Integer(year+hot).hashCode();
}
}
排序的封装:
package com.ceaser.analysis.sort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class SortHot extends WritableComparator {
SortHot(){
super(KeyPair.class,true);
}
public int compare(WritableComparable a, WritableComparable b) {
KeyPair k1 = (KeyPair)a;
KeyPair k2 = (KeyPair)b;
int result = Integer.compare(k1.getYear(), k2.getYear());
if(0!=result){
return result;
}
return -Integer.compare(k1.getHot(), k2.getHot());
}
}
分区的封装:
package com.ceaser.analysis.sort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class FirstPartition extends Partitioner<KeyPair, Text> {
public int getPartition(KeyPair key, Text value, int numPartitions) {
return (key.getYear()*127) & numPartitions;
}
}
分组的封装:
package com.ceaser.analysis.sort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class GroupHot extends WritableComparator {
GroupHot(){
super(KeyPair.class,true);
}
public int compare(WritableComparable a, WritableComparable b) {
KeyPair k1 = (KeyPair)a;
KeyPair k2 = (KeyPair)b;
return Integer.compare(k1.getYear(), k2.getYear());
}
}
主函数:
package com.ceaser.analysis.sort;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.ceaser.analysis.mapper.QqMapper;
/**
 * Driver for the "hottest day per year" job. Input lines look like
 * "1950-10-02 12:21:02\t41℃"; the mapper emits (year, temp) composite keys,
 * SortHot orders each year's records hottest-first, GroupHot groups by year,
 * and the reducer writes each year's records in that order.
 */
public class RunJob {
    // SimpleDateFormat is not thread-safe, but each map task is single-threaded,
    // so a shared static instance is safe in this context.
    static private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Parses the leading (optionally negative) integer of a temperature token,
     * e.g. 41 from "41℃". Replaces the original indexOf("C") lookup, which
     * returned -1 for "℃" (U+2103 is a single character, not ASCII 'C') and
     * made substring(0, -1) throw.
     */
    private static int parseHot(String s) {
        int end = 0;
        if (end < s.length() && s.charAt(end) == '-') {
            end++;
        }
        while (end < s.length() && Character.isDigit(s.charAt(end))) {
            end++;
        }
        return Integer.parseInt(s.substring(0, end));
    }

    static class HotMapper extends Mapper<LongWritable, Text, KeyPair, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            // Fix: "datetime<TAB>temp" splits into exactly 2 fields, so the
            // original 'length > 2' guard silently dropped every record.
            if (fields.length >= 2) {
                try {
                    Date d = sdf.parse(fields[0]);
                    Calendar c = Calendar.getInstance();
                    c.setTime(d);
                    int year = c.get(Calendar.YEAR); // was the magic constant 1
                    int hot = parseHot(fields[1]);
                    KeyPair kp = new KeyPair();
                    kp.setYear(year);
                    kp.setHot(hot);
                    context.write(kp, value);
                } catch (ParseException e) {
                    // Best-effort: skip malformed records instead of failing
                    // the whole task; keep the original stack-trace logging.
                    e.printStackTrace();
                }
            }
        }
    }

    static class HotReduce extends Reducer<KeyPair, Text, KeyPair, Text> {
        @Override
        protected void reduce(KeyPair key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            // Records arrive hottest-first (SortHot) within each year group
            // (GroupHot); echo them through in that order.
            for (Text text : value) {
                context.write(key, text);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "hot");
        // Fix: locate the jar from this driver class, not the unrelated QqMapper.
        job.setJarByClass(RunJob.class);
        job.setMapperClass(HotMapper.class);
        job.setReducerClass(HotReduce.class);
        job.setOutputKeyClass(KeyPair.class);
        job.setOutputValueClass(Text.class);
        // One reducer per partition bucket; years hash into these via FirstPartition.
        job.setNumReduceTasks(3);
        job.setPartitionerClass(FirstPartition.class);
        job.setSortComparatorClass(SortHot.class);
        job.setGroupingComparatorClass(GroupHot.class);
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/input/hot/"));
        FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/output/hot/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
版权声明:本文为wzq6578702原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。