MapReduce基础入门5

Hadoop系列

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下MapReduce基础入门5
#博学谷IT学习技术支持



前言

在这里插入图片描述
1、MapReduce会将一个大的计算任务进行拆分,拆分成小任务,让这些小任务在不同的计算机中进行处理,最后再将这些小任务的结果记性整体汇总

2、MapReduce分为两个阶段,一个Map阶段负责任务的拆分,一个是Reduce阶段,负责任务的汇总

3、整个MapReduce工作流程可以分为3个阶段:map、shuffle、reduce。

作者这里用又一个简单的案例来说明shuffle阶段grouping分组的用法


一、分组是什么?

1、分组的作用就是去K2进行去重,然后相同K2的V2存入集合
hello 1
hello 1 -----分组--------> hello [1,1,1] ----reduceTask----> hello 3
hello 1

2、MR默认的分组是根据K2来决定的,相同K2的数据会被分到同一组

3、当默认的分组,不满足我们的需求时,我们可以使用自定义分组

4、当我们没有指定分组规则时,系统默认调用K2类中的compareTo方法,如果我们自定义了分组规则,则就按照我们的方式来实现分组

二、使用步骤

1.数据准备

和前几个案例一样还是使用相同的数据
日期 县 州 县编码 确诊人数 死亡人数
2021-01-28,Autauga,Alabama,01001 , 5554 , 69

需求:找出每个州前3的确诊认识,实质上就是一个topN问题

2.封装Java Bean

因为这里要用到排序,所以首先还是要先封装一个Java Bean对象

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CovidBean_pra implements WritableComparable<CovidBean_pra>{
    private String state;
    private Integer cases;

    public CovidBean_pra() {
    }

    public CovidBean_pra(String state, Integer cases) {
        this.state = state;
        this.cases = cases;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public Integer getCases() {
        return cases;
    }

    public void setCases(Integer cases) {
        this.cases = cases;
    }

    @Override
    public String toString() {
        return "CovidBean_pra{" +
                "state='" + state + '\'' +
                ", cases=" + cases +
                '}';
    }

    @Override
    public int compareTo(CovidBean_pra o) {
        int result = this.state.compareTo(o.state);
        if (result == 0){
            return (this.cases - o.cases) * -1;
        }
        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(state);
        out.writeInt(cases);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.state = in.readUTF();
        this.cases = in.readInt();
    }
}

3.Map阶段

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Mapper_pra extends Mapper<LongWritable,Text,CovidBean_pra,Text>{
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CovidBean_pra, Text>.Context context) throws IOException, InterruptedException {
        String[] array = value.toString().split(",");
        if (array.length !=6 || "Unknown".equals(array[1])){
            return;
        }
        else {
            CovidBean_pra covidBean_pra = new CovidBean_pra();
            covidBean_pra.setState(array[2]);
            covidBean_pra.setCases(Integer.parseInt(array[4]));

            context.write(covidBean_pra,value);
        }
    }
}

4.分区操作

这里分区操作不是必须的,就是为了联系一下分区操作

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class Partitioner_pra extends Partitioner<CovidBean_pra, Text> {
    @Override
    public int getPartition(CovidBean_pra covidBean_pra, Text text, int i) {

        return ( (covidBean_pra.getState()).hashCode()  & 2147483647 ) % i;
    }
}

5.分组操作

由于k2封装了Java Bean对象,无法自动将每个州的数据分到同一个组,所以这里需要自定义分组,将同一个州的数据分到一起。

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class Grouping_pra extends WritableComparator{
    public Grouping_pra(){
        super(CovidBean_pra.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CovidBean_pra c1 = (CovidBean_pra) a;
        CovidBean_pra c2 = (CovidBean_pra) b;

        return c1.getState().compareTo(c2.getState());
    }
}

6.Reduce阶段

这里对分完组的数据取topN即可。

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class Reducer_pra  extends Reducer<CovidBean_pra,Text,Text,NullWritable>{
    @Override
    protected void reduce(CovidBean_pra key, Iterable<Text> values, Reducer<CovidBean_pra, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        int n = 0;
        for (Text value : values) {
            context.write(value,NullWritable.get());
            if (++n >=3){
                break;
            }
        }
    }
}

7.Driver运行入口

最后写入口,因为用到了分区和分组别忘记setPartitionerClass,setNumReduceTasks,setGroupingComparatorClass

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class Driver_pra {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "Grouping_pra");

        FileInputFormat.addInputPath(job,new Path("hdfs://node1:8020/input/covid19.dat"));

        job.setMapperClass(Mapper_pra.class);
        job.setMapOutputKeyClass(CovidBean_pra.class);
        job.setMapOutputValueClass(Text.class);

        job.setPartitionerClass(Partitioner_pra.class);
        job.setNumReduceTasks(50);
        job.setGroupingComparatorClass(Grouping_pra.class);

        job.setReducerClass(Reducer_pra.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        Path outputPath = new Path("hdfs://node1:8020/output/covid_grouping");
        FileOutputFormat.setOutputPath(job,outputPath);

        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration(), "root");
        if (fileSystem.exists(outputPath)){
            fileSystem.delete(outputPath,true);
        }

        boolean bl = job.waitForCompletion(true);
        System.exit(bl? 0 : 1);
    }
}

在这里插入图片描述


总结

这个案例主要是一个通过topN问题阐明如何分组的一个案例,顺便加入了前面的排序和分区的知识。


版权声明:本文为weixin_53280379原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。