存储管理
diskstore
memorystore
一大组件
BlockManager
消息通信(Actor模型)
master(BlockManager) to slave(BlockManager)
slave(BlockManager) to master(BlockManager)
RDD/Block
在调度层, RDD由多个partition/bucket构成
在存储层, RDD又是以block为单位进行存取,
对于DiskStore,则一个blcok一个物理文件,再由hash map管理id和路径
对于MemoryStore,则直接由HashMap管理id和block
在task的rdd.iterator()中,即partition/bucket要进行运算时,会处理通过BlockManager管理block
(rdd的partition和block为一一对应)
spark 中的block是rdd在被task执行之前,其基本组成partition被blockManage映射而来的一种抽象
spark 中,在storage模块里面所有的操作都是和block相关的,但是在RDD里面所有的运算都是基于partition的
如果当前RDD的storage level不是NONE的话,表示该RDD在BlockManager中有存储,那么调用CacheManager中的getOrCompute()函数计算RDD,
在这个函数中partition和block发生了关系:
首先根据RDD id和partition index构造出block id (rdd_xx_xx),接着从BlockManager中取出相应的block
需要注意的是block的计算和存储是阻塞的,若另一线程也需要用到此block则需等到该线程block的loading结束
(hdfs 中的 block 是存储的最小单元)spark中的RDD-Cache, Shuffle-output, 以及broadcast的实现都是基于BlockManager来实现, BlockManager提供了数据存储(内存/文件存储)接口.
这里的Block和HDFS中谈到的Block块是有本质区别:
HDFS中是对大文件进行分Block进行存储,Block大小固定为512M等;
Spark中的Block是用户的操作单位, 一个Block对应一块有组织的内存,一个完整的文件或文件的区间端,并没有固定每个Block大小的做法;
(hdfs 中的 block 是存储的最小单元)
trait BlockDataManager {
def getBlockData(blockId: String): Option[ManagedBuffer]
def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit
}Spark中Block类型
RDDBlock:"rdd_" + rddId + "_" + splitIndex; 即每个RDD block表示一个特定rdd的一个分片
ShuffleBlock:关于shuffle,在Spark的1.1版本中发布一个sort版本的shuffle,原先的版本为hash,因此两种类型的shuffle也对应了两种数据结构
Hash版本,ShuffleBlock:"shuffle_" + shuffleId + "" + mapId + "" + reduceId
Sort版本,对于每一个bucket(shuffleId + "" + mapId + "" + reduceId组合)由ShuffleDataBlock和ShuffleIndexBlock两种block组成
"shuffle_" + shuffleId + "" + mapId + "" + reduceId + ".data"
"shuffle_" + shuffleId + "" + mapId + "" + reduceId + ".index"
BroadcastBlock:"broadcast_" + broadcastId + "_" + field)
TaskResultBlock:"taskresult_" + taskId;Spark中task运行的结果也是通过BlockManager进行管理
StreamBlock: "input-" + streamId + "-" + uniqueId应该是用于streaming中
TempBlock: "temp_" + id
转载于:https://my.oschina.net/igooglezm/blog/978869
