1、aggregate()
方法声明:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {}
方法声明中的重点:
1、aggregate返回类型为传入的类型参数U
2、第一个参数列表zeroValue为U类型的值,是聚合的初始值。
3、两个函数类型的参数,官方注释如下:
* @param seqOp an operator used to accumulate results within a partition
* @param combOp an associative operator used to combine results from different partitions
前两点不难理解,第三点中我会解释seqOp ,combOp。
- seqOp传入参数(U,T),其中U为累加结果,T为RDD中下一行的值,返回一个类型为U的值。可以说第一次调用seqOp时,U的初始值为第一个参数列表U类型的zeroValue,下一次调用seqOp时,U的值为上一个SeqOp的返回结果。需要注意的是seqOp只在Partition内执行,最终返回一个U类型的聚合结果,比如第i个partition的聚合结果我们暂且称为result_i。
- combOp是用来合并seqOp在不同partition内计算的聚合结果result_i,这也是为什么它的传入参数列表为(:U,:U),返回也为U类型。
下面是一个例子,将一个Tuple2类型列表中值转化为Json。
(此例为更鲜明体现aggragete返回的类型更加灵活,不受输入RDD的限制)
val list2 = List(("a",1),("b",2),("c",3),("d",4))
val rdd6 = sc.parallelize(list2)
var jsonobject = new JSONObject()
val a = rdd6.aggregate[JSONObject](jsonobject)(
(a:JSONObject,b)=>{
a.put(b._1.toString,b._2.toString)
a
}
,(c,d)=>{
val keyset = d.keySet()
import collection.JavaConversions._
for(key<-keyset){
c.put(key,d.getString(key))
}
c
}
)
println(a)
最终结果为:{“a”:“1”,“b”:“2”,“c”:“3”,“d”:“4”}
2、combineByKey()
combineByKey是PairRDD的算子,pairRdd大部分聚合操作内部都是调用combineByKey()实现的。
先看一下方法的声明,(有不同参数类型的重载,我们只看参数最少的这一个):
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
三个参数的官方解释:
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
- createCombiner:相当于对rdd中的value做一步类似map处理,形成改key下累加初始值,处理成累加器的格式C。
- mergeValue:与aggregate()第一个参数类似,他是将上一个累加器结果和下一行的值做合并处理,将上一个格式为C累加结果与rdd只能够的value合并为一个C类型的新结果。
- mergeCombiners:同aggregate()的第二个参数,合并不同分区中相同key的累加结果。
明显createCombiner,mergeValue是在同一个partition中进行,而在同一个partition中每次遇到新的key,createCombiner都会重新创建一个初始值。
计算班级平均成绩的代码:
val rdd4 = rdd.map(x=>(x.split(" ")(0),x.split(" ")(2)))
//rdd4:((classA,98),(classA,90),(classB,88))
rdd4.combineByKeyWithClassTag [Tuple2[Int,Int]](
(v:String)=>(v.toInt,1)
,(a:(Int,Int),v:String)=>(a._1+v.toInt,a._2+1)
,(a:(Int,Int),b:(Int,Int))=>(a._1+b._1,b._2+b._2)
).map{case (key,value)=>(key,value._1/value._2)}.foreach(x=>println(x._1+" "+x._2))
3、aggregateByKey()
看完上面两个这个也就不难理解了。
值得注意的是aggregateByKey()内部是调用了combineByKeyWithClassTag()实现的,,这也印证了“pairRdd大部分聚合操作内部都是调用combineByKey()实现的”说法。
aggregateByKey()部分源码如下:
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)
lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
// We will clean the combiner closure later in `combineByKey`
val cleanedSeqOp = self.context.clean(seqOp)
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
cleanedSeqOp, combOp, partitioner)
}
使用样例:
rdd4.aggregateByKey [Tuple2[Int,Int]]((0,0))(
(a:(Int,Int),v:String)=>(a._1+v.toInt,a._2+1)
,(a:(Int,Int),b:(Int,Int))=>(a._1+b._1,b._2+b._2)
).map{case (key,value)=>(key,value._1/value._2)}.foreach(x=>println(x._1+" "+x._2))
版权声明:本文为EnterPine原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。