一、shuffle简介
--------------------------------------------
1.在spark中,主要是以下几个算子:groupByKey、reduceByKey、countByKey、join,等等。会发生shuffle操作
2.例如,groupByKey,要把分布在集群各个节点上的数据中的同一个key,对应的values,都给集中到一块儿,
集中到集群中同一个节点上,更严密一点说,就是集中到一个节点的一个executor的一个task中。
然后,集中一个key对应的values之后,才能交给我们来进行处理,<key, Iterable<value>>;
reduceByKey,算子函数去对values集合进行reduce操作,最后变成一个value;
countByKey,需要在一个task中,获取到一个key对应的所有的value,然后进行计数,
统计总共有多少个value;join,RDD<key, value>,RDD<key, value>,只要是两个RDD中,
key相同对应的2个value,都能到一个节点的executor的task中,给我们进行处理。
二、shuffle调优之合并map端输出文件
---------------------------------------------
1.第一个stage的每个task,都会给第二个stage的每个task,创建一份map端的输出文件
2.第二个stage的每个task,都会去上一个stage上去拉取属于自己的那一份文件
3.这时,就会发生大量的磁盘读写操作,基本上就是shuffle中性能消耗最严重的部分
4.举例 100个节点,100个excutor,每个节点2个core, task并行数为2,每个stage总共1000个task,
stage0 1000个task, task并行度2, 100个节点,每个节点10个task
stage1 1000个task, task并行度2, 100个节点,每个节点10个task
因为stage0的每个task都要为stage1的每个task准备一份数据,也就是说stage0每个task要准备1000份数据
那么每个节点10个task, 要准备的数据 10 * 1000 = 1 万
那么10个节点总共准备的数据就是 100 * 1 万 = 100万
那么这个shuffle阶段产生的文件就是100万份文件
5.什么是map端输出文件合并?
每个stage都会同时并行运行cpu core个task
对于每个excutor,只有并行执行的task会去创建新的输出文件;
下一批并行执行的task,就会去复用之前已有的输出文件;
但是有一个例外,比如2个task并行在执行,但是此时又启动要执行2个task;
那么这个时候的话,就无法去复用刚才的2个task创建的输出文件了;而是还是只能去创建新的输出文件。
要实现输出文件的合并的效果,必须是一批task先执行,然后下一批task再执行,才能复用之前的输出文件;
负责多批task同时起来执行,还是做不到复用的。
6.开启map端输出文件的合并机制之后:
还是上面的例子
100个节点,100个excutor,每个节点2个core, task并行数为2,每个stage总共1000个task,
stage0 1000个task, task并行度2, 100个节点,每个节点10个task
stage1 1000个task, task并行度2, 100个节点,每个节点10个task
stage0的每个task都要为stage1的每个task准备一份数据,但是因为开启了map端文件合并,
会复用并行文件,那么 stage0同时并行2个task,这两个task算一批次,那么这批次产生的map数量为 2 * 1000 = 2000个
所以之后的task会复用这2000份文件
这样每个节点产生的map文件数量问2000
100个节点,产生的数量为 2000 * 100 = 20万
7.合并map端输出文件之后,有哪些性能方面的优化?
a.stage0 -- 为stage1 准备的文件由100万减少到20万,磁盘读写io减少5倍
b.stage1 -- 拉取stage0的文件,也由原来的10000,变为2000,网络传输也减少了5倍
8.如何开启map端聚合?
new SparkConf().set("spark.shuffle.consolidateFiles", "true")
开启shuffle map端输出文件合并的机制;默认情况下,是不开启的,
就是会发生如上所述的大量map端输出文件的操作,严重影响性能。
三、Shuffle调优之调节map端内存缓冲与reduce端内存占比
--------------------------------------------------------
1.默认情况下,shuffle的map端的task,需要为reduce端的task准备数据,那么这些数据默认是写到
每个task自己关联的一个内存缓冲区中。这个缓冲区的默认大小是32k,当这个换冲区满了,多余的数据
就会溢出到磁盘。
2.shuffle阶段的reduce端会拉取map端给予的文件,并将数据对应成hashmap的格式,来对各个key对应的values
进行reduce,执行相应的reduce逻辑。而在这个过程中,将数据分组汇聚到hashmap,以及对hashmap应用reduce函数
的时候,都会涉及一些中间数据,这些中间数据就会存放到自己的excutor的内存中。默认分配的内存占比是0.2,占总
内存的20%.(所有的内存划分: 堆,jvm进程内存,excutor内存, 持久化内存等)
当拉取的数据太多,超出0.2分配的内存之后,多余的数据就会溢出到磁盘中
3.上面两种情况一般都是同时发生的,一旦发生,就会造成大量的磁盘io,从而影响性能
4.实际生产环境中,我们在什么时候来调节两个参数?
进入spark ui , 查看每个task的shuffle write和shuffle read的量,shuffle的磁盘和内存,读写的数据量。
如果发现shuffle 磁盘的write和read,很大。这个时候,就意味着最好调节一些shuffle的参数。
进行调优。首先当然是考虑开启map端输出文件合并机制。
其次调节上面说的那两个参数。调节的时候的原则。spark.shuffle.file.buffer,每次扩大一倍,
然后看看效果,64,128;spark.shuffle.memoryFraction,每次提高0.1,看看效果。
5.调节效果?
map task内存缓冲变大了,减少spill到磁盘文件的次数;
reduce端聚合内存变大了,减少spill到磁盘的次数,而且减少了后面聚合读取磁盘文件的数量。
四、Shuffle调优之HashShuffleManager与SortShuffleManager
------------------------------------------------------------
1.参数说明
spark.shuffle.manager:hash、sort、tungsten-sort(自己实现内存管理)
spark.shuffle.sort.bypassMergeThreshold:200
2.spark 1.5 以前默认使用的shuffle manager 是HashShuffleManager, 1.5之后使用的是SortShuffleManager.
二者的区别主要在于,shuffle 的map端为reduce端的task准备文件时,有所不同
SortShuffleManager 比 Hash 多的是会对map端的文件的key进行排序。而且不会像hash那样默认是创建多分文件的,已经是map端聚合的了
一个task为下一个stage的task只会生成一份文件,其中用offset进行划分各个小文件
默认情况下,hashmanager 的stage0 的task 会为stage1的每个task创建一份文件,
但是sortmanager 会在其基础上,stage0 task为每个stage1创建的文件,合并成一个文件,并且是排序的。其中用offset来标记stage1的task需要的文件
具体使哪一个。
当stage1的task拉取文件的时候,只拉取task对应的那部分offset的数据即可。
3.声明一点:之前讲解的一些调优的点,比如consolidateFiles机制、map端缓冲、reduce端内存占比。这些对任何shuffle manager都是有用的。
4.先选择sort的shuffle方式:spark.shuffle.manager = sort
然后设定sort文件的阈值:spark.shuffle.sort.bypassMergeThreshold
阈值:自己可以设定一个阈值,默认是200,当reduce task数量少于等于200;map task创建的输出文件小于等于200的;
最后会将所有的输出文件合并为一份文件,但是不会进行排序。
这样做的好处,就是避免了少量文件的sort排序,节省了性能开销。
而且还能将多个reduce task的文件合并成一份文件。节省了reduce task拉取数据的时候的磁盘IO的开销。
5.第三种shuffle方式 -- tungsten-sort钨丝排序
钨丝sort shuffle manager。官网上一般说,钨丝-sort shuffle manager,效果跟sort shuffle manager是差不多的。
但是,唯一的不同之处在于,钨丝manager,是使用了自己实现的一套内存管理机制,性能上有很大的提升,
而且可以避免shuffle过程中产生的大量的OOM,GC,等等内存相关的异常。
6.如何选择shuffle方式?
a.需不需要排序?
1.如果不需要的话,那么建议如使用最基本的HashShuffleManager,因为最开始就是考虑的是不排序,换取高性能;
2.如果需要排序的话,那么建议使用SortShuffleManager,并且合理调整spark.shuffle.sort.bypassMergeThreshold:200
参数,保证排序
3.如果不需要排序,而且希望将map端输出文件合并成一个文件,那么建议使用SortShuffleManager,并且将spark.shuffle.sort.bypassMergeThreshold
设置成足够大,至少大于reduce数量
b.总结
1、在生产环境中,不建议大家贸然使用第三点
2、如果你不想要你的数据在shuffle时排序,那么就自己设置一下,用hash shuffle manager。
3、如果你的确是需要你的数据在shuffle时进行排序的,那么就默认不用动,默认就是sort shuffle manager;
4、或者是什么?如果你压根儿不care是否排序这个事儿,那么就默认让他就是sort的。
调节一些其他的参数(consolidation机制)。(80%,都是用这种)
c.开启方式
spark.shuffle.manager:hash、sort、tungsten-sort
new SparkConf().set("spark.shuffle.manager", "hash")
new SparkConf().set("spark.shuffle.manager", "tungsten-sort")
// 默认就是,new SparkConf().set("spark.shuffle.manager", "sort")
new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")
版权声明:本文为xcvbxv01原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。