Spark中RDD复杂算子 aggregate()、combineByKeyWithClassTag()与aggregateByKey()

1、aggregate()
方法声明:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {}

方法声明中的重点:
1、aggregate返回类型为传入的类型参数U
2、第一个参数列表zeroValue为U类型的值,是聚合的初始值。
3、两个函数类型的参数,官方注释如下:

 * @param seqOp an operator used to accumulate results within a partition
 * @param combOp an associative operator used to combine results from different partitions

前两点不难理解,第三点中我会解释seqOp ,combOp。

  • seqOp传入参数(U,T),其中U为累加结果,T为RDD中下一行的值,返回一个类型为U的值。可以说第一次调用seqOp时,U的初始值为第一个参数列表U类型的zeroValue,下一次调用seqOp时,U的值为上一个SeqOp的返回结果。需要注意的是seqOp只在Partition内执行,最终返回一个U类型的聚合结果,比如第i个partition的聚合结果我们暂且称为result_i。
  • combOp是用来合并seqOp在不同partition内计算的聚合结果result_i,这也是为什么它的传入参数列表为(:U,:U),返回也为U类型。

下面是一个例子,将一个Tuple2类型列表中值转化为Json。
(此例为更鲜明体现aggragete返回的类型更加灵活,不受输入RDD的限制)

val list2 = List(("a",1),("b",2),("c",3),("d",4))
    val rdd6 = sc.parallelize(list2)

    var jsonobject = new JSONObject()

    val a = rdd6.aggregate[JSONObject](jsonobject)(
      (a:JSONObject,b)=>{
            a.put(b._1.toString,b._2.toString)
            a
          }
      ,(c,d)=>{
        val keyset =  d.keySet()
        import collection.JavaConversions._
        for(key<-keyset){
          c.put(key,d.getString(key))
        }
        c
      }
    )
    println(a)

最终结果为:{“a”:“1”,“b”:“2”,“c”:“3”,“d”:“4”}

2、combineByKey()
combineByKey是PairRDD的算子,pairRdd大部分聚合操作内部都是调用combineByKey()实现的。
先看一下方法的声明,(有不同参数类型的重载,我们只看参数最少的这一个):

 def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }

三个参数的官方解释:

 *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 *  - `mergeCombiners`, to combine two C's into a single one.
  • createCombiner:相当于对rdd中的value做一步类似map处理,形成改key下累加初始值,处理成累加器的格式C。
  • mergeValue:与aggregate()第一个参数类似,他是将上一个累加器结果和下一行的值做合并处理,将上一个格式为C累加结果与rdd只能够的value合并为一个C类型的新结果。
  • mergeCombiners:同aggregate()的第二个参数,合并不同分区中相同key的累加结果。

明显createCombiner,mergeValue是在同一个partition中进行,而在同一个partition中每次遇到新的key,createCombiner都会重新创建一个初始值。

计算班级平均成绩的代码:

val rdd4 = rdd.map(x=>(x.split(" ")(0),x.split(" ")(2)))
//rdd4:((classA,98),(classA,90),(classB,88))
rdd4.combineByKeyWithClassTag [Tuple2[Int,Int]](
      (v:String)=>(v.toInt,1)
      ,(a:(Int,Int),v:String)=>(a._1+v.toInt,a._2+1)
      ,(a:(Int,Int),b:(Int,Int))=>(a._1+b._1,b._2+b._2)
    ).map{case (key,value)=>(key,value._1/value._2)}.foreach(x=>println(x._1+" "+x._2))

3、aggregateByKey()
看完上面两个这个也就不难理解了。
值得注意的是aggregateByKey()内部是调用了combineByKeyWithClassTag()实现的,,这也印证了“pairRdd大部分聚合操作内部都是调用combineByKey()实现的”说法。

aggregateByKey()部分源码如下:

  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

    // We will clean the combiner closure later in `combineByKey`
    val cleanedSeqOp = self.context.clean(seqOp)
    combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
      cleanedSeqOp, combOp, partitioner)
  }

使用样例:

rdd4.aggregateByKey [Tuple2[Int,Int]]((0,0))(
      (a:(Int,Int),v:String)=>(a._1+v.toInt,a._2+1)
      ,(a:(Int,Int),b:(Int,Int))=>(a._1+b._1,b._2+b._2)
    ).map{case (key,value)=>(key,value._1/value._2)}.foreach(x=>println(x._1+" "+x._2))


版权声明:本文为EnterPine原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。