文章目录
Hadoop系列文章
Hadoop(一):在CentOS中搭建hadoop环境(伪分布式)
Hadoop(二):完全分布式搭建(腾讯云服务器+阿里云服务器)
MapReduce概念
Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集。
MapReduce擅长处理大数据,思想就是分而治之。
Mapper负责“分”,即把复杂的任务分解为若干个简单的任务来处理:
- 缩小原任务的数据计算规模。
- 就近计算原则,任务会分配到存放着所需数据的节点上进行计算。
- 这些小任务可以并行计算,彼此间几乎没有依赖关系
Reducer负责对map阶段的结果进行汇总,Reducer个数,通过mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值,默认值为1。
MapReduce工作原理
MapReduce这个术语来自两个基本的数据转换操作:map过程和reduce过程。
map过程:
即键值对的转换。map操作会将集合中的元素从一种形式转化成另一种形式,在这种情况下,输入的键值对会被转换成零到多个键值对输出。
reduce过程:
某个键的所有键值对都会被分发到同一个reduce操作中,也就是说,这个键和这个键所对应的所有值都会被传递给同一个Reducer。reduce过程的目的是将值的集合转换成一个值(例如求和或者求平均),或者转换成另一个集合。这个Reducer最终会产生一个键值对。
MapReduce架构
在MapReduce中,用于执行MapReduce任务的机器角色有两个:JobTracker和TaskTracker。其中JobTracker是用于调度工作的,TaskTracker是用于执行工作的。一个Hadoop集群中只有一台JobTracker。

当Client向JobTracker提交一个作业时,JobTracker会将作业拆分到多个TaskTracker去执行,TaskTracker会定时向JobTracker发送心跳信息,如果一段时间JobTracker未收到该TaskTracker的心跳信息,则认定该TaskTracker出现故障,会将该TaskTracker的任务分配给其他TackTracker。
MapReduce执行过程

- 客户端启动一个job
- 向JobTracker请求一个JobID。
- JobClient将运行作业所需要的资源复制到HDFS上,包括JAR文件、配置文件和客户端计算所得的输入划分信息。这些文件存放在以JobID为名的文件夹中。输入划分信息告诉JobTracker应该为这个作业启动多少个map任务等。
- JobTracker根据自己的调度算法调度该作业,根据输入划分信息为每个划分创建一个map任务,并将map任务分配给TaskTracker执行。
- TaskTracker每隔一段时间给JobTracker发送一个Heartbeat告诉JobTracker它仍然在运行,同时心跳还携带很多比如map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把作业设置成“成功”,JobClient再传达信息给用户。
MapReduce中的Map和Reduce
Map
MapReduce中的每个map任务可以细分4个阶段:record reader、mapper、combiner和partitioner。Map任务的输出被称为中间键和中间值,会被发送到Reducer做后续处理。
Reduce
reduce任务可以分为4个阶段:混排(shuffle)、排序(sort)、reducer和输出格式(output format)。

根据上图,结合下面例子,来具体分析Map和Reduce任务。
统计文件中单词个数
chinese,math,english,physics
math,chinese,chemistry,geography
biology,chinese,history,chinese
具体步骤如下:
Map任务处理
MapReduce读取HDFS中的文件并进行切割,将每一行解析成一个<k,v>,得到输入key-value对应为 “偏移量-本行数据”:
(0,"chinese math english physics") (26,"math chinese chemistry geography") (56,"biology,chinese,history,chinese")偏移量实际是该行起始的数据长度索引,可以理解为行号,例如第一行偏移量为0,数据25byte,则第二行偏移量为26。
Map阶段第二步执行我们实现的接口算法,并将结果的key-value(单词-每行的词频 如 chinese,1 )存入环形缓冲内存中(溢出的部分存入磁盘)。整个Map阶段都是完全并行执行的。具体数据如下:
(chinese,1),(math,1),(english,1),(physics,1) (math,1),(chinese,1),(chemistry,1),(geography,1) (biology,1),(chinese,1),(history,1),(chinese,1)Map任务将不同分区中的数据进行排序、分组,相同的 k 放到同一个集合中(溢出部分的数据存入本地磁盘),结果如下:
(chinese,[1,1,1,1]),(math,[1,1]),(english,[1]),(physics,[1]),(chemistry,[1]),(geography,[1]),(biology,[1]),(history,[1])Map任务对分组后的数据进行归约(可选),对每个分片的结果再次进行汇总成为一个最终结果。
注意: 通常一个分片对应hadoop中存储的一个块,即128M,这也可以避免载入内存文件过大,撑爆内存
Reduce任务处理
Reduce任务先处理多个Map任务的输出结果,再根据分区将其分配到不同的Reduce节点上(这个过程就是shuffle);
Reduce任务对多个Map的输出结果进行合并、排序、计算,生成新的 (k,v) 值,具体如下:
(chinese,4),(nath,2),(english,1),(physics,1),(chemistry,1),(geography,1),(biology,1),(history,1)Reduce任务会将上一步输出的<k,v>写到HDFS中,生成文件。
Shuffle执行过程
Shuffle过程横跨map与reduce两端的。在Hadoop这样的集群环境中,大部分map task与reduce task的执行是在不同的节点上。当然很多情况下Reduce执行时需要跨节点去拉取其它节点上的map task结果。
从最基本的要求来说,我们对Shuffle过程的期望可以有:
完整地从map task端拉取数据到reduce 端。
在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。
减少磁盘IO对task执行的影响。
Map端Shuffle执行过程

整个流程大概可以分为4步,简单些可以这样说,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
在map task执行时,它的输入数据来源于HDFS的block,也就是split。假设WordCount的例子中,map输入的字符出是
ericray。经过Mapper运行后,会输出一个键值对。即
key = 'ericray', value = '1',当前的map端只做+1的操作,在reduce task中才做合并操作。如果说这个job有3个reduce task,那么当前这个字符串应该交给哪个reduce去做,由MapReduce提供Partitioner接口来处理。Partitioner接口的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力。
接下来,key与value值都会被序列化成字节数组,key/value对以及Partition的结果都会被写入缓冲区。缓冲区的作用是批量收集map结果,减少磁盘IO的影响。
内存缓冲区的大小默认为100M。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill。这个溢写是由单独线程来完成。
整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。
合并溢写文件。每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当map task真正完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge。
最终生成的这个文件也存放在TaskTracker够得着的某个本地目录内。每个reduce task不断地通过RPC从JobTracker那里获取map task是否完成的信息,如果reduce task得到通知,获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。
Reduce端Shuffle执行过程
简单地说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。

Copy过程,简单的拉取数据。
Merge过程,与map 端类似,这也是溢写的过程。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。
merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。
默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。
Reducer的输入文件。不断地merge后,最后会生成一个最终文件。默认情况下,这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDFS上。
MapReduce详细流程图

参考博客:https://www.jianshu.com/p/f4abfe0170