微服务系列(二)(3) ZooKeeper源码分析-part-2

微服务系列(二)(3) ZooKeeper源码分析-part-2

前文跟踪源码分析了ZooKeeper Server的初始化过程,通讯原理及选举机制,本文将继续进入源码,探究ZooKeeper的存储机制。

通过前文的链路追踪,可以知道ZooKeeper的存储核心类是org.apache.zookeeper.server.ZKDatabase

下面就开始分析解读它的实现,它在内存中保存了怎样的数据结构,又是以哪种策略来写入文件?

首先回忆一下它是在哪里进行初始化的

...
quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      config.getDataLogDir(),
                      config.getDataDir()));
...
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
...

可以看到通过调用org.apache.zookeeper.server.ZKDatabase#ZKDatabase构造器来初始化,并且传递了一个FileTxnSnapLog对象

public ZKDatabase(FileTxnSnapLog snapLog) {
    dataTree = createDataTree();
    sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
    this.snapLog = snapLog;

    try {
        snapshotSizeFactor = Double.parseDouble(
            System.getProperty(SNAPSHOT_SIZE_FACTOR,
                    Double.toString(DEFAULT_SNAPSHOT_SIZE_FACTOR)));
        if (snapshotSizeFactor > 1) {
            snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR;
            LOG.warn("The configured {} is invalid, going to use " +
                    "the default {}", SNAPSHOT_SIZE_FACTOR,
                    DEFAULT_SNAPSHOT_SIZE_FACTOR);
        }
    } catch (NumberFormatException e) {
        LOG.error("Error parsing {}, using default value {}",
                SNAPSHOT_SIZE_FACTOR, DEFAULT_SNAPSHOT_SIZE_FACTOR);
        snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR;
    }
    LOG.info("{} = {}", SNAPSHOT_SIZE_FACTOR, snapshotSizeFactor);
}

这里发现了一个系统参数zookeeper.snapshotSizeFactor,默认为0.33,打个tag,总觉得是个比较重要的参数。

继续进入createDataTree()

public DataTree createDataTree() {
    return new DataTree();
}
public DataTree() {
    /* Rather than fight it, let root have an alias */
    nodes.put("", root);
    nodes.put(rootZookeeper, root);

    /** add the proc node and quota node */
    root.addChild(procChildZookeeper);
    nodes.put(procZookeeper, procDataNode);

    procDataNode.addChild(quotaChildZookeeper);
    nodes.put(quotaZookeeper, quotaDataNode);

    addConfigNode();

    nodeDataSize.set(approximateDataSize());
    try {
        dataWatches = WatchManagerFactory.createWatchManager();
        childWatches = WatchManagerFactory.createWatchManager();
    } catch (Exception e) {
        LOG.error("Unexpected exception when creating WatchManager, " +
                "exiting abnormally", e);
        System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
    }
}

又出来了一个类DataTree,先看看官方注释:

/**
 * This class maintains the tree data structure. It doesn't have any networking
 * or client connection code in it so that it can be tested in a stand alone
 * way.
 * <p>
 * The tree maintains two parallel data structures: a hashtable that maps from
 * full paths to DataNodes and a tree of DataNodes. All accesses to a path is
 * through the hashtable. The tree is traversed only when serializing to disk.
 * 
 * 翻译:
 * 该类维护树数据结构。 它没有任何网络或客户端连接代码,因此可以独立测试。
 * <p>
 * 树维护着两个并行的数据结构:一个从完整路径映射到DataNodes的哈希表和一个DataNode树。 
 * 对路径的所有访问都是通过哈希表进行的。 仅在序列化到磁盘时遍历树。
 */

大致可以了解到,其内部维护了一个树的数据结构,并且还维护了路径到节点的hash表(在java中其实就是一个Map,K是完整路径,V是节点)。

可以看到,初始化过程会初始化几个默认节点:

  • /zookeeper
  • /zookeeper/quota
  • /zookeeper/config

并设置所存储节点的当前近似数据大小

以及

dataWatches = WatchManagerFactory.createWatchManager();
childWatches = WatchManagerFactory.createWatchManager();

很容易联想到zk中的监听器功能,后续在深入了解其原理。

到这里,可以发现,原来ZooKeeper中的节点org.apache.zookeeper.server.DataNode对象保存children信息是通过保存其路径来标志父子关系,这也是为什么同一路径下不允许出现同名的节点的原因。

Api中有一个核心的方法org.apache.zookeeper.server.DataTree#processTxn(TxnHeader, org.apache.jute.Record, boolean)

这个方法则是进行事务消息的写入和存储(内存中的存储)

将请求或本地封装一个TxnHeader对象保存请求信息,Record作为内容对象,进行方法的调用。

TxnHeader对象解析出clientId、cxid、zxid、type等信息,并根据type来做不同的处理,如:

create:创建节点(不携带stat信息)

create2:创建节点(携带stat信息)

setData:更新节点信息

…等

那么到这里,就会有一个疑问,这个zkDatabase作为内存中用于与其他组件交互的“数据库”,为什么没有提供持久化的操作呢?

原来是有的,只是我刚开始没有发现,这个方法命名有些不太好定位。

org.apache.zookeeper.server.ZKDatabase#append

我凭什么推测是这个方法进行持久化操作,有以下原因:

  1. 从前一篇文章中有提到proccessPacket()方法,该方法存在于LearnerFollowerObserver中,并分别做了不同的实现,这里以Follower举例,可以在处理PROPOSAL消息逻辑中找到fzk.logRequest(hdr, txn);,而它最终调用了org.apache.zookeeper.server.SyncRequestProcessor#processRequest,最终定位到逻辑zks.getZKDatabase().append(si),猜想证实(zk的请求处理器会在下一篇文章详细讲到)
  2. 从官方注释来看append to the underlying transaction log
  3. 从方法底层实现来看,其最终调用org.apache.zookeeper.server.persistence.FileTxnLog#append,而在这个方法中,会找到文件流操作的逻辑。

暂且不管zk的请求处理器是如何转发请求的,直接进入执行sync请求的逻辑,zk除了存储Node信息外,还额外做了哪些事情。

由一个线程来循环处理这样的逻辑:

while (true) {
    Request si = null;
    if (toFlush.isEmpty()) {
        si = queuedRequests.take();
    } else {
        si = queuedRequests.poll();
        if (si == null) {
            flush(toFlush);
            continue;
        }
    }
    if (si == requestOfDeath) {
        break;
    }
    if (si != null) {
        // track the number of records written to the log
        if (zks.getZKDatabase().append(si)) {
            logCount++;
            if (logCount > (snapCount / 2 + randRoll)) {
                randRoll = r.nextInt(snapCount/2);
                // roll the log
                zks.getZKDatabase().rollLog();
                // take a snapshot
                if (snapInProcess != null && snapInProcess.isAlive()) {
                    LOG.warn("Too busy to snap, skipping");
                } else {
                    snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                            public void run() {
                                try {
                                    zks.takeSnapshot();
                                } catch(Exception e) {
                                    LOG.warn("Unexpected exception", e);
                                }
                            }
                        };
                    snapInProcess.start();
                }
                logCount = 0;
            }
        } else if (toFlush.isEmpty()) {
            // optimization for read heavy workloads
            // iff this is a read, and there are no pending
            // flushes (writes), then just pass this to the next
            // processor
            if (nextProcessor != null) {
                nextProcessor.processRequest(si);
                if (nextProcessor instanceof Flushable) {
                    ((Flushable)nextProcessor).flush();
                }
            }
            continue;
        }
        toFlush.add(si);
        if (toFlush.size() > 1000) {
            flush(toFlush);
        }
    }
}

要理解这段逻辑,则需要理解这几个对象:

toFlush:内存中缓存的请求,用于缓冲,当请求进入时,会先进入Flush链表,而当达到flush条件时,则会触发flush进行批量的处理Flush链表中的请求。(flush条件1.请求队列为空且Flush链表不为空(此时请求pending在zkServer)2.请求队列不为空且Flush链表为空(此时请求不断进入,将请求pending后批量处理))

randRoll:作为一个随机值,来决定执行快照备份的时机。

那么,这段逻辑做了以下几件事:

  • 如果是写操作,追加到事务日志,如果是读操作,直接转发给下一个处理器
  • 当执行写操作时,令logCount++,检查logCount是否到达随机阈值(0.5*snapCount~snapCount),如果达到阈值,则进行快照备份操作,即每经过一定次数的写事务日志操作,则会进行一次快照备份,且是通过守护线程来异步执行快照。(这里需要注意的是,在执行快照的逻辑前还执行了rollLog(),可以理解为日志的分段,在定期备份的同时,形成新的事务文件,日志名以log.${zxid}命名)

接着深入它快照备份了哪些信息

最终定位到org.apache.zookeeper.server.persistence.FileTxnSnapLog#save

public void save(DataTree dataTree,
                 ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
                 boolean syncSnap)
    throws IOException {
    long lastZxid = dataTree.lastProcessedZxid;
    File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
    LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
            snapshotFile);
    try {
        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
    } catch (IOException e) {
        if (snapshotFile.length() == 0) {
            /* This may be caused by a full disk. In such a case, the server
             * will get stuck in a loop where it tries to write a snapshot
             * out to disk, and ends up creating an empty file instead.
             * Doing so will eventually result in valid snapshots being
             * removed during cleanup. */
            if (snapshotFile.delete()) {
                LOG.info("Deleted empty snapshot file: " +
                         snapshotFile.getAbsolutePath());
            } else {
                LOG.warn("Could not delete empty snapshot file: " +
                         snapshotFile.getAbsolutePath());
            }
        } else {
            /* Something else went wrong when writing the snapshot out to
             * disk. If this snapshot file is invalid, when restarting,
             * ZooKeeper will skip it, and find the last known good snapshot
             * instead. */
        }
        throw e;
    }
}

可以看到,其最终将内存中的DataTree信息以及session会话信息序列化后写入到快照文件

另外,在上文源码追踪过程中了解到,在ZkServer启动时,会启动一个用于日志清理的线程,防止日志的无限制堆积而影响处理速度。

通过以上的源码定位和分析,可以了解到Zk的存储机制:

  • ZooKeeper client在使用时,对于节点的增删改查操作对应于ZooKeeper server中的ZkDatabase的DataTree数据结构
  • ZooKeeper的节点信息保存在内存中
  • ZooKeeper在执行事务请求时会写入到日志文件,并且事务日志会根据事务消息的数量不定期的分段,日志名以log.${zxid}命名
  • ZooKeeper会不定期的进行快照备份,备份原理是将内存中的DataTree信息以及session会话信息序列化后写入到快照文件
  • ZooKeeper写入文件的原理,使用jdk提供的FileOutputStream(BufferedOutputStream(DataOutputStream()))IO流的API实现

至此,通过源码分析ZooKeeper的存储机制暂时告一段落了。


版权声明:本文为xc1158840657原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。