MR是一个分布式运算框架,主要分为Map和reduce两个阶段,map负责把一个任务分解成多个任务,reduce负责把分解后多个任务的处理结果汇总
优缺点
① 易编程,简单的实现一些接口,就可以完成一个分布式程序,可以分布到大量机器上运行.
② 易扩展, 当计算力不够的时候,可以通过简单的增加机器来扩展计算能力
③ 高容错, 当一个机器挂了,可以把上面的计算任务转移到另一个节点上,不至于任务失败
④ 大数据量, 可以实现上千台服务器集群并发工作,提供数据处理能力
缺点;
① 不适合实时
② 不适合流式处理,流式处理数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化
③ 不适合DAG(有向图)计算, 多个程序存在依赖关系,后一个应用程序的输入为前一个的输出,在这种情况下,MapReduce的作业输出都会写入到磁盘,会造成大量的磁盘IO,导致性能低下MapReduce处理思想

① 第一个阶段的MapTask并发实例,完全并行运行,互不干涉
② 第二个阶段的ReduceTask并发实例互不相关,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出
③ MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑复杂,那只能多个MapReduce程序,串行运行MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类实例进程
① MrAppMaster: 负责整个程序的过程调度及状态协调.
② Map Task: 负责Map阶段的整个数据处理流程
③ ReduceTask: 负责Reduce阶段的整个数据处理流程常用数据序列化类型
Java类型 Hadoop Writable类型 boolean BooleanWritable byte ByteWritable int IntWritable float FloatWritable long LongWritable double DoubleWritable String Text map MapWritable array ArrayWritableMapReduce 编程
编写MapReduce程序分为三个部分: Mapper Reducer Driver
①Mapper
用户自定义的Mapper要继承自己的父类,Mapper的输入数据是KV对的形式,KV的类型可以自定义,Mapper中的业务逻辑写在map()方法中,Mapper的输出数据是KV对的形式(可自定义KV类型),map()方法对每个<K,V>
② Reducer阶段
用户自定义的reducer要继承自己的父类,Reducer的输入数据类型对应Mapper的输出数据类型,也是KV,Reducer的业务逻辑写在reduce()方法中,ReduceTask进程对每一组形同K的<K,V>组调用一次reduce()方法
③ Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到Yarn集群,提交的是封装了MapReduce程序相关运行参数的job对象序列化
序列化就是吧内存中的对象转换成字节序列(或者其他数据传输协议)以便于持久化到磁盘和网络传输.反序列化就是将收到的字节序列(或者其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象.
Java的序列化是一个重量级的序列化框架(Serializable), 一个对象序列化后,会附带很多额外的信息(各种校验信息,集成体系等),不便于在网络中高效传输,所以,Hadoop自己开发了一套序列化机制(Writable).
优点: ① 紧凑,高效的使用存储空间
② 快速,读写数据的额外开销小
③ 可扩展,可以随着通信协议的升级而升级
④ 互操作,支持多语言的交互
自定义bean对象实现序列化接口(Writable)
① 实现Writable
② 反序列化时,需要反射调用空参构造函数,所以必须有空参构造器
③ 重写序列化方法: write()
④ 重写反序列化方法: readFields()
⑤ 注意反序列化的顺序和序列化的顺序完全一致
⑥ 要想输出结果到文件中,需要重写toString()方法,使用分隔符将字段隔开
⑦ 如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce中的shuffle过程要求对key必须能排序.MapReduce原理
9.1 InputFormat 数据输入
① 切片和MapTask并行度
数据块: Block是HDFS物理上把数据分成一块一块的
数据切片: 数据切片只是在逻辑上对输入进行切片,并不会在磁盘上将其切分成片进行存储
一个job的Map阶段并行度由客户端在提交job时的切片数决定,每个split切片分配一个Maptask并行实例处理,默认情况下,切片大小=BlockSize,切片时不考虑数据集的整体,而是逐个针对每个文件单独切片
② job提交流程关键点
waitForCompletion()–>job.submit()
建立连接: connect() --> 创建代理: new Cluster(getConfiguration())–>判断模式: initialize(jobTrackAddr,conf)
submitter.submitJobInternal(Job.this,cluster)
创建Stag路径: Path jobStagingArea=JobSubmissionFiles.getStagingDir(cluster,conf)
创建job路径: JobID jobId = submitClient.getNewJobID()
拷贝jar包到集群: copyAndConfigureFiles(job,submitJobDir);rUploader.uploadFiles(job,jobSubmitDir);
计算切片: writeSplits(job,submitJobDir);maps=writeNewSplits(job,jobSubmitDir);input.getSplits(job)
向Stag路径写XML配置文件: writeConf(conf,submitJobFile); conf.writeXml(out);
提交job,返回提交状态: status = submitClient.submitJob(jobId,submitJobDir.toString(),job.getCredentials());
③ input.getSplit(job) 切片关键源码
程序先找到数据存储目录,开始遍历目录下的每个文件,规划切片
先获取文件大小: fs.sizeOf(xxx.txt)
计算切片大小: computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))
默认情况下,切片大小=blocksize
每次切片都判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片
将切片信息写到一个切片规划文件里,inputSplit 只记录了切片的元数据信息,如起始位置,长度,所在节点列表等
提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数
④ 切片参数
切片计算公式: Math.max(minSize, Math.min(maxSize,blockSzie))
mapreduce.input.fileinputformat.split.minsize=1 (默认值1)
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue (默认值)
所以,默认情况下,切片大小=blockSize
maxsize(切片最大值) : 参数如果调的比blockSize小,则会让切片变小,而且等于设置的maxsize值
minsize(切片最小值): 参数如果调的比blockSize大,则会让切片变大,就是设置的minsize值
获取切片信息: String name= inputSplit.getPath().getName();
FileSplit inputSplit = (FileSplit) context.getInputSplit();
⑤ CombineTextInputFormat 切片机制
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,如果有大量小文件的话,就会产生大量的MapTask.
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理.
CombineTextInputFormat.setMaxInputSplitSize(job,splitsize);
切片过程: 虚拟存储过程 + 切片过程
虚拟存储过程: 将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的值,逻辑上划分一个块.如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片).例如 设置的值为4M,有个输入文件为8.2M,首先逻辑上切分4M,剩余4.2M,如果按照逻辑切分,则会产生一个0.2M的文件,所以将剩下的4.2M文件切分成2.1M+2.1M 两个文件.
切片过程: 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独一个切片
如果不大于则跟下一个虚拟存储文件合并,共同形成一个切片.如上图,6个小文件最终形成了三个切片
代码: 在驱动类中增加代码
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, splitSize);
⑥ FileInputformat的实现类
Fileinputformat常见的接口实现类包括: TextInputFormat KeyValueTextInputFormat NLineInputFormat CombineTextInputFormat 自定义InputFormat等
1) TextInputFormat
默认的实现类,按行读取每条记录,键是存储该行在整个文件中的起始字节偏移量,LongWritable类型,值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型
2) KeyValueTextInputFormat
每行为一条记录,被分隔符分割为Key,value,可以通过在驱动类中设置分隔符,默认分隔符是tab
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,"\t");
3) NLineInputFormat
如果使用NLineInputFormat ,则代表每个map进程处理的InputSplit不再按Block块去划分,而是按照NLineInputFormat指定的行数N来划分,即输入文件的总行数/N=切片数,如果不整除,切片数=商+1
4) 自定义InputFormat
首先自定义一个类继承FileInputFormat,需要重写isSplitable()方法,然后重写createRecordReader(),创建自定义的RecordReader对象并初始化.改写RecordReader,然后设置Driver
设置输入的inputFormat: job.setInputFormatClass(xxx.class);
设置输出的outputFormat: job.setOutputFormatClass(xxx.class);9.2 MapReduce工作流程


- 客户端submit()前,根据参数设置,形成任务分配规划,向yarn提交切片信息,jar包,任务XML文件
- Mr appmaster 根据提交的切片信息,计算出mapTask数量
- 读取文件,默认TextInputFormat,调用了RecordReader,reader()
- 读取的数据进入map方法,处理逻辑
- 处理完成后,Context.write(K,V),写到outputCollector,环形缓冲区
- 环形缓冲区一侧写元数据,另一侧写数据,元数据包括kstart,valstart,partition,index,数据包括key,value,unused. 默认大小100M,写数据到80%的时候,开始反向写入,分区排序后溢写到磁盘文件(分区且区内有序)
- 多个溢写小文件进行归并排序成一个大文件 MergeSort
- 如果使用了combiner组件,则会对数据进行合并
- 所有的maptask任务完成后,启动相应数量的reducetask,并告知reducetask处理数据范围(数据分区)
- reducetask将对应的maptask输出的文件数据拷贝到本地磁盘
- reducetask合并文件归并排序,然后将数据传入reduce(k,v)进行处理,然后context.write(k,v)
- 默认TextOutputFormat,调用RecordWriter,Write(k,v)
9.3 shuffle
map方法之后,reduce方法之前的数据处理过程称为shuffle.
1) Maptask收集map方法输出的kv 对,放入环形缓冲区
2) 从环形缓冲区中不断溢写出本地磁盘文件
3) 多个溢写文件会被合并成大的溢写文件
4) 在溢写过程和合并的过程中,都要调用Partitioner进行分区和针对key进行排序
5) reduceTask 根据自己的分区号,去各个maptask机器上获取响应的结果分区数据
6) reducetask会取到来自不同maptask的同一分区的文件,然后将这些文件再归并排序
7) 合并成大文件之后,shuffle阶段结束,之后进入reducetask的逻辑运算,reduce()
shuffle中的缓冲区大小会影响MapReduce的执行效率,原则上说,缓冲区越大,磁盘IO的次数越少,执行速度就越快,缓冲区的大小可以通过参数调整, io.sort.mb 默认100M
9.4 分区
默认partition分区: hash分区
自定义分区:① 自定义类继承Partitioner,重写getPartition()方法
② 在Job驱动中,设置自定义Partitioner. job.setPartitionerClass(xxx.class)
③ 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask. job.setNumReduceTasks(5);
注意: ① 如果reducetask的数量 > getPartition的结果数,则会多产生介个空的输出文件
② 如果 1 < reducetask数量 < getPartition的结果数, 则有一部分分区数据无处安放,会Exception
③ 如果reducetask的数量=1,则不管maptask端输出多少个分区文件,最终结果都交给一个reducetask,最终也产生了一个结果文件part-r-00000;
④ 分区号从零开始,逐一累加
9.5 排序
maptask和reducetask均会对数据按照key进行排序,该操作属于hadoop默认行为,默认排序是按照字典顺序排序,且实现该排序的方法是快速排序.
maptask会将处理的结果暂时放到环形缓冲区,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序.
reducetask,从每个maptask上远程拷贝响应的数据文件,如果文件大小超过一定阈值,则溢写到磁盘上,否则存储在内存中. 如果磁盘上的文件达到一定阈值,则进行一次归并排序生成一个更大文件,如果内存中文件大小或数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上,当所有数据拷贝完毕后,reducetask统一对内存和磁盘上的所有数据进行一次归并排序.
辅助排序(GroupingComparator): 在reduce端对key进行分组
自定义排序: 实现WritableComparable接口重写compareTo方法
9.6 Combiner合并
Combiner是MR程序中Mapper和Reducer之外的一种组件,父类是Reducer.
Combiner和reducer的区别在于运行的位置,Combiner是在每个MapTask所在的节点运行,Reducer是接收全局所有的Mapper的输出结果.
Combiner的意义就是对每个MapTask的输出进行局部汇总,以减少网络传输量.
Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入KV类型对应起来.
自定义Combiner: 继承Reducer,重写Reduce方法,然后再驱动类中设置,job.setCombinerClass(xxx.class)
9.7 辅助排序(分组排序 GroupingComparator)
对Reduce阶段的数据根据某一个或几个字段进行分组.
分组排序: ① 自定义类继承 WritableComparator
② 重写compare()方法
③ 创建一个构造将比较对象的类传给父类
在驱动类中设置: job.setGroupingComparatorClass(xxxx.class)
9.8 maptask工作机制
- Read阶段: MapTask通过RecordReader,从输入InputSplit中解析出一个个key/value
- Map阶段: 该阶段主要是将解析出的key/value交给map()方法处理
- Collect阶段: 在map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果,在该函数内部,将生成的key/value写入环形缓冲区
- 溢写阶段: 当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件,在写入磁盘之前,会对数据进行一次本地排序,并在必要时对数据进行合并 压缩等操作.
① 利用快速排序对缓冲区内的数据进行排序,先按照分区编号Partition进行排序,然后按照key进行排序.这样,数据分区聚集且分区内有序
② 按照分区编号由小到大一次将每个分区中的数据写入任务工作目录下临时文件中,如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作
③ 将分区数据的元信息写到内存索引结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量,压缩前数据大小和压缩后数据大小,如果当前内存索引大小超过1MB,则将内存索引写入文件output/spillN.out.index中. - Combine阶段: 当所有数据处理完成后,MapTask对所有的临时文件进行一次合并,确保最终只会生成一个数据文件.并保存到文件output/file.out中,同时生成对应的索引文件output/file.out.index
在进行文件合并的过程中,MapTask以分区为单位进行合并,对于某个分区,它将采用多轮递归合并的方式,每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件.
让每个Maptask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销.
9.9 ReduceTask工作机制
- copy阶段: ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘,否则直接放到内存中
- Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘文件过多
- sort阶段: 用户编写的reduce()函数输入数据是按照key进行聚集的一组数据,为了将相同key的数据聚在一起,hadoop采用了基于排序的策略,由于在map阶段,结果数据已经是局部排序了,因此,reducetask只需要对所有数据进行一次归并排序即可.
- reduce阶段: reduce()函数将计算结果写到HDFS上.
reducetask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量可以手动设置.默认值是1
job.setNumReduceTasks(x);
① ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致
② ReduceTask默认值就是1,所以输出文件个数为一
③ 如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜
④ ReduceTask数量并不是任意设置,还要考虑业务逻辑需求
⑤ 具体需要多少ReduceTask,还需要考虑集群性能
⑥ 如果分区数不是1,但是ReduceTask为1,是否执行分区过程(不执行,在MapTask源码中,执行分区的前提是先判断ReduceNum个数是否大于1,不大于1肯定不执行)
outputFormat
OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口.常见的实现类:
1) TextOutputFormat
默认输出格式就是TextOutputFormat,它把每条记录写为文本行,它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换成字符串.
2) SequenceFileOutputFormat
将SequenceFileOutputFormat 输出作为后续MapReduce任务的输入.因为它的格式紧凑,很容易被压缩
3) 自定义OutputFormat
① 自定义一个类继承FileOutputFormat
② 改写RecordWriter,具体改写输出数据的方法write()
设置: job.setOutputFormatClass(xxx.class)join
reduce join
map端的主要工作: 为来自不同数据源的key/value对,打标签以区别不同来源,然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出.
reduce端的工作:在reduce端以连接字段作为key的分组已经完成,只需要在每个分组中将那些来源于不同文件的记录(在Map阶段已经打标签)分开,然后合并就可以了.
缺点: 这种方式,合并的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段容易产生数据倾斜.
解决方案: 在map端实现数据合并map join
map join适用于一张表很小,一张表很大的场景.
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据压力,尽可能的减少数据倾斜.
①在mapper的setup阶段,将文件读取到缓存集合中,在驱动函数中加载缓存.
② job.addCacheFile(xxx)
MapReduce开发总结
1) 输入数据接口: InputFormat
2) 逻辑处理接口: Mapper ,需要实现三个方法: map() setup() cleanup()
3) Partitioner分区: 默认hash分区
4) Comparable排序: 当用自定义对象作为key时,必须实现WriteComparable接口,重写compareTo()方法
5) Combiner合并: 可以提高效率,减少IO,但是不能影响业务逻辑
6) Reduce端分组: GroupingComparator
7) 逻辑处理接口:Reducer ,需要实现三个方法: reduce() setup() cleanup()
8) 输出数据接口: OutputFormatHadoop数据压缩
压缩技术能有效减少HDFS读写字节数,提高网络带宽和磁盘空间的效率,在运行MR程序时,I/O操作,网络数据传输,shuffle和merge要花大量的时间,尤其是数据规模很大和工作负载密集的情况下,因此,使用数据压缩显得非常重要.
压缩是提高hadoop运行效率的一种优化策略.通过对Mapper.Reducer运行过程的数据进行压缩,减少磁盘IO,提高MR运行速度.
基本原则: 运算密集型job,少用压缩,IO密集型job,多用IOMR支持的压缩编码

对应的编码/解码器
- Gzip压缩
优点: 压缩率高,压缩/解压缩速度也比较快;hadoop本身支持,在应用中处理Gzip格式的文件就和直接处理文本一样,大部分Linux系统都自带Gzip命令,使用方便
缺点:不支持split
应用: 当每个文件压缩后在一个块大小左右,都可以考虑使用Gzip压缩格式 - Bzip2 压缩
优点: 支持split,具有很高的压缩率,比Gzip压缩率高,hadoop本身自带,使用方便
缺点: 压缩/解压速度慢
应用: 适合对速度要求不高,但需要很高的压缩率 - Lzo压缩
优点:压缩/解压速度也比较快,合理的压缩率,支持split,是hadoop中最流行的压缩格式;可以在Linux系统下安装lzop命令,使用方便
缺点: 压缩率比Gzip低一些,hadoop本身不支持,需要安装,在应用中对Lzo格式的文件需要做处理(为了支持split,需要建立索引,还需要指定InputFormat为Lzo格式)
应用: 一个很大的文本文件,压缩只有还大于200M以上的可以考虑,而且单个文件越大,Lzo的优点越明显 - Snappy压缩
优点: 高速压缩和合理的压缩率
缺点: 不支持Split;压缩率比Gzip低,hadoop本身不支持,需要安装
应用: 当MapReduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;或者作为一个MapReduce作业的输出和另一个MapReduce作业的输入

- yarn资源调度器
Yarn是个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于操作系统之上的应用程序.
yarn 主要由ResourceManager,NodeManager,ApplicationMaster和Container等组件构成.
yarn的工作机制
- MR程序提交到客户端所在的节点
- YarnRunner向ResourceManager申请一个Application
- RM将该应用的资源路径返回给YarnRunner
- YarnRunner将运行所需资源提交到HDFS上
- 程序资源提交完毕后,申请运行mrAppMaster
- RM将用户的请求初始化成一个Task
- 其中一个NodeManager领取到Task任务
- 该NodeManager创建容器Container,并产生MRAppmaster
- Container从HDFS上拷贝资源到本地
- MRAppmaster想RM申请运行MapTask的资源
- RM将运行MapTask任务分配给另外的NodeManager,被分配任务的NodeManager分别领取任务并创建容器
- MR向接收到任务的NodeManger发送程序启动脚本,NodeManager分别启动MapTask
- MRAppmaster等待所有的MapTask运行完毕后,向RM申请容器,运行ReduceTask
- ReduceTask向MapTask获取相应分区数据
- 程序运行完毕后,MR向RM申请注销自己
作业提交过程
1) client调用job.waitForCompletion方法,向集群提交MapReduce作业
2) client向RM申请一个作业id
3) RM给client返回该job资源提交路径和作业id
4) client提交jar包,切片信息和配置文件到指定的资源提交路径
5) client提交完资源后,向RM申请运行MrAppMaster
6) 当RM收到client的请求后,将该job添加到容量调度器中
7) 某个空闲NodeManager领取到任务
8) 该NodeManager创建Container,并产生MRAppmaster
9) 下载client提交的资源到本地
10) MRAppmaster向ResourceManager申请运行多个MapTask的任务资源
11) ResourceManager将MapTask任务分配给其他NodeManager,领取到任务的NM分别创建container
12) MR向接收到任务的NM发送程序启动脚本,接收任务的NM分别启动MapTask
13) MrAppMaster等待所有的MapTask任务运行完毕后,向RM申请容器,运行ReduceTask
14) ReduceTask向MapTask获取相应分区的数据
15) 程序运行完毕后,MR会向RM申请注销自己
作业进度和状态更新
yarn中的任务将其进度和状态返回应用管理器,客户端每秒(通过 mapreduce.client.progressmonitor.pollinterval 设置)向应用管理器请求进度更新,展示给用户,客户端每5秒回通过调用waitForCompletion()来检查作业是否完成,时间间隔可以通过mapreduce.client.completion.pollinterval来设置,作业完成后,应用管理器和container会清理工作状态,作业的信息会被作业历史服务器存储,以备之后用户核查.
资源调度器 FIFO CapacityScheduler FairScheduler 三种,默认资源调度器是CapacityScheduler.
FIFO调度器
CapacityScheduler(容量调度器)
FairScheduler(公平调度器)
任务推测执行
- 作业完成时间取决于最慢的任务完成时间
- 某个任务运行速度远远慢于任务平均水平,为此任务启动一个备份任务,哪个先运行完,采用哪个结果
- 执行推测任务的前提条件
① 每个task只有一个备份任务
② 当前job已完成的task必须不小于5%
③ 开启推测执行参数设置.mapred-site.xml文件默认是打开的 - 不能启用推测执行的机制情况
① 任务间存在严重负载倾斜
② 特殊任务,比如向数据库中写入数据 - 推测执行算法逻辑

Hadoop优化手段
- MapReduce程序效率瓶颈在于两点
① 服务器性能: CPU 内存 磁盘 网络
② I/O操作优化
数据倾斜,Map和reduce个数,小文件过多,不可分块的大文件,spill次数过多,merge次数过多 - 优化从六个方面考虑
- 数据输入
①合并小文件: 在执行MR任务前将小文件进行合并,大量的小文件会产生大量的map任务,增大Map任务装载次数,而任务的装载比较耗时,从而导致MR运行较慢
② 采用CombineTextInputFormat作为输入,解决输入端大量小文件场景 - Map阶段
① 减少溢写(spill)次数:通过调整io.sort.mb和sort.spill.percent参数,增大触发spill的内存上限,减少spill次数,从而减少磁盘IO
② 减少合并(merge)次数: 通过调整io.sort.factor参数,增大merge的文件数目,减少merge的次数,从而缩短MR处理时间
③ 在map之后,不影响业务逻辑的前提下,可以进行combine处理,减少io - reduce阶段
① 合理设置map和reduce个数: 两个不能设置太少或太多,太少,导致task等待,延长处理时间,太多,会导致map,reduce任务间竞争资源,造成处理超时等错误
② 设置map,reduce并存: 调整slowstart.completedmap参数,使Map运行到一定程度后,reduce也开始运行,减少reduce的等待时间
③ 避免使用reduce: reduce在连接数据集的时候会产生大量的网络消耗
④ 合理设置reduce端的buffer: 默认情况下,数据达到一个阈值的时候,buffer中的数据就会写入磁盘,然后reduce会从磁盘中获得所有的数据.也就是说,buffer和reduce是没有直接关联的,中间多次写磁盘->读磁盘过程,可以通过参数设置,使得buffer中的一部分数据可以直接输送到reduce,从而减少io开销;mapreduce.reduce.input.buffer.percent,默认0.0,当值大于0的时候,会保留指定比例的内存读buffer中的数据直接拿给reduce使用.这样设置之后,buffer需要内存,读数据需要内存,计算需要内存,所以要根据作业运行情况进行调整. - I/O传输
① 采用数据压缩的方式,减少网络io的时间,安装snappy和lzo压缩编码器
② 使用sequencefile二进制文件 - 数据倾斜问题
① 数据频率倾斜: 某个地区的数据量远大于其他地区的数据
数据大小倾斜: 部分记录的大小远大于平均值
② 抽样和范围分区: 可以通过对原始数据进行抽样得到的结果集来预设分区边界值
③ 自定义分区: 基于输出键的背景知识自定义分区
④ Combiner: 使用Combiner可以大量的减少数据倾斜,在可能的情况下,combiner的目的就是聚合精简数据
⑤ 采用mapjoin,尽量避免reducejoin - 常用调优参数
① mapred-default.xml
mapreduce.map.memory.mb 一个maptask可使用的资源上限
mapreduce.reduce.memory.mb 一个reducetask可使用的资源上限
mapreduce.map.cup.vcores 每个maptask可使用最多CPUcore数目
mapreduce.reduce.cup.vcores 每个reducetask可使用的最多CPUcore数目
mapreduce.reduce.shuffle.parallelcopies 每个reduce去map中取数据的并行数
mapreduce.reduce.shuffle.merge.percent buffer中的数据达到多少比例开始写入磁盘,默认0.66
mapreduce.reduce.shuffle.input.buffer.percent Buffer大小占reduce可用内存比例,默认0.7
mpareduce.reduce.input.buffer.percent 指定多少比例的内存来存放buffer中的数据,默认值0.0
② yarn-default.xml
yarn.scheduler.minimum-allocation-mb 给应用程序container分配的最小内存
yarn.scheduler.maximum-allocation-mb 给应用程序container分配的最大内存
yarn.scheduler.minimum-allocation-vcores 每个container申请的最小CPU数
yarn.scheduler.maximum-allocation-vcores 每个container申请的最大CPU数
yarn.nodemanager.resource.memory-mb 给containers分配的最大物理内存
③ mapred-default.xml
mapreduce.task.io.sort.mb shuffle的环形缓冲区大小,默认100M
mapreduce.map.sort.spill.percent 环形缓冲区溢出的阈值,默认80%
④ 容错
mapreduce.map.maxattempts 每个maptask最大重试次数,默认4
mapreduce.reduce.maxattempts 每个reducetask最大重试次数,默认4
mapreduce.task.timeout task超时时间
- 数据输入
- 小文件优化
- HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会占用大量的NameNode内存空间,另一方面就是索引文件过多使得索引速度变慢.
- 解决方案
① 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
② 在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并
③ 在MapReduce处理时,采用CombineTextInputFormat提高效率
④ Hadoop Archive
是一个高效的将小文件放入HDFS块中的文件存档工具,它能将多个小文件打包成一个HAR文件,这样就减少了NameNode的内存使用
⑤ Sequencefile
Sequence File由一系列的二进制key/value组成,如果key为文件名,value为文件内容,则可以将大批小文件合并成一个大文件.
⑥ CombineFileInputFormat
CombineFileInputFormat是一种新的InputFormat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置.
⑦ 开启JVM重用
对于大量小文件job,可以开启JVM重用会减少45%的运行时间.
JVM重用原理: 一个map运行在一个JVM上,开启重用的话,该Map在JVM上运行完毕后,JVM继续运行其他的map
具体设置: mapreduce.job.jvm.numtasks 值在10-20之间
- MapReduce程序效率瓶颈在于两点