整个Map阶段流程大体如上图所示。
简单概述:
inputFile通过split被逻辑切分为多个split文件,通过Record按行读取内容给 map(用户自己实现的)进行处理,数据被map处理结束之后交给OutputCollector收集 器,对其结果key进行分区(默认使用hash分区),然后写入buffer,每个map task都有 一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一 个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所 有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据
详细步骤
- 读取数据组件 InputFormat (默认 TextInputFormat) 会通过 getSplits 方法对输入目录中文件进行逻辑切片规划得到 splits, 有多少个 split 就对应启动多少个MapTask . split 与 block 的对应关系默认是一对一
- 将输入文件切分为 splits 之后, 由 RecordReader 对象 (默认是LineRecordReader) 进行读取, 以 \n 作为分隔符, 读取一行数据, 返回 <key,value> . Key 表示每行首字符 偏移值, Value 表示这一行文本内容
- 读取 split 返回 <key,value> , 进入用户自己继承的 Mapper 类中,执行用户重写 的 map 函数, RecordReader 读取一行这里调用一次
- Mapper逻辑结束之后,将Mapper的每条结果通过context.write进行collect数据收集. 在 collect 中, 会先对其进行分区处理,默认使用 HashPartitioner。MapReduce 提供 Partitioner 接口, 它的作用就是根据 Key 或 Value 及 Reducer 的数量来决定当前的这对输出数据最终应该交由哪个 Reduce task 处理, 默认对 Key Hash 后再以 Reducer 数量取模. 默认的取模方式只是为了平均 Reducer 的处理能力, 如果用户自己对 Partitioner 有需求, 可以订制并设 置到Job上. 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集 Mapper 结果, 减少磁盘 IO 的影响. 我们的 Key/Value 对以及 Partition 的结果都会被写入缓冲区. 当然, 写入之前,Key 与 Value 值都会被序列化成字节数组。环形缓冲区其实是一个数组, 数组中存放着 Key, Value 的序列化数据和 Key, Value 的元数据信息, 包括 Partition, Key 的起始位置, Value 的起始位置以及 Value 的长度.环形结构是一个抽象概念。缓冲区是有大小限制, 默认是 100MB. 当 Mapper 的输出结果很多时, 就可能 会撑爆内存, 所以需要在一定条件下将缓冲区中的数据临时写入磁盘, 然后重 新利用这块缓冲区. 这个从内存往磁盘写数据的过程被称为 Spill, 中文可译为溢写. 这个溢写是由单独线程来完成, 不影响往缓冲区写 Mapper 结果的线程. 溢写线程启动时不应该阻止 Mapper 的结果输出, 所以整个缓冲区有个溢写的比例 spill.percent . 这个比例默认是 0.8, 也就是当缓冲区的数据已经达到 阈值 buffer size * spill percent = 100MB * 0.8 = 80MB , 溢写线程启动, 锁定这 80MB 的内存, 执行溢写过程. Mapper 的输出结果还可以往剩下的 20MB 内存中写, 互不影响
- 当溢写线程启动后,需要对这80MB空间内的Key做排序(Sort).排序是MapReduce 模型默认的行为, 这里的排序也是对序列化的字节做的排序。如果 Job 设置过 Combiner, 那么现在就是使用 Combiner 的时候了. 将有相 同 Key 的 Key/Value 对的 Value 加起来, 减少溢写到磁盘的数据量. Combiner 会优化 MapReduce 的中间结果, 所以它在整个模型中会多次使用。那哪些场景才能使用 Combiner 呢? 从这里分析, Combiner 的输出是 Reducer 的输入, Combiner 绝不能改变最终的计算结果. Combiner 只应该用于那种 Reduce 的输入 Key/Value 与输出 Key/Value 类型完全一致, 且不影响 最终结果的场景. 比如累加, 最大值等. Combiner 的使用一定得慎重, 如果用好, 它对 Job 执行效率有帮助, 反之会影响 Reducer 的最终结果
- 合并溢写文件,每次溢写会在磁盘上生成一个临时文件(写之前判断是否有Combiner), 如果 Mapper 的输出结果真的很大, 有多次这样的溢写发生, 磁盘上相应的就会有多个 临时文件存在. 当整个数据处理结束之后开始对磁盘中的临时文件进行 Merge 合并, 因为最终的文件只有一个, 写入磁盘, 并且为这个文件提供了一个索引文件, 以记录每个 reduce对应数据的偏移量
版权声明:本文为iCloser原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。