hadoop自定义排序、分组、分区(温度统计)

需求:
数据格式如下:

1950-10-02 12:21:02 41℃
。。。。。。
。。。。。。
。。。。。。
。。。。。。

要求:
1、计算1949-1955年,每年温度最高的时间
2、计算1949-1955年,每年温度最高前十天的温度
思路:
1、按照年份升序排序,同时每一年温度降序排序。
2、按照年份分组,每一年对应一个reduce任务。

目的:
自定义排序、自定义分组、自定义分区
key的封装:

package com.ceaser.analysis.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class KeyPair implements WritableComparable<KeyPair> {

    private int year;
    private int hot;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getHot() {
        return hot;
    }

    public void setHot(int hot) {
        this.hot = hot;
    }

    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.hot = in.readInt();

    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(hot);
    }

    public int compareTo(KeyPair o) {
        int result = Integer.compare(year, o.getYear());
        if(0!=result){
            return result;
        }
        return Integer.compare(hot, o.getHot());
    }


    public String toString() {
        return year+"/t"+hot;
    }

    public int hashCode() {
        return new Integer(year+hot).hashCode();

    }

}

排序的封装:

package com.ceaser.analysis.sort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SortHot extends WritableComparator {


    SortHot(){
        super(KeyPair.class,true);
    }


    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair k1 = (KeyPair)a;
        KeyPair k2 = (KeyPair)b;
        int result = Integer.compare(k1.getYear(), k2.getYear());
        if(0!=result){
            return result;
        }
        return -Integer.compare(k1.getHot(), k2.getHot());
    }

}

分区的封装:

package com.ceaser.analysis.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartition extends Partitioner<KeyPair, Text> {

    public int getPartition(KeyPair key, Text value, int numPartitions) {

        return (key.getYear()*127) & numPartitions;
    }

}

分组的封装:

package com.ceaser.analysis.sort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class GroupHot extends WritableComparator {


    GroupHot(){
        super(KeyPair.class,true);
    }


    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair k1 = (KeyPair)a;
        KeyPair k2 = (KeyPair)b;
        return Integer.compare(k1.getYear(), k2.getYear());
    }

}

主函数:

package com.ceaser.analysis.sort;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.ceaser.analysis.mapper.QqMapper;

public class RunJob {

    // NOTE(review): SimpleDateFormat is not thread-safe; this is acceptable only
    // because each map task runs single-threaded in its own JVM. DateTimeFormatter
    // would be the safe modern choice if the cluster JDK allows it.
    static private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /** Maximum records emitted per year, per the "top ten days" requirement. */
    private static final int TOP_N = 10;

    static class HotMapper extends Mapper<LongWritable, Text, KeyPair, Text> {
        /**
         * Parses one input line into a (year, temperature) composite key.
         * Assumes tab-separated fields: "yyyy-MM-dd HH:mm:ss<TAB>41℃"
         * — TODO confirm the real delimiter against the input files.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] linearr = line.split("\t");
            // Fixed: was "> 2", which silently dropped every valid two-field record.
            if (linearr.length >= 2) {
                try {
                    Date d = sdf.parse(linearr[0]);
                    Calendar c = Calendar.getInstance();
                    c.setTime(d);
                    // Fixed: c.get(1) relied on Calendar.YEAR happening to be 1.
                    int year = c.get(Calendar.YEAR);
                    // Fixed: indexOf("C") returned -1 for the "℃" unit in the sample
                    // data, making substring(0, -1) throw. Strip any non-numeric
                    // unit suffix instead (keeps '-' so negative temps still parse).
                    String hotStr = linearr[1].replaceAll("[^0-9-]", "");
                    if (hotStr.isEmpty()) {
                        return; // malformed temperature field — skip the record
                    }
                    int hot = Integer.parseInt(hotStr);
                    KeyPair kp = new KeyPair();
                    kp.setYear(year);
                    kp.setHot(hot);
                    context.write(kp, value);
                } catch (ParseException e) {
                    // Best-effort: log and skip malformed dates rather than fail the task.
                    e.printStackTrace();
                }
            }
        }
    }

    static class HotReduce extends Reducer<KeyPair, Text, KeyPair, Text> {
        /**
         * Each group is one year (GroupHot) with values already sorted hottest
         * first (SortHot). Fixed: emit only the top {@link #TOP_N} records per
         * year, as the requirement asks for the ten hottest days.
         */
        @Override
        protected void reduce(KeyPair key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            int emitted = 0;
            for (Text text : value) {
                if (emitted++ >= TOP_N) {
                    break;
                }
                context.write(key, text);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Fixed: new Job(conf, name) is deprecated; use the factory method.
        Job job = Job.getInstance(conf, "hot");
        // Fixed: was QqMapper.class (an unrelated class) — the jar must be
        // located from a class actually inside this job's jar.
        job.setJarByClass(RunJob.class);
        job.setMapperClass(HotMapper.class);
        job.setReducerClass(HotReduce.class);
        job.setOutputKeyClass(KeyPair.class);
        job.setOutputValueClass(Text.class);

        job.setNumReduceTasks(3);
        job.setPartitionerClass(FirstPartition.class);
        job.setSortComparatorClass(SortHot.class);
        job.setGroupingComparatorClass(GroupHot.class);
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/input/hot/"));
        FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/output/hot/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

版权声明:本文为wzq6578702原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。