Spark006---coalesce和repartition

Intro

常用的重分区操作,简单记录下

coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

  • 默认情况下,不shuffle
  • 即增加分区数,没有变化
  • 减少分区数,会把该分区数据增加到其他分区中,原有分区数据保持不变
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
def showPartitionData(dataRDD:RDD[Int]): Unit ={
    val dataRDD1 = dataRDD.mapPartitionsWithIndex(
      (index, datas) => {
        Array(Array(index.toString, datas.toArray)).toIterator
      }
    )
    val arr = dataRDD1.collect().toArray
    arr.foreach(x => {
      val index = x(0)
      val data = x(1).asInstanceOf[Array[Int]].mkString(",")
      println(s"index=${index},data=${data}")
    }
    )
  }
showPartitionData: (dataRDD: org.apache.spark.rdd.RDD[Int])Unit
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._
    val dataRDD = spark.sparkContext.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
    showPartitionData(dataRDD)
index=0,data=1,2
index=1,data=3,4
index=2,data=5,6





spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@58767892
import spark.implicits._
dataRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:32

三个分区,每个数据两个数据

    showPartitionData(dataRDD.coalesce(2))
index=0,data=1,2
index=1,data=3,4,5,6

缩减分区,发现分区0数据没变,分区2的数据合并到分区1

    showPartitionData(dataRDD.coalesce(4))
index=0,data=1,2
index=1,data=3,4
index=2,data=5,6
println(dataRDD.coalesce(4).getNumPartitions)
3

增加分区没有发生变化

而shuffle是会发生变化的,直接看个例子:

println(dataRDD.coalesce(4,shuffle=true).getNumPartitions)
4
    showPartitionData(dataRDD.coalesce(4,shuffle=true))
index=0,data=2,5
index=1,data=3,6
index=2,data=4
index=3,data=1

repartition

简单的说repartition就是shuffle的coalesce

2021-11-17 于南京市江宁区九龙湖


版权声明:本文为wendaomudong_l2d4原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。