数据研发学习笔记4.6:大数据之MapReduce

1 概述

1.1 分布式并行编程

“摩尔定律”, CPU性能大约每隔18个月翻一番,但从2005年开始摩尔定律逐渐失效 ,需要处理的数据量快速增加,人们开始借助于分布式并行编程来提高程序性能

分布式程序运行在大规模计算机集群上,可以并行执行大规模数据处理任务,从而获得海量的计算能力。

谷歌公司最先提出了分布式并行编程模型MapReduce,Hadoop MapReduce是它的开源实现,后者比前者使用门槛低很多 。

  • 问题: 在MapReduce出现之前,已经有像MPI这样非常成熟的并行计算框架了,那么为什么Google还需要MapReduce?MapReduce相较于传统的并行计算框架有什么优势
    在这里插入图片描述

1.2 MapReduce模型简介

MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce。

编程容易,不需要掌握分布式并行编程细节,也可以很容易把自己的程序运行在分布式系统上,完成海量数据的计算。

  • MapReduce采用 “分而治之” 策略,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的分片(split),这些分片可以被多个Map任务并行处理。

  • MapReduce设计的一个理念就是 “计算向数据靠拢” ,而不是“数据向计算靠拢”,因为,移动数据需要大量的网络传输开销。
    在这里插入图片描述

  • MapReduce框架采用了Master/Slave架构,包括一个Master和若干个Slave。Master上运行JobTracker,Slave上运行TaskTracker

  • Hadoop框架是用Java实现的,但是,MapReduce应用程序则不一定要用Java来写

1.3 Map和Reduce函数

在这里插入图片描述

2 MapReduce体系结构

MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task。
在这里插入图片描述
(1)Client

  • 用户编写的MapReduce程序通过Client提交到JobTracker端
  • 用户可通过Client提供的一些接口查看作业运行状态

(2)JobTracker

  • JobTracker负责资源监控和作业调度
  • JobTracker 监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点
  • JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源

(3)TaskTracker

  • TaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)
  • TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用

(4)Task

  • Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动

3 MapReduce工作流程

3.1 工作流程概述

在这里插入图片描述

  • 不同的Map任务之间不会进行通信
  • 不同的Reduce任务之间也不会发生任何信息交换
  • 用户不能显式地从一台机器向另一台机器发送消息
  • 所有的数据交换都是通过MapReduce框架自身去实现的

3.2 MapReduce各个执行阶段

在这里插入图片描述
关于Split(分片)
在这里插入图片描述
HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。

Map任务的数量
Hadoop为每个split创建一个Map任务,split 的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块。

Reduce任务的数量
最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目。
通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)。

3.3 Shuffle过程详解

(1)Shuffle过程简介
在这里插入图片描述
(2)Map端的Shuffle过程

在这里插入图片描述
(3)Reduce端的Shuffle过程
在这里插入图片描述

  • Reduce任务通过RPC向JobTracker询问Map任务是否已经完成,若完成,则领取数据
  • Reduce领取数据先放入缓存,来自不同Map机器,先归并,再合并,写入磁盘
  • 多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的
  • 当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce

3.4 MapReduce应用程序执行过程

在这里插入图片描述
(1)程序部署
(2)分配Map任务和Reduce任务
(3)读入数据
(4)本地写数据
(5)远程读数据
(6)写数据

输入输出在HDFS上,中间结果写入磁盘。

4 实例分析:WordCount

4.1 WordCount程序任务

在这里插入图片描述

4.2 WordCount设计思路

  • 首先,需要检查WordCount程序任务是否可以采用MapReduce来实现
  • 其次,确定MapReduce程序的设计思路
  • 最后,确定MapReduce程序的执行过程

4.3 一个WordCount执行过程的实例

在这里插入图片描述
在这里插入图片描述

5 MapReduce的具体应用

MapReduce可以很好地应用于各种计算问题:

  • 关系代数运算(选择、投影、并、交、差、连接)
  • 分组与聚合运算
  • 矩阵-向量乘法
  • 矩阵乘法

应用:用MapReduce实现关系的自然连接
在这里插入图片描述
假设有关系R(A,B)和S(B,C),对二者进行自然连接操作。

使用Map过程,把来自R的每个元组<a,b>转换成一个键值对<b, <R,a>>,其中的键就是属性B的值。把关系R包含到值中,这样做使得我们可以在Reduce阶段,只把那些来自R的元组和来自S的元组进行匹配。类似地,使用Map过程,把来自S的每个元组<b,c>,转换成一个键值对<b,<S,c>>。

所有具有相同B值的元组被发送到同一个Reduce进程中,Reduce进程的任务是,把来自关系R和S的、具有相同属性B值的元组进行合并。

Reduce进程的输出则是连接后的元组<a,b,c>,输出被写到一个单独的输出文件中。

在这里插入图片描述

6 MapReduce编程实践

6.1 任务要求
在这里插入图片描述
6.2 编写Map处理逻辑

  • Map输入类型为<key,value>
  • 期望的Map输出类型为<单词,出现次数>
  • Map输入类型最终确定为<Object,Text>
  • Map输出类型最终确定为<Text,IntWritable>
public static class MyMapper extends Mapper<Object,Text,Text,IntWritable>{
	                private final static IntWritable one = new IntWritable(1); 
	                private Text word = new Text(); 
	                public void map(Object key, Text value, Context context) throws IOException,InterruptedException{  
	                        StringTokenizer itr = new StringTokenizer(value.toString());  
	                        while (itr.hasMoreTokens())
	                        {  
	                                word.set(itr.nextToken());  
	                                context.write(word,one);  
	                        }  
	                }  
        } 

6.3 编写Reduce处理逻辑

  • 在Reduce处理数据之前,Map的结果首先通过Shuffle阶段进行整理
  • Reduce阶段的任务:对输入数字序列进行求和
  • Reduce的输入数据为<key,Iterable容器>
    在这里插入图片描述
public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ 
	                private IntWritable result = new IntWritable(); 
	                public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException{ 
	                        int sum = 0; 
	                        for (IntWritable val : values) 
	                        {
	                                sum += val.get(); 
	                        }
	                        result.set(sum); 
	                        context.write(key,result); 
	                } 
        } 

6.4 编写main方法

public static void main(String[] args) throws Exception{  
                Configuration conf = new Configuration();  //程序运行时参数
                String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();  
                if (otherArgs.length != 2)  
                {      System.err.println("Usage: wordcount <in> <out>");  
                        System.exit(2);  
                }  
                Job job = new Job(conf,"word count");  //设置环境参数
                job.setJarByClass(WordCount.class);  //设置整个程序的类名
                job.setMapperClass(MyMapper.class);  //添加MyMapper类
                job.setReducerClass(MyReducer.class);  //添加MyReducer类
                job.setOutputKeyClass(Text.class);  //设置输出类型
                job.setOutputValueClass(IntWritable.class);  //设置输出类型
                FileInputFormat.addInputPath(job,new Path(otherArgs[0]));  //设置输入文件
                FileOutputFormat.setOutputPath(job,new Path(otherArgs[1])); //设置输出文件
                System.exit(job.waitForCompletion(true)?0:1);  
        }
import java.io.IOException;  
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount{  
	        public static class MyMapper extends Mapper<Object,Text,Text,IntWritable>{  
	                private final static IntWritable one = new IntWritable(1);  
	                private Text word = new Text();  
	                public void map(Object key, Text value, Context context) throws IOException,InterruptedException{  
	                        StringTokenizer itr = new StringTokenizer(value.toString());  
	                        while (itr.hasMoreTokens()){  
	                                word.set(itr.nextToken());  
	                                context.write(word,one);  
	                        }  
	                }  
	        }  	  
	        public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{  
	                private IntWritable result = new IntWritable();  
	                public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException{  
	                        int sum = 0;  
	                        for (IntWritable val : values)  
	                        {  
	                                sum += val.get();  
	                        }  
	                        result.set(sum);  
	                        context.write(key,result);  
	                }  
	        }  
	        public static void main(String[] args) throws Exception{  
	                Configuration conf = new Configuration();  
	                String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();  
	                if (otherArgs.length != 2)  
	                {  
	                        System.err.println("Usage: wordcount <in> <out>");  
	                        System.exit(2);  
	                }  
	                Job job = new Job(conf,"word count");  
	                job.setJarByClass(WordCount.class);  
	                job.setMapperClass(MyMapper.class);  
	                job.setReducerClass(MyReducer.class);  
	                job.setOutputKeyClass(Text.class);  
	                job.setOutputValueClass(IntWritable.class);  
	                FileInputFormat.addInputPath(job,new Path(otherArgs[0]));  
	                FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));  
	                System.exit(job.waitForCompletion(true)?0:1);  
	        }  
}  

6.5 编译打包代码以及运行程序

实验步骤:

  • 使用java编译程序,生成.class文件
  • 将.class文件打包为jar包
  • 运行jar包(需要启动Hadoop)
  • 查看结果

6.6 Hadoop中执行MapReduce任务的几种方式

  • Hadoop jar
  • Pig
  • Hive
  • Python
  • Shell脚本

在解决问题的过程中,开发效率、执行效率都是要考虑的因素,不要太局限于某一种方法

详细编程实践指南请参考厦门大学数据库实验室建设的中国高校大数据课程公共服务平台的技术文章:《大数据原理与应用 第七章 MapReduce 学习指南》,访问地址:http://dblab.xmu.edu.cn/blog/631-2/

其他相关笔记:
数据研发学习笔记4.1:大数据之概述与处理架构Hadoop
数据研发学习笔记4.2:大数据之分布式文件系统HDFS
数据研发学习笔记4.3:大数据之分布式数据库HBase
数据研发学习笔记4.4:大数据之NoSQL数据库
数据研发学习笔记4.5:大数据之云数据库
数据研发学习笔记4.6:大数据之MapReduce
数据研发学习笔记4.7:大数据之数据仓库Hive
数据研发学习笔记4.8:大数据之Spark
数据研发学习笔记4.9:流计算
数据研发学习笔记4.10:图计算
数据研发学习笔记4.11:大数据之数据可视化


版权声明:本文为weixin_41961559原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。