微服务系列(二)(3) ZooKeeper源码分析-part-2
前文跟踪源码分析了ZooKeeper Server的初始化过程,通讯原理及选举机制,本文将继续进入源码,探究ZooKeeper的存储机制。
通过前文的链路追踪,可以知道ZooKeeper的存储核心类是org.apache.zookeeper.server.ZKDatabase
下面就开始分析解读它的实现,它在内存中保存了怎样的数据结构,又是以哪种策略来写入文件?
首先回忆一下它是在哪里进行初始化的
...
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
...
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
...
可以看到通过调用org.apache.zookeeper.server.ZKDatabase#ZKDatabase构造器来初始化,并且传递了一个FileTxnSnapLog对象
public ZKDatabase(FileTxnSnapLog snapLog) {
dataTree = createDataTree();
sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
this.snapLog = snapLog;
try {
snapshotSizeFactor = Double.parseDouble(
System.getProperty(SNAPSHOT_SIZE_FACTOR,
Double.toString(DEFAULT_SNAPSHOT_SIZE_FACTOR)));
if (snapshotSizeFactor > 1) {
snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR;
LOG.warn("The configured {} is invalid, going to use " +
"the default {}", SNAPSHOT_SIZE_FACTOR,
DEFAULT_SNAPSHOT_SIZE_FACTOR);
}
} catch (NumberFormatException e) {
LOG.error("Error parsing {}, using default value {}",
SNAPSHOT_SIZE_FACTOR, DEFAULT_SNAPSHOT_SIZE_FACTOR);
snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR;
}
LOG.info("{} = {}", SNAPSHOT_SIZE_FACTOR, snapshotSizeFactor);
}
这里发现了一个系统参数zookeeper.snapshotSizeFactor,默认为0.33,打个tag,总觉得是个比较重要的参数。
继续进入createDataTree()
public DataTree createDataTree() {
return new DataTree();
}
public DataTree() {
/* Rather than fight it, let root have an alias */
nodes.put("", root);
nodes.put(rootZookeeper, root);
/** add the proc node and quota node */
root.addChild(procChildZookeeper);
nodes.put(procZookeeper, procDataNode);
procDataNode.addChild(quotaChildZookeeper);
nodes.put(quotaZookeeper, quotaDataNode);
addConfigNode();
nodeDataSize.set(approximateDataSize());
try {
dataWatches = WatchManagerFactory.createWatchManager();
childWatches = WatchManagerFactory.createWatchManager();
} catch (Exception e) {
LOG.error("Unexpected exception when creating WatchManager, " +
"exiting abnormally", e);
System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
}
}
又出来了一个类DataTree,先看看官方注释:
/** * This class maintains the tree data structure. It doesn't have any networking * or client connection code in it so that it can be tested in a stand alone * way. * <p> * The tree maintains two parallel data structures: a hashtable that maps from * full paths to DataNodes and a tree of DataNodes. All accesses to a path is * through the hashtable. The tree is traversed only when serializing to disk. * * 翻译: * 该类维护树数据结构。 它没有任何网络或客户端连接代码,因此可以独立测试。 * <p> * 树维护着两个并行的数据结构:一个从完整路径映射到DataNodes的哈希表和一个DataNode树。 * 对路径的所有访问都是通过哈希表进行的。 仅在序列化到磁盘时遍历树。 */
大致可以了解到,其内部维护了一个树的数据结构,并且还维护了路径到节点的hash表(在java中其实就是一个Map,K是完整路径,V是节点)。
可以看到,初始化过程会初始化几个默认节点:
- /zookeeper
- /zookeeper/quota
- /zookeeper/config
并设置所存储节点的当前近似数据大小
以及
dataWatches = WatchManagerFactory.createWatchManager();
childWatches = WatchManagerFactory.createWatchManager();
很容易联想到zk中的监听器功能,后续在深入了解其原理。
到这里,可以发现,原来ZooKeeper中的节点org.apache.zookeeper.server.DataNode对象保存children信息是通过保存其路径来标志父子关系,这也是为什么同一路径下不允许出现同名的节点的原因。
Api中有一个核心的方法org.apache.zookeeper.server.DataTree#processTxn(TxnHeader, org.apache.jute.Record, boolean)
这个方法则是进行事务消息的写入和存储(内存中的存储)
将请求或本地封装一个TxnHeader对象保存请求信息,Record作为内容对象,进行方法的调用。
从TxnHeader对象解析出clientId、cxid、zxid、type等信息,并根据type来做不同的处理,如:
create:创建节点(不携带stat信息)
create2:创建节点(携带stat信息)
setData:更新节点信息
…等
那么到这里,就会有一个疑问,这个zkDatabase作为内存中用于与其他组件交互的“数据库”,为什么没有提供持久化的操作呢?
原来是有的,只是我刚开始没有发现,这个方法命名有些不太好定位。
org.apache.zookeeper.server.ZKDatabase#append
我凭什么推测是这个方法进行持久化操作,有以下原因:
- 从前一篇文章中有提到
proccessPacket()方法,该方法存在于Learner、Follower、Observer中,并分别做了不同的实现,这里以Follower举例,可以在处理PROPOSAL消息逻辑中找到fzk.logRequest(hdr, txn);,而它最终调用了org.apache.zookeeper.server.SyncRequestProcessor#processRequest,最终定位到逻辑zks.getZKDatabase().append(si),猜想证实(zk的请求处理器会在下一篇文章详细讲到) - 从官方注释来看
append to the underlying transaction log - 从方法底层实现来看,其最终调用
org.apache.zookeeper.server.persistence.FileTxnLog#append,而在这个方法中,会找到文件流操作的逻辑。
暂且不管zk的请求处理器是如何转发请求的,直接进入执行sync请求的逻辑,zk除了存储Node信息外,还额外做了哪些事情。
由一个线程来循环处理这样的逻辑:
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// track the number of records written to the log
if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
要理解这段逻辑,则需要理解这几个对象:
toFlush:内存中缓存的请求,用于缓冲,当请求进入时,会先进入Flush链表,而当达到flush条件时,则会触发flush进行批量的处理Flush链表中的请求。(flush条件1.请求队列为空且Flush链表不为空(此时请求pending在zkServer)2.请求队列不为空且Flush链表为空(此时请求不断进入,将请求pending后批量处理))
randRoll:作为一个随机值,来决定执行快照备份的时机。
那么,这段逻辑做了以下几件事:
- 如果是写操作,追加到事务日志,如果是读操作,直接转发给下一个处理器
- 当执行写操作时,令logCount++,检查
logCount是否到达随机阈值(0.5*snapCount~snapCount),如果达到阈值,则进行快照备份操作,即每经过一定次数的写事务日志操作,则会进行一次快照备份,且是通过守护线程来异步执行快照。(这里需要注意的是,在执行快照的逻辑前还执行了rollLog(),可以理解为日志的分段,在定期备份的同时,形成新的事务文件,日志名以log.${zxid}命名)
接着深入它快照备份了哪些信息
最终定位到org.apache.zookeeper.server.persistence.FileTxnSnapLog#save
public void save(DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
boolean syncSnap)
throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
snapshotFile);
try {
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
} catch (IOException e) {
if (snapshotFile.length() == 0) {
/* This may be caused by a full disk. In such a case, the server
* will get stuck in a loop where it tries to write a snapshot
* out to disk, and ends up creating an empty file instead.
* Doing so will eventually result in valid snapshots being
* removed during cleanup. */
if (snapshotFile.delete()) {
LOG.info("Deleted empty snapshot file: " +
snapshotFile.getAbsolutePath());
} else {
LOG.warn("Could not delete empty snapshot file: " +
snapshotFile.getAbsolutePath());
}
} else {
/* Something else went wrong when writing the snapshot out to
* disk. If this snapshot file is invalid, when restarting,
* ZooKeeper will skip it, and find the last known good snapshot
* instead. */
}
throw e;
}
}
可以看到,其最终将内存中的DataTree信息以及session会话信息序列化后写入到快照文件
另外,在上文源码追踪过程中了解到,在ZkServer启动时,会启动一个用于日志清理的线程,防止日志的无限制堆积而影响处理速度。
通过以上的源码定位和分析,可以了解到Zk的存储机制:
- ZooKeeper client在使用时,对于节点的增删改查操作对应于ZooKeeper server中的ZkDatabase的DataTree数据结构
- ZooKeeper的节点信息保存在内存中
- ZooKeeper在执行事务请求时会写入到日志文件,并且事务日志会根据事务消息的数量不定期的分段,日志名以log.${zxid}命名
- ZooKeeper会不定期的进行快照备份,备份原理是将内存中的DataTree信息以及session会话信息序列化后写入到快照文件
- ZooKeeper写入文件的原理,使用jdk提供的FileOutputStream(BufferedOutputStream(DataOutputStream()))IO流的API实现
至此,通过源码分析ZooKeeper的存储机制暂时告一段落了。