MapReduce——MR的执行流程(五次IO)
一个完整的mapreduce程序在分布式运行时有三类实例进程:
MRAppMaster:负责整个程序的过程调度及状态协调
mapTask:负责map阶段的整个数据处理流程
(maptask的数量是不能设置的,reducetask可以自己设置job.setNumReduceTasks(5);)
ReduceTask:负责reduce阶段的整个数据处理流程
对于MR的执行流程,我是通过其过程中的五次IO来记忆的,详细解释如下(最后有流程详图):
第一次io
1、在一个MR程序的启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job作业的描述信息,计算出需要的map task的实际数量,紧接着向集群申请服务器启动相应数量的map task进程。
1.1(读文件流程)从map task进程开始读取文件,是通过调用客户端指定的FileInputFormat(默认是TextInputFormat)生成一个RecordReader对象,调用RecordReader中的read()方法,然后它会,按行读取block块里面的内容(一次读一行)。
maptask会调用nextkeyvalue()方法,通过getCurrentKey,getCurrentValue得到k,v的键值对数据,返回(key,value)。
1.2(切片流程)通过调用getSplits()方法把输入文件按照一定的标准进行分片,每个输入片是由每个map task进程执行的,其大小是固定的。默认情况下,输入片(inputSplit)和数据块(block)是相同的,如果数据块Block的默认大小是64M,但是现在有俩个输入文件,一个是32M,一个是72M,那么小的文件是一个输入片,大的文件会分为俩个数据块Block,分成俩个输入片。一共生成三个输入片,每个输入片由一个Mapper进程处理,这里则由三个Mapper进程处理。
这边有俩个题外话,
a、如何控制切片的大小?
b、针对切片过程中,如果有一行数据被切割,正好在中间切割,例如:hello正被切成he和llo,这该怎么办?
在读取过程中,读取器不会管你是不是断开,它是整行读取,所以,提供以下办法:
解决策略(读取策略):每个maptask向下多读一行;
每个maptask都要抛弃读取的第一行数据;
第一个maptask不需要抛弃第一行;
最后一个maptask不需要读取下一行;
2、2.1 将上一步读文件流程中获取的(key,value)键值对经过Mapper的map方法逻辑处理形成新的(K,V)键值对,
2.2通过context.write输出到OutPutCollector收集器;
3、OutPutCollector收集器把收集到的(K,V)键值对写入环形缓冲区(环形缓冲区的默认大小是100M,环形,前面写入,后面有组件不断地清理,防止溢出),当里面的数据达到80%时将进行溢写
(但是在溢写前需要对数据进行分区排序,就是会对环形缓冲区的每个(k,v)键值对哈希一个partition值,然后相同partition值为同一个分区,这里分区的数量就是reduce任务运行的数量。然后把环形缓冲区中的数据按照partition和key值俩个关键字进行升序排序,同一个partition内的按照key值排序)。
在这里,MRAppMaster监控到所有的maptRask进程任务都完成后,就会根据用户指定的参数启动相应的数量的reducetask任务进程,并告知reducetask要处理的数据范围(数据分区)。
环形缓冲区中排序后的内存数据不断溢出到本地磁盘文件(每次溢出一个文件),如果map阶段处理的数据量较大,可能会溢出多个文件。
第二次io
溢出的多个文件会被merge合并成大的溢出文件,合并采用归并排序(溢写的文件也可以进行combiner操作,前提是汇总操作,求平均值不行),在溢出过程及合并过程中,都要用partitioner分区器进行分组,并且根据key进行排序,所以合并的maptask最终结果文件还是分区且分区有序的
第三次io
当reducetask任务进程启动后,它会根据自己的分区号,通过MRAppMaster告知其待处理数据所处位置,去各个maptask节点上copy拷贝相同partition的数据到reduce task 本地磁盘
第四次io
reduce task会把同一分区的来自不同maptask的结果文件,在进行merge再次合并成一个大文件,并归并排序(内部k,v按照看k有序),所以大文件也是有序的。
第五次io
在并成大文件之后,shuffle的过程结束,进入reducetask的逻辑运算过程,首先调用groupingComparator对大文件进行分组,从文件中调用一个个(k,v)键值对group(每次取出一组),调用用户自定义的reduce方法进行逻辑处理
最后通过OutPutFormat方法将结果数据写到part-r-000X结果文件中。
根据下面的截图对应上图理解