Spark调优:驱动器OutOfMemory优化

之前一直用driver.memory=30g和driver.cores=8
处理亿级的数据,都没什么问题,直到有一天,处理区区一万条的数据时,居然报Driver OutOfMemoryErrors。

首先说一下背景,我样本量1万,特征近1000个,用spark计算特征的缺失值比例,最开始想到的是遍历每个特征,分别计算,代码如下。

def Identify_Missing(df, columns, all_cnt, missing_threshold=0.80):
	
	drop_columns = []
	# 计算缺失率
	progress_bar = tqdm(columns)
	for c in progress_bar:
		# 显示进度
		progress_bar.set_description('缺失值处理:{}'.format(c))
		nan_cnt = df.select(F.count(c)).collect()[0][0]
		nan_rate = nan_cnt/all_cnt
		# 大于阈值,则丢弃
		if nan_rate>=missing_threshold:
			drop_columns.append(c)

	return drop_columns

但很糟心的是,死活跑不出来,跑到26%就自动kill了,报错说Driver Memory不够。所以,想当然的就是加内存,配置参数如下,30G加到了100G。

conf.set('spark.driver.memory', '100G')

然而,还是不行,跑了4个多小时,运行到92%的时候又自动kill了。

虽然可以继续加大内存,但数据量这么小的情况下,跑不动,显然就是代码性能的问题了,后来研究了下,是由于collect函数使用过多,导致拉取太多数据到驱动器里,耗尽了内存,而这一步主要是计算各个特征的缺失值数量,如果用sql一次性全count出来,不用频繁拉取,因此更新代码如下。

def Identify_Missing(df, columns, all_cnt, missing_threshold=0.80):
	# 聚合函数拼接
	agg_cols = ['count(%s) as %s'%(x,x) for x in columns]
	# 使用sql
	agg_df = df.selectExpr(agg_cols).toPandas()
	# 计算缺失率
	nan_df = agg_df.apply(lambda x: 1-x/all_cnt)
	# 大于阈值,则丢弃
	drop_columns = nan_df.iloc[:, np.where(nan_df>=missing_threshold)[1]].columns.tolist()
	
	return drop_columns 

效果嘛,同样的数据集,5分钟跑出来了。。。。

当然,驱动器OOM的原因有很多,比较难诊断是哪种原因,下面罗列所有可能,供备查。

  1. 最直接的办法,增加驱动器进程的内存,让它可以处理更多的数据。
  2. 代码中使用了诸如collect之类的操作,将过大的数据集收集到驱动器节点。这种就尽可能避免使用action操作,或者像我这样一次性聚合。
  3. 使用了广播连接,但广播的数据太大。可以通过设置spark的最大广播连接数控制广播消息的大小。
  4. 应用程序长时间运行导致驱动器进程生成大量对象,并且无法释放。可以用Java的jmap工具打印堆内存维护对象的直方图,看哪些对象在占内存。
  5. 如果是共用的SparkContext,要确认别的用户不会在驱动器执行导致分配大量内存的操作,如创建过大的数组,或者加载过大的数据集。

版权声明:本文为qq_33536353原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。