Hadoop Series
Today we continue with MapReduce basics, part 5.
Preface
1. MapReduce splits a large computation task into smaller tasks, runs those small tasks on different machines, and finally merges their results into one overall result.
2. MapReduce has two phases: a Map phase, responsible for splitting the work, and a Reduce phase, responsible for aggregating the results.
3. The full MapReduce workflow can be divided into three stages: map, shuffle, and reduce.
Here we use another simple example to explain how grouping works in the shuffle stage.
I. What is grouping?
1. Grouping deduplicates the K2 keys and collects the V2 values that share the same K2 into one list (see the sketch after this list):
hello 1
hello 1 -----grouping--------> hello [1,1,1] ----reduceTask----> hello 3
hello 1
2. MR's default grouping is determined by K2: records with the same K2 are placed in the same group.
3. When the default grouping does not meet our needs, we can use custom grouping.
4. If no grouping rule is specified, the framework falls back to the compareTo method of the K2 class; if we define our own grouping rule, grouping follows our implementation instead.
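To make the hello example above concrete, here is a minimal word-count reducer sketch (the class name is only illustrative and not part of the case study below): after the shuffle groups the three (hello, 1) pairs by K2, a single reduce call receives "hello" with the value list [1,1,1] and sums it to 3.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Illustrative only: shows what one reduce call sees after default grouping by K2.
public class WordCountReducerSketch extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // key    = "hello"      (the deduplicated K2)
        // values = [1, 1, 1]    (all V2 that shared this K2)
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));   // hello 3
    }
}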
II. Steps
1. Data preparation
As in the previous examples, we use the same data set:
date, county, state, county code, confirmed cases, deaths
2021-01-28,Autauga,Alabama,01001,5554,69
Requirement: find the top 3 counties by confirmed cases in each state, which is essentially a topN problem.
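As a quick orientation for the mapper code below, splitting a data line on commas yields the field indices used later. This is a minimal sketch; the class name is only for illustration.
public class FieldIndexSketch {
    public static void main(String[] args) {
        String line = "2021-01-28,Autauga,Alabama,01001,5554,69";
        String[] array = line.split(",");
        // array[0]=date, array[1]=county, array[2]=state,
        // array[3]=county code, array[4]=confirmed cases, array[5]=deaths
        System.out.println(array[2]);                    // Alabama -> becomes the state in K2
        System.out.println(Integer.parseInt(array[4]));  // 5554    -> becomes the cases in K2
    }
}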
2. Encapsulating the Java Bean
Since sorting is involved, we first need to wrap the key fields in a Java Bean object.
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// K2 type: carries the state name and confirmed-case count, and defines both
// serialization (write/readFields) and the sort order (compareTo).
public class CovidBean_pra implements WritableComparable<CovidBean_pra> {
    private String state;
    private Integer cases;

    public CovidBean_pra() {
    }

    public CovidBean_pra(String state, Integer cases) {
        this.state = state;
        this.cases = cases;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public Integer getCases() {
        return cases;
    }

    public void setCases(Integer cases) {
        this.cases = cases;
    }

    @Override
    public String toString() {
        return "CovidBean_pra{" +
                "state='" + state + '\'' +
                ", cases=" + cases +
                '}';
    }

    @Override
    public int compareTo(CovidBean_pra o) {
        // Sort by state ascending; within the same state, by cases descending (the * -1).
        int result = this.state.compareTo(o.state);
        if (result == 0) {
            return (this.cases - o.cases) * -1;
        }
        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialization: field order must match readFields.
        out.writeUTF(state);
        out.writeInt(cases);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.state = in.readUTF();
        this.cases = in.readInt();
    }
}
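A quick sanity check of the sort order defined above (a standalone sketch; the class name and the smaller case counts are made up for illustration): records from the same state compare by cases descending, so the county with more cases comes first.
public class CovidBeanSortCheck {
    public static void main(String[] args) {
        CovidBean_pra a = new CovidBean_pra("Alabama", 5554);  // from the sample line
        CovidBean_pra b = new CovidBean_pra("Alabama", 100);   // made-up smaller count
        CovidBean_pra c = new CovidBean_pra("Alaska", 1);      // made-up record from another state

        System.out.println(a.compareTo(b));  // negative: within a state, more cases sorts first
        System.out.println(b.compareTo(a));  // positive
        System.out.println(a.compareTo(c));  // negative: states compare alphabetically ("Alabama" < "Alaska")
    }
}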
3. Map phase
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Mapper_pra extends Mapper<LongWritable, Text, CovidBean_pra, Text> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CovidBean_pra, Text>.Context context) throws IOException, InterruptedException {
        String[] array = value.toString().split(",");
        // Skip malformed lines and records whose county is "Unknown".
        if (array.length != 6 || "Unknown".equals(array[1])) {
            return;
        } else {
            // K2 = (state, cases) bean, V2 = the whole input line.
            CovidBean_pra covidBean_pra = new CovidBean_pra();
            covidBean_pra.setState(array[2]);
            covidBean_pra.setCases(Integer.parseInt(array[4]));
            context.write(covidBean_pra, value);
        }
    }
}
4. Partitioning
Partitioning is not required here; it is included only as extra practice with custom partitioners.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class Partitioner_pra extends Partitioner<CovidBean_pra, Text> {
    @Override
    public int getPartition(CovidBean_pra covidBean_pra, Text text, int i) {
        // i is the number of reduce tasks; masking with 2147483647 (Integer.MAX_VALUE)
        // clears the sign bit so a negative hashCode cannot produce a negative partition.
        return ((covidBean_pra.getState()).hashCode() & 2147483647) % i;
    }
}
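A small sketch of what this partitioner computes (the reduce-task count of 50 matches the driver below; the class name is only illustrative): every record of a given state hashes to the same partition, so one reduce task sees all of that state's data.
public class PartitionSketch {
    public static void main(String[] args) {
        int numReduceTasks = 50;   // matches job.setNumReduceTasks(50) in the driver
        int partition = ("Alabama".hashCode() & 2147483647) % numReduceTasks;
        System.out.println(partition);   // all "Alabama" records land in this one partition
    }
}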
5. Grouping
Because K2 is now the Java Bean (whose compareTo also compares cases), records from the same state are not automatically placed in the same group, so we define a custom grouping comparator that groups by state only.
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class Grouping_pra extends WritableComparator {
    public Grouping_pra() {
        // true: create instances of CovidBean_pra so compare() receives deserialized keys.
        super(CovidBean_pra.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group only by state: keys with the same state go into the same reduce call.
        CovidBean_pra c1 = (CovidBean_pra) a;
        CovidBean_pra c2 = (CovidBean_pra) b;
        return c1.getState().compareTo(c2.getState());
    }
}
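A minimal sketch of the effect (the class name and smaller case count are made up for illustration): two keys from the same state with different case counts are unequal to the sort comparator, yet the grouping comparator treats them as equal, so their values are fed to one reduce call, already ordered by cases descending.
public class GroupingSketch {
    public static void main(String[] args) {
        Grouping_pra grouping = new Grouping_pra();
        CovidBean_pra k1 = new CovidBean_pra("Alabama", 5554);
        CovidBean_pra k2 = new CovidBean_pra("Alabama", 100);   // made-up smaller count

        System.out.println(grouping.compare(k1, k2));   // 0: same state -> same reduce call
        System.out.println(k1.compareTo(k2));           // negative: sort still puts more cases first
    }
}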
6. Reduce phase
Each reduce call now receives all records of one state, with values already arriving in descending order of cases (thanks to compareTo), so taking the first N values gives the topN.
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reducer_pra extends Reducer<CovidBean_pra, Text, Text, NullWritable> {
    @Override
    protected void reduce(CovidBean_pra key, Iterable<Text> values, Reducer<CovidBean_pra, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // One call per state (custom grouping); values arrive sorted by cases descending,
        // so the first 3 lines are that state's top 3 counties.
        int n = 0;
        for (Text value : values) {
            context.write(value, NullWritable.get());
            if (++n >= 3) {
                break;
            }
        }
    }
}
7. Driver entry point
Finally, write the driver. Because we use custom partitioning and grouping, don't forget setPartitionerClass, setNumReduceTasks, and setGroupingComparatorClass.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class Driver_pra {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "Grouping_pra");

        // Input
        FileInputFormat.addInputPath(job, new Path("hdfs://node1:8020/input/covid19.dat"));

        // Map phase: K2 is the bean, V2 is the raw line
        job.setMapperClass(Mapper_pra.class);
        job.setMapOutputKeyClass(CovidBean_pra.class);
        job.setMapOutputValueClass(Text.class);

        // Shuffle phase: custom partitioner, 50 reduce tasks, custom grouping comparator
        job.setPartitionerClass(Partitioner_pra.class);
        job.setNumReduceTasks(50);
        job.setGroupingComparatorClass(Grouping_pra.class);

        // Reduce phase
        job.setReducerClass(Reducer_pra.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Output: delete the output directory first if it already exists
        Path outputPath = new Path("hdfs://node1:8020/output/covid_grouping");
        FileOutputFormat.setOutputPath(job, outputPath);
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration(), "root");
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        boolean bl = job.waitForCompletion(true);
        System.exit(bl ? 0 : 1);
    }
}
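To run this on the cluster, the usual approach is to package these classes into a jar and submit it with the hadoop command; the jar name below is just a placeholder, and the invocation assumes Driver_pra is in the default package:
hadoop jar covid-topn.jar Driver_pra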
Summary
This case study uses a topN problem to show how custom grouping works, and along the way revisits the sorting and partitioning techniques from the earlier posts.