源码解析Spark各个ShuffleWriter的实现机制(四)——UnsafeShuffleWriter

基于3.2源码。

UnsafeShuffleWriter的应用场景

它用在对序列化数据直接排序的场景,避免了将数据反序列化后排序,再序列化的开销。它是对数据的分区id进行排序,并不会对数据的key排序。

这个shuffle方式大量使用到了sun.misc.Unsafe来与操作系统打交道,直接操作内存。

实现

整体流程

UnsafeShuffleWriter的实现封装地看起来十分简单:

// UnsafeShuffleWriter
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // 这里采用try finally,并在finally中回收资源时
  // 根据success标志确认异常的根本原因,
  // 相比try catch finally更易定位异常
  boolean success = false;
  try {
    // 这就是要shuffle的数据
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next());
    }
    closeAndWriteOutput();
    success = true;
  } finally {
    if (sorter != null) {
      try {
        sorter.cleanupResources();
      } catch (Exception e) {
        // Only throw this error if we won't be masking another
        // error.
        if (success) {
          throw e;
        } else {
          logger.error("In addition to a failure during writing, we failed during " +
                       "cleanup.", e);
        }
      }
    }
  }
}

那么整体流程就是

  • insertRecordIntoSorter: 将记录插入排序器,排序后写入文件,这里会根据设定的阈值溢出数据到切片文件;
  • closeAndWriteOutput: 合并所有切片文件到一份整体有序文件,并删除切片文件;
  • cleanupResources: 回收资源。

insertRecordIntoSorter

先查看insertRecordIntoSorter的具体实现:

// UnsafeShuffleWriter
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  assert(sorter != null);
  final K key = record._1();
  // 通过分区器计算出数据的分区
  final int partitionId = partitioner.getPartition(key);
  // 初始化序列化数组的计数为0
  // 下方的列化流的数据实际上是写到该数组的
  serBuffer.reset();
  // writeKey如果不查看实现的话,相当具有迷惑性
  // Spark实际上是没有将key写入文件的。
  // 根据注释提示key只在map端计算分区id时有用,
  // 不需要被shuffled
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  // 将数据写入到序列化流中
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  // 确保数据写入序列化流
  serOutputStream.flush();

  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);

  // 这里将数据插入到排序器中,经过排序后写入到文件中
  // 分别传入了:
  // + 序列化流的数据存储的数组
  // + JVM中对字节数组第一个元素的起始偏移量:在JVM的实现中,不论对象和数组都是对象头这样的固定开销,BYTE_ARRAY_OFFSET即通过sun.misc.Unsafe.arrayBaseOffset(byte[].class)得到
  // + 当前序列化记录的大小
  // + 分区id
  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}

那么排序和写入文件应当是在insertRecord中实现的。在确认实现之前,先了解ShuffleExternalSorter向TaskMemoryManager申请一个page(MemoryBlock)作为存储数据地址,此时MemoryBlock的核心组成为:高位13位作为页码,低位51位存储数据的地址。
接着ShuffleExternalSorter用一个LongArray记录依次这些page(MemoryBlock),以及数据的分区id,这样排序时不需要对数据进行进行移动,而是编排当前所有page的顺序完成排序。

// ShuffleExternalSorter
public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
  throws IOException {

  assert(inMemSorter != null);
  // sorter中的数据超过一定阈值就切分到文件中
  // 该阈值通过spark.shuffle.spill.numElementsForceSpillThreshold定义
  // 默认Interger.MAX_VALUE
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
    logger.info("Spilling data because number of spilledRecords crossed the threshold " +
      numElementsForSpillThreshold);
    // 在这里完成了数据的排序,并切分到文件
    spill();
  }

  // 当排序器中使用的数组容量不足以存储记录时,
  // 向taskMemoryManager申请当前数组容量的2倍进行扩容
  growPointerArrayIfNecessary();
  // 根据当前系统架构确认是否对齐字节,对齐则值为8,不对齐则为4
  final int uaoSize = UnsafeAlignedOffset.getUaoSize();
  // 需要4或8字节来存储记录的长度
  final int required = length + uaoSize;
  // 如有必要,向taskMemoryManager申请新的page来存储数据的地址和长度,
  // 并加到已申请的pages数组中,将currentPage指向新申请的page
  acquireNewPageIfNecessary(required);

  assert(currentPage != null);
  final Object base = currentPage.getBaseObject();
  // 得到64位的记录地址,其中高位13位作为页码,低位51位存储数据的地址(偏移量)
  final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
  // 以字节对齐方式将数据的长度记录在page中
  UnsafeAlignedOffset.putSize(base, pageCursor, length);
  // 调整下page指针的内存地址,如果不改,再按该地址写入数据会覆盖掉上述的length
  pageCursor += uaoSize;
  // 再将数据复制到上述指定的地址
  Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
  // 调整下page指针的内存地址,指到写入的数据后
  pageCursor += length;
  // 将记录的page和分区id记录到一个LongArray中,
  // 那么后续排序不需要调整数据的内存地址,只需要调整这个LongArray中page的次序即可
  inMemSorter.insertRecord(recordAddress, partitionId);
}

上述代码中spill的实现比较复杂,还没有讲清楚,接着查看spill的实现:

// ShuffleExternalSorter
public long spill(long size, MemoryConsumer trigger) throws IOException {
  if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
    return 0L;
  }
  // 溢出page到文件中就可以释放将这些page释放掉,等待gc即可
  // 排序并写入临时文件
  writeSortedFile(false);
  // 所有page的内存大小
  final long spillSize = freeMemory();
  inMemSorter.reset();
  // 上报溢出切片文件大小
  taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
  return spillSize;
}

那么关键实现在于writeSortedFile,其中分为两步,第一步是排序的实现,第二步才是写入文件。
先查看第一步排序:

// ShuffleExternalSorter
private void writeSortedFile(boolean isLastFile) {

  // This call performs the actual sort.
  // 根据这里的注释和方法名,可以知道排序是通过getSortedIterator实现的
  final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
    inMemSorter.getSortedIterator();
    // 先忽略后边的实现
}

查看getSortedIterator的实现,可以发现这里由两种排序方式,一种是基数排序(按一个字节,256进制排),默认采用,它的实现和常见的十进制基数排序不一样,在[Least-Significant-Digit Radix Sort ](#Least-Significant-Digit Radix Sort)小节详细展开;一种是TimSort倒是较为常见,这里不作多介绍:

// ShuffleInMemorySorter
public ShuffleSorterIterator getSortedIterator() {
  int offset = 0;
  // 默认基数排序
  // 通过spark.shuffle.sort.useRadixSort控制
  if (useRadixSort) {
    offset = RadixSort.sort(
      // array即记录page和partitionId的数组
      // pos即array中有效记录的个数
      array, pos,
      // 在array每个元素中,分区id记录在第[5, 7]个字节之间,分区id即由三个字节组成,
      // 也就是可以表示2^24=16777216,即采用UnsafeShuffleWriter的使用条件,分区数不得超过2^24
      // 以下两个值分别是5和7
      PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
      PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
  } else {
    // TimSort排序
    MemoryBlock unused = new MemoryBlock(
      array.getBaseObject(),
      array.getBaseOffset() + pos * 8L,
      (array.size() - pos) * 8L);
    LongArray buffer = new LongArray(unused);
    Sorter<PackedRecordPointer, LongArray> sorter =
      new Sorter<>(new ShuffleSortDataFormat(buffer));

    sorter.sort(array, 0, pos, SORT_COMPARATOR);
  }
  return new ShuffleSorterIterator(pos, array, offset);
}

到此完成了对数据的排序,并转换为Iterator对象。接着看writeSortedFile剩余的实现:

private void writeSortedFile(boolean isLastFile) {
  final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
    inMemSorter.getSortedIterator();
  // 没有数据,也就不需要写文件
  if (!sortedRecords.hasNext()) {
    return;
  }

  final ShuffleWriteMetricsReporter writeMetricsToUse;

  if (isLastFile) {
    // 只有在写非切分文件时,才需要上报指标
    writeMetricsToUse = writeMetrics;
  } else {
    // 还在切分文件,应当上报溢出到文件的字节,而不是上报写入文件的字节数,
    // 因此创建一个虚拟的metrics,上报不会有任何效果
    writeMetricsToUse = new ShuffleWriteMetrics();
  }

  // 写入的缓存大小,缓存满了则写入磁盘,
  // 通过spark.shuffle.spill.diskWriteBufferSize控制大小
  final byte[] writeBuffer = new byte[diskWriteBufferSize];

  // 创建分片文件对象
  final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
    blockManager.diskBlockManager().createTempShuffleBlock();
  final File file = spilledFileInfo._2();
  final TempShuffleBlockId blockId = spilledFileInfo._1();
  // 创建分片信息
  final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

  final SerializerInstance ser = DummySerializerInstance.INSTANCE;

  int currentPartition = -1;
  final FileSegment committedSegment;
  try (DiskBlockObjectWriter writer =
      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {

    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    while (sortedRecords.hasNext()) {
      // 这里的记录已根据分区id排序了,但要注意此时指的记录是(page, 分区id),
      // page中包含了数据在内存中的地址
      sortedRecords.loadNext();
      // 提取出分区id
      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
      // 如果出现partition < currentPartition,说明排序或currentPartition的计算由问题,
      // 这里是出于健壮性判断
      assert (partition >= currentPartition);
      // 当当前数据的分区id和当前文件的分区id不一致(这里假设了数据是根据分区id升序写入到不同文件的),
      // 那么就要切换到新的FileSegment上写入数据,并切换当前分区id为当前数据的分区id
      if (partition != currentPartition) {
        // currentPartition初始值为-1,因此第一次是不需要提交上个fileSegment的
        if (currentPartition != -1) {
          final FileSegment fileSegment = writer.commitAndGet();
          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
        }
        // 切换当前分区id为当前数据的分区id
        currentPartition = partition;
        if (partitionChecksums.length > 0) {
          writer.setChecksum(partitionChecksums[currentPartition]);
        }
      }
      // 得到数据在内存中的地址
      // 复习一下:
      // + sortedRecords每个元素的组成为(page, 分区id)
      // + page包含了pageNumber和数据在内存中的地址
      // 以下是在计算数据在内存中的真实地址
      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
      final Object recordPage = taskMemoryManager.getPage(recordPointer);
      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
      // 字节补齐
      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
      while (dataRemaining > 0) {
        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
        // 将数据拷贝到writerBuffer中
        Platform.copyMemory(
          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
        writer.write(writeBuffer, 0, toTransfer);
        recordReadPosition += toTransfer;
        dataRemaining -= toTransfer;
      }
      // 将数据写入文件
      writer.recordWritten();
    }
    // 数据都写完后,再提交(flush)
    committedSegment = writer.commitAndGet();
  }

  if (currentPartition != -1) {
    spillInfo.partitionLengths[currentPartition] = committedSegment.length();
    // 将分片文件信息记录到spills数组中,便于确认每个分片文件的长度
    spills.add(spillInfo);
  }

  if (!isLastFile) {
    // 上报指标...
    writeMetrics.incRecordsWritten(
      ((ShuffleWriteMetrics)writeMetricsToUse).recordsWritten());
    taskContext.taskMetrics().incDiskBytesSpilled(
      ((ShuffleWriteMetrics)writeMetricsToUse).bytesWritten());
  }
}

小结

这里就完成了UnsafeShuffleWriter的主要实现,还有closeAndWriteOutput尚未分析,但内容已经不多了,只是合并切片文件,并记录每个切片文件的大小,作用和索引文件差不多,这和其他两种Shuffle方式没有太大差别。先作个小结:

  • UnsafeShuffleWriter应用在对序列化数据直接排序的场景
  • 基于数据的分区id进行升序排序
  • 排序算法默认低位有效字节(8位,2^8=256)基数排序,采用的256进制
  • 每个分区id对应一个分片文件

closeAndWriteOutput

这里代码就不需要展开了,阅读起来很简单,总体实现就是强制把缓存中的数据写入分片文件,再按partitionId升序合并所有切片文件,合并过程记录下每个fileSegment(分区文件)的长度,将每个分片文件的长度信息传入构造mapStatus,以便向DAGScheduler上报。

void closeAndWriteOutput() throws IOException {
  assert(sorter != null);
  // 更新内存使用峰值
  updatePeakMemoryUsed();
  
  serBuffer = null;
  serOutputStream = null;
  // 强制将缓存中的数据写入硬盘,释放掉排序器,
  // 并获取所有的分片文件信息
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  try {
    // 合并切片文件
    partitionLengths = mergeSpills(spills);
  } finally {
    sorter = null;
    for (SpillInfo spill : spills) {
      if (spill.file.exists() && !spill.file.delete()) {
        logger.error("Error while deleting spill file {}", spill.file.getPath());
      }
    }
  }
  // 构造mapStatus以便向DAGScheduler上报作业状态和数据
  mapStatus = MapStatus$.MODULE$.apply(
    blockManager.shuffleServerId(), partitionLengths, mapId);
}

排序算法

Least-Significant-Digit Radix Sort

// RadixSort
public static int sort(
    // startByteIndex = 5
    // endByteIndex = 7
    // desc = false 即升序
    // signed = false 即非二进制补码排序
    LongArray array, long numRecords, int startByteIndex, int endByteIndex,
    boolean desc, boolean signed) {
    // 一些准入条件
  assert startByteIndex >= 0 : "startByteIndex (" + startByteIndex + ") should >= 0";
  assert endByteIndex <= 7 : "endByteIndex (" + endByteIndex + ") should <= 7";
  assert endByteIndex > startByteIndex;
  // 注意这里的array的大小至少得是当前所有记录数的两倍及以上,
  // 上边也讲到在ShuffleExternal#growPointerArrayIfNecessary方法中看到申请array时就按的两倍于已使用的大小
  // 在sortByByte中又会交替使用array中前半部分和后半部分的空间作为缓存
  assert numRecords * 2 <= array.size();
  long inIndex = 0;
  long outIndex = numRecords;
  if (numRecords > 0) {
    // 实现较为复杂,需展开说明
    long[][] counts = getCounts(array, numRecords, startByteIndex, endByteIndex);
    for (int i = startByteIndex; i <= endByteIndex; i++) {
      if (counts[i] != null) {
        sortAtByte(
          array, numRecords, counts[i], i, inIndex, outIndex,
          desc, signed && i == endByteIndex);
        long tmp = inIndex;
        inIndex = outIndex;
        outIndex = tmp;
      }
    }
  }
  return Ints.checkedCast(inIndex);
}

getCounts中,将分区id(3个字节组成)拆分成3个字节,从低位到高位,将其分布记录到counts数组中

private static long[][] getCounts(
    LongArray array, long numRecords, int startByteIndex, int endByteIndex) {
  // 这里声明counts数组能够存储8个一维数组,但实际只用到了[5, 7]三个,其余空闲
  long[][] counts = new long[8][];
  // 稍后细讲
  long bitwiseMax = 0;
  long bitwiseMin = -1L;
  // 得到存储page和分区id的数组的最大地址
  long maxOffset = array.getBaseOffset() + numRecords * 8L;
  // baseObject为null,在申请page时,传入的即null
  Object baseObject = array.getBaseObject();
  // 遍历每个(page, partitionId)
  for (long offset = array.getBaseOffset(); offset < maxOffset; offset += 8) {
    long value = Platform.getLong(baseObject, offset);
    // 原bitwiseMax定义的是0,或的定义:1 | 0 = 1, 0 | 0 = 0
    // 那么只要对应位上在遍历中出现过1,那么该位就会为1,
    bitwiseMax |= value;
    // 原bitwiseMin定义的是-1,按补码的定义,-1L为0xFFFFFFFF,也就是每个位上均为1
    // 与的定义是: 1 & 1 = 1, 0 & 1 = 0
    // 那么只要对应位上在遍历中出现过0,那么该位就会为0
    bitwiseMin &= value;
  }
  // 异或的定义是,相同为0,相异为1
  // bitwiseMax保留下曾经出现过1的位,bitwiseMin保留下曾经出现过0的位
  // 那么通过这两个数相异或,可以bitsChanged保留下在遍历中曾经有变化的位(指为1)
  long bitsChanged = bitwiseMin ^ bitwiseMax;
  // startByteIndex = 5, endByteIndex = 7
  for (int i = startByteIndex; i <= endByteIndex; i++) {
    // 拆开来看:
    // bitsChanged >>> (i * 8)),将bitsChanged零扩展右移 i * 8位
    // 将上述结果 & 0xff(=1111 1111),注意是和Long类型相与,隐式转换0xff前置有0
    // 可以得到在第i个字节在遍历中是否有变化(有变化即1,反之0)
    // 有变化就统计指标,没变化就说明对分区id的特定字节不需要排序
    if (((bitsChanged >>> (i * 8)) & 0xff) != 0) {
      // 分区id特定字节的值出现的次数,会被统计在count[i][分区id特定字节值]上
      // 每个字节有8位,一个字节最大能表示2 ^ 8 = 256,所以这里申请256
      // i取值5, 6, 7,即从低位到高位对应分区id特定字节
      counts[i] = new long[256];
      // 遍历array中每个元素(page, 分区id)
      for (long offset = array.getBaseOffset(); offset < maxOffset; offset += 8) {
        // Platform.getLong(baseObject, offset),得到(page, 分区id)
        // 将上述结果零扩展右移 (i * 8)位(这里是将分区id分成3个字节,i递增即可从低位到高位获得分区id的各个字节)
        // 再将右移的结果和0xff相与,那么就得到了分区id的指定字节的值[0, 255],将其转为int类型,并统计到count数组中
        counts[i][(int)((Platform.getLong(baseObject, offset) >>> (i * 8)) & 0xff)]++;
      }
    }
  }
  return counts;
}

这里统计下来的counts数组是用来做什么的呢?count[i][分区id特定字节值]记录的是分区id特定字节的值出现的次数,回顾基数排序,每次排序需要临时将数据有序地存入桶中,那么count[i][分区id特定字节值]的值就对应着桶的大小。因为是直接通过内存操作排序,因此桶的大小还需要转换为内存中的偏移量。
采取256进制,那么桶的个数自然是256个,在排序之前,会依据counts[i]数组,计算出每个桶在array中的起始偏移量,这里的桶之间的间隔自然就是桶的大小。怎么算的?counts[i][0]假设有3,并且当前起始偏移量是0,那么counts[i][1]的起始偏移量是0 + 3 * 8 = 24。

回到原流程中:
```scala
// RadixSort#sort

long inIndex = 0;
long outIndex = numRecords;
if (numRecords > 0) {
  long[][] counts = getCounts(array, numRecords, startByteIndex, endByteIndex);
  for (int i = startByteIndex; i <= endByteIndex; i++) {
    if (counts[i] != null) {
      // 排序的实现
      sortAtByte(
        // counts[i]将第i个字节的统计结果传入
        array, numRecords, counts[i], i, inIndex, outIndex,
        // desc = false, signed = false
        desc, signed && i == endByteIndex);
      // 交换inIndex和outIndex
      // 第一次遍历时,inIndex = 0, outIndex = 记录数
      // 第二次遍历时,inIndex = 记录数, outIndex = 0
      // 这是为了排序时依次使用array的后半部分和前半部分(反复提过,array是按照2倍于记录数大小申请的)
      // 想一想常规的基数排序,在按照第n位进行排序时,需要将对该位排序后的数据落到桶中,
      // 这里则是复用了array作为桶,第一次排序后的数据就有了两份,那么下次排序就可以覆盖掉array的前半部分了
      // 由于是奇数次交换,那么最终排序结果会出现在array的前半部分
      long tmp = inIndex;
      inIndex = outIndex;
      outIndex = tmp;
    }
  }
}

那么sortAtByte的实现如何呢?

// RadixSort
private static void sortAtByte(
    // counts第i个字节的统计结果,byteIdx = i
    LongArray array, long numRecords, long[] counts, int byteIdx, long inIndex, long outIndex,
    // desc = false, signed = false
    boolean desc, boolean signed) {
  assert counts.length == 256;
  // 将特定分区id特定字节转成的int值[0, 255]的出现次数,转换为在array中的偏移量,
  // 便于排序时的临时存储
  long[] offsets = transformCountsToOffsets(
    counts, numRecords, array.getBaseOffset() + outIndex * 8L, 8, desc, signed);
  // baseObject = null,在申请page时初始化为null,只需要知道它是内存拷贝时需要传入的对象
  Object baseObject = array.getBaseObject();
  // 获得当前轮次排序的起始地址
  long baseOffset = array.getBaseOffset() + inIndex * 8L;
  // 获得当前轮次排序的结束地址
  long maxOffset = baseOffset + numRecords * 8L;
  // 遍历每个(page, 分区id)
  for (long offset = baseOffset; offset < maxOffset; offset += 8) {
    // 取出(page, 分区id)
    long value = Platform.getLong(baseObject, offset);
    // 取出分区id的特定字节的值,作为桶的序号
    int bucket = (int)((value >>> (byteIdx * 8)) & 0xff);
    // 将(page, 分区id)写入到指定桶中
    Platform.putLong(baseObject, offsets[bucket], value);
    // 调整下桶的偏移量
    offsets[bucket] += 8;
  }
}

那么是怎么将桶的大小转换为偏移量的呢?

// RadixSort
private static long[] transformCountsToOffsets(
    // outputOffset = 在array中的起始偏移量,第一次排序在array后半部分,第二次在前半部分...
    // bytesPerRecord = 8,即每条(page, 分区id)8字节
    long[] counts, long numRecords, long outputOffset, long bytesPerRecord,
    // 两个false
    boolean desc, boolean signed) {
  assert counts.length == 256;
  // 有符号排序,这里是不会用的,因为我们排的分区id是无符号整数,因此从start = 0
  int start = signed ? 128 : 0;  // output the negative records first (values 129-255).
  // 降序排
  if (desc) {
    // 从后往前分配偏移量
    long pos = numRecords;
    for (int i = start; i < start + 256; i++) {
      pos -= counts[i & 0xff];
      counts[i & 0xff] = outputOffset + pos * bytesPerRecord;
    }
  } else {
    // 升序排(默认)
    long pos = 0;
    // 256个桶,计算每个桶的起始偏移量
    for (int i = start; i < start + 256; i++) {
      // 取出桶大小(有符号排序的话,i & 0xff最高只能求得255,因为固定无符号排序,不理解也没事)
      long tmp = counts[i & 0xff];
      // 从array起始偏移量开始,根据之前的桶大小的累加值(pos),求得当前桶的起始偏移量
      counts[i & 0xff] = outputOffset + pos * bytesPerRecord;
      // 累加桶大小,为下个桶的起始偏移量计算作准备
      pos += tmp;
    }
  }
  return counts;
}

到这,就完成了Spark UnsafeShuffleWriter默认采用的LSD Radix Sort的解析。


版权声明:本文为Christopher_L1n原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。