Hadoop系列
注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下MapReduce基础入门2
#博学谷IT学习技术支持
前言

1、MapReduce会将一个大的计算任务进行拆分,拆分成小任务,让这些小任务在不同的计算机中进行处理,最后再将这些小任务的结果记性整体汇总
2、MapReduce分为两个阶段,一个Map阶段负责任务的拆分,一个是Reduce阶段,负责任务的汇总
3、整个MapReduce工作流程可以分为3个阶段:map、shuffle、reduce。
作者这里用又一个简单的案例来说明shuffle阶段partition的用法
一、分区partition是什么?
1、分区就是分文件,本质是将不同的减值对,最后输出的不同的文件中,理解为将数据进行拆分
2、实现的方式是;在Map阶段对k2打标记,标记相同的数据就会分到同一个分区,同一个分区的数据会被同一个redue拉取
if(id % 2 == 0){
return 0;
}else{
return 1;
}
3、如果设置了分区则会有多个文件输出,则需要有多个reduce,相同标记的K2数据会被同一个reduce
处理,多个reduce就会产生多个结果文件
4、partition原来就是map结束之后根据key值不同,自定义打个标签。。。然后再由不同的reducer拉一下。。
二、使用步骤
1.数据准备
日期 县 州 县编码 确诊人数 死亡人数
2021-01-28,Autauga,Alabama,01001 , 5554 , 69
需求:将每一个州的疫情数据分到不同的文件中
2.Map阶段
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class CovidPartitionerMapper extends Mapper<LongWritable, Text,Text,Text> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
String k2 = value.toString().split(",")[2];
context.write(new Text(k2),value);
}
}
3.Partition阶段
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class CovidPartitionerPartition extends Partitioner<Text,Text> {
@Override
public int getPartition(Text text, Text text2, int i) {
return (text.hashCode() & 2147483647) % i;
}
}
4.Reduce阶段
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class CovidPartitionerReducer extends Reducer<Text,Text,Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
for (Text k3 : values) {
context.write(k3,NullWritable.get());
}
}
}
5.Driver运行入口
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
public class CovidPartitionerDrvier {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "CovidPartitioner");
job.setJarByClass(CovidPartitionerDrvier.class);
FileInputFormat.addInputPath(job,new Path("hdfs://node1:8020/input"));
job.setMapperClass(CovidPartitionerMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(CovidPartitionerReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setPartitionerClass(CovidPartitionerPartition.class);
job.setNumReduceTasks(50);
Path outPath = new Path("hdfs://node1:8020/output/covid19");
FileOutputFormat.setOutputPath(job,outPath);
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());
boolean exists = fileSystem.exists(outPath);
if (exists){
fileSystem.delete(outPath,true);
}
//3.提交yarn执行
boolean bl = job.waitForCompletion(true);
//退出
System.exit(bl ? 0 : 1);
}
}
总结
通过一个小案例说明Partition的作用和其代码实现,partition就是map结束之后根据key值不同,自定义打个标签。。。然后再由不同的reducer拉一下。一点都不深澳,不要想的太复杂。