ZooKeeper Source Code Analysis (Part 1)

1. Important Serialization Interfaces

1.1 Record

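package org.apache.jute;

import java.io.IOException;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * Interface that is implemented by generated classes.
 *
 * In ZooKeeper, any class that needs to be serialized or deserialized must implement this interface.
 *
 * Hadoop offers the same idea through its Writable interface:
 *
 * class Student implements Writable {
 *     // deserialization
 *     public void readFields(DataInput in) throws IOException { ... }
 *
 *     // serialization
 *     public void write(DataOutput out) throws IOException { ... }
 * }
 */
@InterfaceAudience.Public
public interface Record {

    // The actual serialization method (writes the Java object to a network stream or persists it to disk)
    public void serialize(OutputArchive archive, String tag) throws IOException;

    // The actual deserialization method (reads the object back from a network stream)
    public void deserialize(InputArchive archive, String tag) throws IOException;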
}
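To make the Record contract concrete, here is a minimal, self-contained sketch in the spirit of the Hadoop Student example above. It does not use the real org.apache.jute classes: MiniOutputArchive, MiniInputArchive, MiniRecord, Student and RecordDemo are all hypothetical stand-ins, and strings are written with DataOutputStream.writeUTF for brevity, which is not jute's actual string encoding. What it shows is the pattern: serialize writes the fields in a fixed order, deserialize reads them back in the same order, and the tag is only a label that this binary sketch ignores.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical, trimmed-down stand-ins for jute's OutputArchive / InputArchive / Record.
interface MiniOutputArchive {
    void writeInt(int i, String tag) throws IOException;
    void writeString(String s, String tag) throws IOException;
}

interface MiniInputArchive {
    int readInt(String tag) throws IOException;
    String readString(String tag) throws IOException;
}

interface MiniRecord {
    void serialize(MiniOutputArchive archive, String tag) throws IOException;
    void deserialize(MiniInputArchive archive, String tag) throws IOException;
}

// Hypothetical record: fields are written and read back in the same fixed order.
class Student implements MiniRecord {
    String name;
    int age;

    @Override
    public void serialize(MiniOutputArchive archive, String tag) throws IOException {
        archive.writeString(name, "name");
        archive.writeInt(age, "age");
    }

    @Override
    public void deserialize(MiniInputArchive archive, String tag) throws IOException {
        name = archive.readString("name");
        age = archive.readInt("age");
    }
}

class RecordDemo {
    // Serialize a record to bytes; tags are ignored by this simple binary encoding.
    static byte[] toBytes(MiniRecord r) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        r.serialize(new MiniOutputArchive() {
            public void writeInt(int i, String tag) throws IOException { out.writeInt(i); }
            public void writeString(String s, String tag) throws IOException { out.writeUTF(s); }
        }, "student");
        out.flush();
        return bytes.toByteArray();
    }

    // Populate an existing record from bytes produced by toBytes.
    static void fromBytes(MiniRecord r, byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        r.deserialize(new MiniInputArchive() {
            public int readInt(String tag) throws IOException { return in.readInt(); }
            public String readString(String tag) throws IOException { return in.readUTF(); }
        }, "student");
    }
}
```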

1.2 Index

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.jute;

/**
 * Interface that acts as an iterator for deserializing maps.
 * The deserializer returns an instance that the record uses to
 * read vectors and maps. An example of usage is as follows:
 *
 * <code>
 * Index idx = startVector(...);
 * while (!idx.done()) {
 *   .... // read element of a vector
 *   idx.incr();
 * }
 * </code>
 *
 * In short: an iterator used during deserialization to walk over the elements of a vector or map, similar to an Iterator (hasNext() / next()).
 */
public interface Index {

    // true once all elements have been read
    public boolean done();

    // advance to the next element
    public void incr();
}
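The Index contract is easiest to see with a count-based implementation. In the sketch below, a vector is encoded as an int element count followed by the elements, and the index simply counts down until done() is true. The interface is redeclared locally so the example compiles on its own; CountingIndex and IndexDemo are hypothetical names. As far as I can tell, the binary deserializer's own index works the same way, but treat this as an illustration rather than ZooKeeper's source.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Same shape as org.apache.jute.Index, redeclared so the sketch is self-contained.
interface Index {
    boolean done();
    void incr();
}

// Hypothetical count-based index: done() once all announced elements were consumed.
class CountingIndex implements Index {
    private int remaining;

    CountingIndex(int count) {
        this.remaining = count;
    }

    public boolean done() {
        return remaining <= 0;
    }

    public void incr() {
        remaining--;
    }
}

class IndexDemo {
    // Encode a vector as [int count][int element]...
    static byte[] writeIntVector(List<Integer> values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(values.size());
        for (int v : values) {
            out.writeInt(v);
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Decode using the usage pattern from the Index javadoc: loop until done(), calling incr().
    static List<Integer> readIntVector(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        Index idx = new CountingIndex(in.readInt()); // plays the role of startVector(...)
        List<Integer> result = new ArrayList<>();
        while (!idx.done()) {
            result.add(in.readInt()); // read element of a vector
            idx.incr();
        }
        return result;
    }
}
```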

1.3 InputArchive

package org.apache.jute;

import java.io.IOException;

/**
 * Interface that all the Deserializers have to implement.
 *
 * In other words, the deserialization side.
 */
public interface InputArchive {
    public byte readByte(String tag) throws IOException;

    public boolean readBool(String tag) throws IOException;

    public int readInt(String tag) throws IOException;

    public long readLong(String tag) throws IOException;

    public float readFloat(String tag) throws IOException;

    public double readDouble(String tag) throws IOException;

    public String readString(String tag) throws IOException;

    public byte[] readBuffer(String tag) throws IOException;

    public void readRecord(Record r, String tag) throws IOException;

    public void startRecord(String tag) throws IOException;

    public void endRecord(String tag) throws IOException;

    public Index startVector(String tag) throws IOException;

    public void endVector(String tag) throws IOException;

    public Index startMap(String tag) throws IOException;

    public void endMap(String tag) throws IOException;
}

1.4 OutputArchive

package org.apache.jute;

import java.io.IOException;
import java.util.List;
import java.util.TreeMap;

/**
 * Interface that all the Serializers have to implement.
 *
 * In other words, the serialization side.
 */
public interface OutputArchive {
    public void writeByte(byte b, String tag) throws IOException;

    public void writeBool(boolean b, String tag) throws IOException;

    public void writeInt(int i, String tag) throws IOException;

    public void writeLong(long l, String tag) throws IOException;

    public void writeFloat(float f, String tag) throws IOException;

    public void writeDouble(double d, String tag) throws IOException;

    public void writeString(String s, String tag) throws IOException;

    public void writeBuffer(byte buf[], String tag) throws IOException;

    public void writeRecord(Record r, String tag) throws IOException;

    public void startRecord(Record r, String tag) throws IOException;

    public void endRecord(Record r, String tag) throws IOException;

    public void startVector(List v, String tag) throws IOException;

    public void endVector(List v, String tag) throws IOException;

    public void startMap(TreeMap v, String tag) throws IOException;

    public void endMap(TreeMap v, String tag) throws IOException;

}

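ZooKeeper's default implementations of these interfaces are the binary ones (BinaryInputArchive and BinaryOutputArchive), and by default they use NIO underneath. I did notice, though, that ZooKeeper's naming isn't great, e.g. ByteBuffer bb = ByteBuffer.allocate(1024); haha.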
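Since the binary archives are the default, it helps to see what bytes they produce. My understanding of BinaryOutputArchive is: the tag is ignored, primitives go through DataOutput (so ints are 4 bytes, big-endian), a string is an int byte count followed by its UTF-8 bytes (-1 for null), and a vector is an int element count (-1 for null) followed by the elements. The hypothetical TinyBinaryWriter below reproduces only those rules; check the BinaryOutputArchive source of your ZooKeeper version before relying on the exact layout.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical writer mimicking the layout rules described above; not ZooKeeper's class.
class TinyBinaryWriter {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(bytes);

    void writeInt(int i, String tag) throws IOException {
        out.writeInt(i); // 4 bytes, big-endian; the tag is ignored
    }

    void writeString(String s, String tag) throws IOException {
        if (s == null) {
            writeInt(-1, "len"); // null marker
            return;
        }
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeInt(utf8.length, "len"); // length prefix counts bytes, not chars
        out.write(utf8);
    }

    void writeIntVector(List<Integer> v, String tag) throws IOException {
        if (v == null) {
            writeInt(-1, tag); // null marker
            return;
        }
        writeInt(v.size(), tag); // element count first
        for (int x : v) {
            writeInt(x, "e");
        }
    }

    byte[] toByteArray() throws IOException {
        out.flush();
        return bytes.toByteArray();
    }
}
```

With these rules, writing the string "zk" yields the six bytes 00 00 00 02 7A 6B.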


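2. ZooKeeper's Persistence Mechanism

ZooKeeper is a distributed component with a leader/follower architecture in which the leader is elected internally. Every node stores all of the system's data, keeping one copy in memory and one copy on disk.

A ZooKeeper cluster has three roles: Leader, Follower, and Observer.

Every role holds the cluster's complete data both on disk and in memory. So how are the in-memory and on-disk copies kept consistent?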

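The answer is a WAL (write-ahead log). In my view, any distributed system that keeps data both in memory and on disk is almost certainly related to WAL: the details may differ, and that does not matter, but the idea is borrowed from WAL.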
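The core WAL rule is: make a change durable in the log first, and only then apply it in memory, so that after a crash the in-memory state can be rebuilt by replaying the log. Below is a minimal sketch of just that rule. WalStore is a hypothetical class that uses a text file of key=value lines and a HashMap; ZooKeeper itself logs binary TxnHeader/Record entries and applies them to a DataTree.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical key/value store illustrating write-ahead logging.
// Keys must not contain '=' and values must not contain newlines in this toy format.
class WalStore {
    private final Path logFile;
    private final Map<String, String> memory = new HashMap<>();

    WalStore(Path logFile) throws IOException {
        this.logFile = logFile;
        replay(); // rebuild the in-memory state from the log on startup
    }

    void put(String key, String value) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap((key + "=" + value + "\n").getBytes(StandardCharsets.UTF_8));
        // 1. append to the log and force it to disk ...
        try (FileChannel ch = FileChannel.open(logFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            ch.force(true);
        }
        // 2. ... and only then apply the change in memory
        memory.put(key, value);
    }

    String get(String key) {
        return memory.get(key);
    }

    // Re-apply every logged change in order; later entries overwrite earlier ones.
    private void replay() throws IOException {
        if (!Files.exists(logFile)) {
            return;
        }
        for (String line : Files.readAllLines(logFile, StandardCharsets.UTF_8)) {
            int eq = line.indexOf('=');
            if (eq > 0) {
                memory.put(line.substring(0, eq), line.substring(eq + 1));
            }
        }
    }
}
```

Constructing a second WalStore on the same file simulates a restart: replay() brings the map back to the last logged value of each key. The fsync (FileChannel.force) before the in-memory update is the whole point; it is also the expensive part, which is why real systems usually batch several appends per fsync.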

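ZooKeeper's in-memory data: a tree organized like a Unix file system (ZKDatabase, which holds a DataTree). Someday I really must study operating systems properly, since the operating system is the finest software there is.

ZooKeeper's on-disk data: snapshot files plus transaction log files. A snapshot is taken periodically; on a cold start, the server loads the most recent snapshot into memory and then replays the transactions logged after it (HDFS, in fact, does the same).

2.1 First, the class structure diagram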

[Figure: class structure diagram of the persistence layer]

2.2 SnapShot 接口

package org.apache.zookeeper.server.persistence;

import org.apache.zookeeper.server.DataTree;

import java.io.File;
import java.io.IOException;
import java.util.Map;

/**
 * snapshot interface for the persistence layer.
 * implement this interface for implementing snapshots.
 *
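 * SnapShot defines only four methods: deserialize, serialize, find the most recent snapshot file, and release resources.
 *
 * All data has one copy in memory, organized as a tree of DataNode objects that together form a DataTree.
 * The DataTree is snapshotted periodically, persisting the in-memory data to disk.
 * In principle, there is a complete copy on disk (the snapshot plus the log of committed transactions not yet covered by a snapshot).
 * In principle, there is also a complete copy in memory: the DataTree.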
 */
public interface SnapShot {

    /**
     * deserialize a data tree from the last valid snapshot and
     * return the last zxid that was deserialized
     *
     * @param dt       the datatree to be deserialized into
     * @param sessions the sessions to be deserialized into
     * @return the last zxid that was deserialized from the snapshot
     * @throws IOException
     *
     * Restores data from disk into memory.
     */
    long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;

    /**
     * persist the datatree and the sessions into a persistence storage
     *
     * @param dt       the datatree to be serialized
     * @param sessions the sessions to be serialized
     * @param name     the file to store the snapshot into
     * @throws IOException
     *
     * Persists the in-memory data to disk.
     */
    void serialize(DataTree dt, Map<Long, Integer> sessions, File name) throws IOException;

    /**
     * find the most recent snapshot file
     *
     * @return the most recent snapshot file
     * @throws IOException
     */
    File findMostRecentSnapshot() throws IOException;

    /**
     * free resources from this snapshot immediately
     *
     * @throws IOException
     */
    void close() throws IOException;
} 
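To make the SnapShot contract concrete (serialize the tree, deserialize the latest snapshot and return the last zxid it covers), here is a sketch with a Map<String, String> standing in for the DataTree. SimpleSnap is hypothetical. It borrows one detail I believe matches ZooKeeper's FileSnap, namely naming files snapshot.<zxid in hex> so that the newest snapshot can be found from the file name alone; the real FileSnap additionally writes a file header, the sessions map and a checksum, all omitted here.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

// Hypothetical snapshot store; a Map<String, String> stands in for the DataTree.
class SimpleSnap {
    private static final String PREFIX = "snapshot.";
    private final Path dir;

    SimpleSnap(Path dir) {
        this.dir = dir;
    }

    private static long zxidOf(Path file) {
        return Long.parseLong(file.getFileName().toString().substring(PREFIX.length()), 16);
    }

    // Persist the tree; the zxid this snapshot covers is encoded in the file name.
    void serialize(Map<String, String> tree, long lastZxid) throws IOException {
        Path file = dir.resolve(PREFIX + Long.toHexString(lastZxid));
        try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(file)))) {
            out.writeInt(tree.size());
            for (Map.Entry<String, String> e : tree.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        }
    }

    // Return the snapshot file with the highest zxid, or null if there is none.
    Path findMostRecentSnapshot() throws IOException {
        Path best = null;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, PREFIX + "*")) {
            for (Path f : files) {
                if (best == null || zxidOf(f) > zxidOf(best)) {
                    best = f;
                }
            }
        }
        return best;
    }

    // Load the newest snapshot into tree and return its zxid, or -1 if no snapshot exists.
    long deserialize(Map<String, String> tree) throws IOException {
        Path file = findMostRecentSnapshot();
        if (file == null) {
            return -1;
        }
        tree.clear();
        try (DataInputStream in = new DataInputStream(new BufferedInputStream(Files.newInputStream(file)))) {
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                tree.put(in.readUTF(), in.readUTF()); // arguments evaluate left to right: key, then value
            }
        }
        return zxidOf(file);
    }
}
```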

2.3 TxnLog 接口

package org.apache.zookeeper.server.persistence;

import java.io.IOException;

import org.apache.jute.Record;
import org.apache.zookeeper.server.ServerStats;
import org.apache.zookeeper.txn.TxnHeader;

/**
 * Interface for reading transaction logs.
 *
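 * The abstraction behind ZooKeeper's WAL (write-ahead log):
 *  it reads and writes the log of committed transactions.
 *
 *  TxnLog appends every transaction to the log.
 *  Each transaction consists of a header and a record:
 *     header: the transaction header (TxnHeader)
 *     record: the transaction body itself
 *
 *  (Compare client/server communication, which is likewise split into a Request and a Response.)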
 */
public interface TxnLog {

    /**
     * Setter for ServerStats to monitor fsync threshold exceed
     * @param serverStats used to update fsyncThresholdExceedCount
     */
    void setServerStats(ServerStats serverStats);
    
    /**
     * roll the current log being appended to
     * @throws IOException 
     */
    void rollLog() throws IOException;
    /**
     * Append a request to the transaction log
     * @param hdr the transaction header
     * @param r the transaction itself
     * returns true iff something appended, otw false 
     * @throws IOException
     */
    boolean append(TxnHeader hdr, Record r) throws IOException;

    /**
     * Start reading the transaction logs
     * from a given zxid
     * @param zxid
     * @return returns an iterator to read the 
     * next transaction in the logs.
     * @throws IOException
     */
    TxnIterator read(long zxid) throws IOException;
    
    /**
     * the last zxid of the logged transactions.
     * @return the last zxid of the logged transactions.
     * @throws IOException
     */
    long getLastLoggedZxid() throws IOException;
    
    /**
     * truncate the log to get in sync with the 
     * leader.
     * @param zxid the zxid to truncate at.
     * @throws IOException 
     */
    boolean truncate(long zxid) throws IOException;
    
    /**
     * the dbid for this transaction log. 
     * @return the dbid for this transaction log.
     * @throws IOException
     */
    long getDbId() throws IOException;
    
    /**
     * commit the transactions and make sure
     * they are persisted
     * @throws IOException
     */
    void commit() throws IOException;
   
    /** 
     * close the transactions logs
     */
    void close() throws IOException;
    /**
     * an iterating interface for reading 
     * transaction logs. 
     */
    public interface TxnIterator {
        /**
         * return the transaction header.
         * @return return the transaction header.
         */
        TxnHeader getHeader();
        
        /**
         * return the transaction record.
         * @return return the transaction record.
         */
        Record getTxn();
     
        /**
         * go to the next transaction record.
         * @throws IOException
         */
        boolean next() throws IOException;
        
        /**
         * close files and release the 
         * resources
         * @throws IOException
         */
        void close() throws IOException;
    }
}
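The append/read pair of TxnLog can be illustrated with an in-memory, append-only byte log. In this sketch (SimpleTxnLog and Txn are hypothetical), each entry is framed as [Adler32 checksum][length][zxid][payload], append refuses a zxid that does not move forward, and read(zxid) returns every intact entry from that zxid on, stopping at the first damaged one. ZooKeeper's FileTxnLog also checksums each entry (with Adler32, as far as I know), but this exact layout is an assumption, not its on-disk format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.Adler32;

// Hypothetical append-only transaction log kept in memory; the entry layout is an assumption.
class SimpleTxnLog {
    // One decoded entry: the "header" is reduced to a zxid, the "record" to raw bytes.
    static final class Txn {
        final long zxid;
        final byte[] payload;

        Txn(long zxid, byte[] payload) {
            this.zxid = zxid;
            this.payload = payload;
        }
    }

    private final ByteArrayOutputStream log = new ByteArrayOutputStream();
    private long lastZxid = -1;

    private static long checksum(byte[] body) {
        Adler32 crc = new Adler32();
        crc.update(body, 0, body.length);
        return crc.getValue();
    }

    // Append [checksum][length][zxid + payload]; refuse zxids that do not move forward.
    boolean append(long zxid, byte[] payload) throws IOException {
        if (zxid <= lastZxid) {
            return false;
        }
        byte[] body = ByteBuffer.allocate(8 + payload.length).putLong(zxid).put(payload).array();
        DataOutputStream out = new DataOutputStream(log);
        out.writeLong(checksum(body));
        out.writeInt(body.length);
        out.write(body);
        out.flush();
        lastZxid = zxid;
        return true;
    }

    long getLastLoggedZxid() {
        return lastZxid;
    }

    // Return intact entries with zxid >= fromZxid (a list instead of a TxnIterator, for brevity).
    List<Txn> read(long fromZxid) throws IOException {
        List<Txn> result = new ArrayList<>();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(log.toByteArray()));
        while (true) {
            long expected;
            int length;
            try {
                expected = in.readLong();
                length = in.readInt();
            } catch (EOFException endOfLog) {
                break; // clean end of the log, or a torn entry header
            }
            if (length < 8 || length > in.available()) {
                break; // garbage or truncated length
            }
            byte[] body = new byte[length];
            in.readFully(body);
            if (checksum(body) != expected) {
                break; // corrupt entry: stop reading here
            }
            long zxid = ByteBuffer.wrap(body).getLong();
            if (zxid >= fromZxid) {
                result.add(new Txn(zxid, Arrays.copyOfRange(body, 8, body.length)));
            }
        }
        return result;
    }
}
```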

2.4 FileTxnSnapLog

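The implementations of these two interfaces are FileTxnLog and FileSnap. In practice, though, the server does not use these two classes directly; it goes through FileTxnSnapLog, which combines them by composition.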

/**
 * This is a helper class above the implementations of txnlog and snapshot classes
 *
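 * This is a composite class: it can operate on the transaction log as well as on snapshots.
 * FileTxnSnapLog manages ZooKeeper's data storage and related operations; it can be seen as the low-level
 * persistence interface that ZooKeeper's service layer builds on.
 * During server startup, ZooKeeper creates a FileTxnSnapLog from the dataDir (snapshot directory) and
 * dataLogDir (transaction log directory) settings in zoo.cfg.
 *
 * It is the most central class in ZooKeeper's persistence layer, and has two member fields:
 *  1. TxnLog: dedicated to reading and writing the transaction log
 *  2. SnapShot: dedicated to managing snapshots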
 */
public class FileTxnSnapLog {
    //...
}
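The payoff of the composition is a single restore entry point. Roughly speaking, FileTxnSnapLog restores by loading the newest snapshot and then replaying only the logged transactions whose zxid is greater than the snapshot's. The sketch below shows that logic with hypothetical in-memory stand-ins (Snap, Log, Txn, TxnSnapLog) instead of FileSnap and FileTxnLog; a Txn whose data is null means "delete this path".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SnapLogDemo {
    // Hypothetical logged transaction: set path to data, or delete it when data is null.
    static final class Txn {
        final long zxid;
        final String path;
        final String data;

        Txn(long zxid, String path, String data) {
            this.zxid = zxid;
            this.path = path;
            this.data = data;
        }
    }

    // Hypothetical in-memory stand-in for SnapShot.
    static final class Snap {
        private Map<String, String> tree = new HashMap<>();
        private long zxid = -1;

        void serialize(Map<String, String> dt, long lastZxid) {
            tree = new HashMap<>(dt); // copy, so later changes to dt do not leak in
            zxid = lastZxid;
        }

        long deserialize(Map<String, String> dt) {
            dt.clear();
            dt.putAll(tree);
            return zxid;
        }
    }

    // Hypothetical in-memory stand-in for TxnLog.
    static final class Log {
        private final List<Txn> txns = new ArrayList<>();

        void append(Txn t) {
            txns.add(t);
        }

        List<Txn> read(long fromZxid) {
            List<Txn> out = new ArrayList<>();
            for (Txn t : txns) {
                if (t.zxid >= fromZxid) {
                    out.add(t);
                }
            }
            return out;
        }
    }

    // The composite: one object owning both the transaction log and the snapshot store.
    static final class TxnSnapLog {
        final Log txnLog = new Log();
        final Snap snapLog = new Snap();

        // Load the latest snapshot, then replay newer transactions; returns the last zxid applied.
        long restore(Map<String, String> dt) {
            long last = snapLog.deserialize(dt);
            for (Txn t : txnLog.read(last + 1)) {
                if (t.data == null) {
                    dt.remove(t.path);
                } else {
                    dt.put(t.path, t.data);
                }
                last = t.zxid;
            }
            return last;
        }
    }
}
```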

3. A Brief Overview of ZooKeeper's Network Communication Architecture

[Figure: class diagram of the network communication layer]

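  • Stats: statistics on a ServerCnxn.
  • Watcher: an event handler.
  • ServerCnxn: a server-side connection, i.e. one connection from a client to the server.
  • NettyServerCnxn: the Netty-based connection implementation.
  • NIOServerCnxn: the NIO-based connection implementation (the default).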
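In ZooKeeper, ServerCnxn is an abstract class that implements Stats and Watcher, and NIOServerCnxn / NettyServerCnxn supply the transport. Which one is used is decided by the zookeeper.serverCnxnFactory system property (NIO when it is unset). The sketch below shows only the shape of that design with hypothetical classes (Stats redeclared locally, Cnxn, NioCnxn, NettyCnxn, CnxnFactory); it skips the Watcher side, and in the real code the property selects a ServerCnxnFactory class rather than a connection class directly.

```java
import java.util.Locale;

// Hypothetical, heavily simplified model of the ServerCnxn hierarchy; not ZooKeeper's real API.
interface Stats {
    long getPacketsReceived();
}

abstract class Cnxn implements Stats {
    private long packetsReceived;

    // Name of the transport; in the real classes, this is where the I/O code differs.
    abstract String transport();

    // Shared bookkeeping lives in the base class, whatever the transport.
    void receivePacket() {
        packetsReceived++;
    }

    @Override
    public long getPacketsReceived() {
        return packetsReceived;
    }
}

final class NioCnxn extends Cnxn {
    @Override
    String transport() {
        return "NIO";
    }
}

final class NettyCnxn extends Cnxn {
    @Override
    String transport() {
        return "Netty";
    }
}

final class CnxnFactory {
    // The property name is ZooKeeper's; matching on "netty" is a simplification.
    static final String PROPERTY = "zookeeper.serverCnxnFactory";

    private CnxnFactory() {
    }

    // Pick the implementation from the system property, defaulting to NIO.
    static Cnxn newConnection() {
        String impl = System.getProperty(PROPERTY, "");
        return impl.toLowerCase(Locale.ROOT).contains("netty") ? new NettyCnxn() : new NioCnxn();
    }
}
```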

4. The Watcher Notification Mechanism

[Figure: the Watcher mechanism]

4.1 The Watcher Interface

package org.apache.zookeeper;

import org.apache.yetus.audience.InterfaceAudience;

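/**
 * This interface specifies the public interface an event handler class must implement.
 * A ZooKeeper client will get various events from the ZooKeeper server it connects to.
 * An application using such a client handles these events by registering a callback
 * object with the client. The callback object is expected to be an instance of a class
 * that implements the Watcher interface.
 */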
@InterfaceAudience.Public
public interface Watcher {

    /**
     * This interface defines the possible states an Event may represent
     */
    @InterfaceAudience.Public
    public interface Event {
        /**
         * Enumeration of states the ZooKeeper may be at the event
         */
        @InterfaceAudience.Public
        public enum KeeperState {
            /** Unused, this state is never generated by the server */
            // Unknown state; unused, never generated by the server
            @Deprecated
            Unknown (-1),

            /** The client is in the disconnected state - it is not connected
             * to any server in the ensemble. */
            // Disconnected
            Disconnected (0),

            /** Unused, this state is never generated by the server */
            // Not sync-connected; unused, never generated by the server
            @Deprecated
            NoSyncConnected (1),

            /** The client is in the connected state - it is connected
             * to a server in the ensemble (one of the servers specified
             * in the host connection parameter during ZooKeeper client
             * creation). */
            // Connected and synchronized
            SyncConnected (3),

            /**
             * Auth failed state
             */
            // Authentication failed
            AuthFailed (4),

            /**
             * The client is connected to a read-only server, that is the
             * server which is not currently connected to the majority.
             * The only operations allowed after receiving this state is
             * read operations.
             * This state is generated for read-only clients only since
             * read/write clients aren't allowed to connect to r/o servers.
             */
            // Read-only connection
            ConnectedReadOnly (5),

            /**
              * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
              * so that they can perform Zookeeper actions with their SASL-authorized permissions.
              */
            // SASL authentication succeeded
            SaslAuthenticated(6),

            /** The serving cluster has expired this session. The ZooKeeper
             * client connection (the session) is no longer valid. You must
             * create a new client connection (instantiate a new ZooKeeper
             * instance) if you wish to access the ensemble. */
            // Session expired
            Expired (-112);

            // Integer representation of value for sending over wire
            private final int intValue;

            KeeperState(int intValue) {
                this.intValue = intValue;
            }

            public int getIntValue() {
                return intValue;
            }

            public static KeeperState fromInt(int intValue) {
                switch(intValue) {
                    case   -1: return KeeperState.Unknown;
                    case    0: return KeeperState.Disconnected;
                    case    1: return KeeperState.NoSyncConnected;
                    case    3: return KeeperState.SyncConnected;
                    case    4: return KeeperState.AuthFailed;
                    case    5: return KeeperState.ConnectedReadOnly;
                    case    6: return KeeperState.SaslAuthenticated;
                    case -112: return KeeperState.Expired;

                    default:
                        throw new RuntimeException("Invalid integer value for conversion to KeeperState");
                }
            }
        }

        /**
         * Enumeration of types of events that may occur on the ZooKeeper
         */
        @InterfaceAudience.Public
        public enum EventType {
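            None (-1),                  // no znode event (connection-state notification)
            NodeCreated (1),            // node created
            NodeDeleted (2),            // node deleted
            NodeDataChanged (3),        // node data changed
            NodeChildrenChanged (4);    // a child of this node was added or removed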

            // Integer representation of value for sending over wire
            private final int intValue;

            EventType(int intValue) {
                this.intValue = intValue;
            }

            public int getIntValue() {
                return intValue;
            }

            public static EventType fromInt(int intValue) {
                switch(intValue) {
                    case -1: return EventType.None;
                    case  1: return EventType.NodeCreated;
                    case  2: return EventType.NodeDeleted;
                    case  3: return EventType.NodeDataChanged;
                    case  4: return EventType.NodeChildrenChanged;

                    default:
                        throw new RuntimeException("Invalid integer value for conversion to EventType");
                }
            }           
        }
    }

    /**
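     * This is the callback the client runs after it receives a notification from the ZooKeeper server,
     * so any logic the client needs to run in response belongs in this method.
     * Note the argument, a WatchedEvent, which carries three important pieces of information:
     *  1. the connection state
     *  2. the event type
     *  3. the znode on which the event occurred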
     * @param event
     */
    abstract public void process(WatchedEvent event);
}
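KeeperState and EventType carry explicit int codes because the integer, not the enum name, is what travels over the wire; getIntValue() encodes and fromInt() decodes. The sketch below re-creates EventType's codes as a hypothetical standalone enum, WireEventType, and replaces the hand-written switch with a lookup table built from values(), a common alternative that cannot fall out of sync when a constant is added.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical copy of Watcher.Event.EventType's codes, using a lookup map instead of a switch.
enum WireEventType {
    None(-1),
    NodeCreated(1),
    NodeDeleted(2),
    NodeDataChanged(3),
    NodeChildrenChanged(4);

    // Filled once, after all constants exist, from the constants themselves.
    private static final Map<Integer, WireEventType> BY_CODE = new HashMap<>();

    static {
        for (WireEventType t : values()) {
            BY_CODE.put(t.intValue, t);
        }
    }

    // Integer representation of value for sending over the wire
    private final int intValue;

    WireEventType(int intValue) {
        this.intValue = intValue;
    }

    int getIntValue() {
        return intValue;
    }

    static WireEventType fromInt(int intValue) {
        WireEventType t = BY_CODE.get(intValue);
        if (t == null) {
            throw new RuntimeException("Invalid integer value for conversion to EventType");
        }
        return t;
    }
}
```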

4.2 WatchedEvent

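package org.apache.zookeeper;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.proto.WatcherEvent;

/**
 * A WatchedEvent represents a change on the ZooKeeper that a Watcher is able to respond to.
 * The WatchedEvent includes exactly what happened, the current state of the ZooKeeper,
 * and the path of the znode that was involved in the event.
 */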
@InterfaceAudience.Public
public class WatchedEvent {

    // connection state
    final private KeeperState keeperState;
    // event type
    final private EventType eventType;
    // znode on which the event occurred
    private String path;

    /**
     * Create a WatchedEvent with specified type, state and path
     */
    public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
        this.keeperState = keeperState;
        this.eventType = eventType;
        this.path = path;
    }

    /**
     * Convert a WatcherEvent sent over the wire into a full-fledged WatcherEvent
     */
    public WatchedEvent(WatcherEvent eventMessage) {
        keeperState = KeeperState.fromInt(eventMessage.getState());
        eventType = EventType.fromInt(eventMessage.getType());
        path = eventMessage.getPath();
    }

    public KeeperState getState() {
        return keeperState;
    }

    public EventType getType() {
        return eventType;
    }

    public String getPath() {
        return path;
    }

    @Override
    public String toString() {
        return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path;
    }

    /**
     *  Convert WatchedEvent to type that can be sent over network
     */
    public WatcherEvent getWrapper() {
        return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
    }
}

4.3 ClientWatchManager

package org.apache.zookeeper;

import java.util.Set;

/**
 * Mainly used to hand the client the set of watchers that need to be processed.
 */
public interface ClientWatchManager {
    /**
     * Return a set of watchers that should be notified of the event. The manager must not notify the
     * watcher(s), however it will update its internal structure as if the watches had triggered. The
     * intent being that the callee is now responsible for notifying the watchers of the event, possibly at
     * some later time.
     *
     * Decides, based on the message returned by the server, which watchers to call back.
     * The returned set may be empty.
     *
     * The three parameters together are exactly what a WatchedEvent carries: they are what the
     * server sends back, and the client passes them to the WatchedEvent constructor to build the event.
     *
     * @param state event state (connection state)
     * @param type  event type
     * @param path  event path (the znode)
     * @return may be empty set but must not be null
     */
    public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path);

}

4.4 ZKWatchManager, an Inner Class of ZooKeeper

    /**
     * Manage watchers & handle events generated by the ClientCnxn object.
     *
     * We are implementing this as a nested class of ZooKeeper so that the public
     * methods will not be exposed as part of the ZooKeeper client API.
     */
    private static class ZKWatchManager implements ClientWatchManager {

        // When an event occurs on some znode, take the path (znode path) from the WatchedEvent returned by the server,
        // use that path as the key to find the matching watchers in these three maps (materialize()),
        // and then a separate method invokes process() on each of the returned watchers.

        // watchers to call back on data-change events
        private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
        // zk.getData(nodepath, watcher)

        // watchers to call back on znode creation and deletion events
        private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
        // zk.exists(nodepath, watcher)

        // watchers to call back when children of a znode are created or deleted
        private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
        // zk.getChildren(nodepath, watcher)

        /**
         * Triggering watchers: the node path determines which watcher's callback (process) is invoked.
         */

        /**
         * The default watcher, i.e. the one passed in
         * ZooKeeper zk = new ZooKeeper(address, timeout, watcher)
         */
        private volatile Watcher defaultWatcher;

        final private void addTo(Set<Watcher> from, Set<Watcher> to) {
            if (from != null) {
                to.addAll(from);
            }
        }

        /**
         * (non-Javadoc)
         *
         * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, Event.EventType,
         * java.lang.String)
         *
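         * This method returns the set of watchers that need to respond:
         * it picks the appropriate watchers out of dataWatches, existWatches and childWatches into result.
         *
         * In more detail: after an event occurs, this method returns the watchers to notify. It switches on the
         * EventType. For None (a connection-state change rather than a znode event), it adds the default watcher
         * plus every watcher in the three maps to the result, and clears the maps only when automatic watch reset
         * is disabled and the new state is not SyncConnected. For NodeDataChanged and NodeCreated, it removes the
         * entry for clientPath from dataWatches and existWatches and adds the removed watchers to the result;
         * NodeChildrenChanged does the same with childWatches, and NodeDeleted with all three maps.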
         */
        @Override
        public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) {

            // the set of watchers to notify
            Set<Watcher> result = new HashSet<Watcher>();

            switch (type) {  // dispatch on the event type
                case None:
                    result.add(defaultWatcher);  // add the default watcher

                    // Clear the maps only if zookeeper.disableAutoWatchReset is set and the state is not SyncConnected
                    boolean clear = ClientCnxn.getDisableAutoResetWatch() && state != Watcher.Event.KeeperState.SyncConnected;

                    synchronized (dataWatches) {
                        for (Set<Watcher> ws : dataWatches.values()) {
                            result.addAll(ws);
                        }
                        if (clear) {
                            dataWatches.clear();
                        }
                    }

                    synchronized (existWatches) {
                        for (Set<Watcher> ws : existWatches.values()) {
                            result.addAll(ws);
                        }
                        if (clear) {
                            existWatches.clear();
                        }
                    }

                    synchronized (childWatches) {
                        for (Set<Watcher> ws : childWatches.values()) {
                            result.addAll(ws);
                        }
                        if (clear) {
                            childWatches.clear();
                        }
                    }

                    return result;
                case NodeDataChanged:       // node data changed
                case NodeCreated:           // node created
                    synchronized (dataWatches) {
                        // remove clientPath's watchers and add them all to the result
                        addTo(dataWatches.remove(clientPath), result);
                    }
                    synchronized (existWatches) {
                        // remove clientPath's watchers and add them all to the result
                        addTo(existWatches.remove(clientPath), result);
                    }
                    break;
                case NodeChildrenChanged:       // children of the node changed
                    synchronized (childWatches) {
                        // remove clientPath's watchers and add them all to the result
                        addTo(childWatches.remove(clientPath), result);
                    }
                    break;
                case NodeDeleted:               // node deleted
                    synchronized (dataWatches) {
                        // remove clientPath's watchers and add them all to the result
                        addTo(dataWatches.remove(clientPath), result);
                    }
                    // XXX This shouldn't be needed, but just in case
                    synchronized (existWatches) {
                        // remove clientPath's exists watchers
                        Set<Watcher> list = existWatches.remove(clientPath);
                        if (list != null) {
                            // add them all to the result
                            addTo(list, result);
                            LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                        }
                    }
                    synchronized (childWatches) {
                        // remove clientPath's watchers and add them all to the result
                        addTo(childWatches.remove(clientPath), result);
                    }
                    break;
                default:
                    String msg = "Unhandled watch event type " + type + " with state " + state + " on path " + clientPath;
                    LOG.error(msg);
                    throw new RuntimeException(msg);
            }

            return result;
        }
    }
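The key property of materialize is that it removes the watchers it returns; that is what makes ZooKeeper's standard watches one-shot, so a client that wants to keep watching must register again (for example by calling getData with a watcher once more). The sketch below keeps the three maps and the four znode event types but drops the None/connection-state branch; MiniWatchManager and its method names are hypothetical, and a Consumer<String> plays the role of Watcher.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical, simplified version of ZKWatchManager's bookkeeping (no None/state handling).
class MiniWatchManager {
    enum Type { NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

    private final Map<String, Set<Consumer<String>>> dataWatches = new HashMap<>();
    private final Map<String, Set<Consumer<String>>> existWatches = new HashMap<>();
    private final Map<String, Set<Consumer<String>>> childWatches = new HashMap<>();

    // Registration, roughly what getData / exists / getChildren with a watcher do on the client side.
    synchronized void watchData(String path, Consumer<String> w) { add(dataWatches, path, w); }
    synchronized void watchExists(String path, Consumer<String> w) { add(existWatches, path, w); }
    synchronized void watchChildren(String path, Consumer<String> w) { add(childWatches, path, w); }

    private static void add(Map<String, Set<Consumer<String>>> m, String path, Consumer<String> w) {
        m.computeIfAbsent(path, k -> new HashSet<>()).add(w);
    }

    // Remove the path's watchers from m and add them to result: each watch fires at most once.
    private static void moveTo(Map<String, Set<Consumer<String>>> m, String path, Set<Consumer<String>> result) {
        Set<Consumer<String>> ws = m.remove(path);
        if (ws != null) {
            result.addAll(ws);
        }
    }

    // Same selection rules as materialize() above for the four znode event types.
    synchronized Set<Consumer<String>> materialize(Type type, String path) {
        Set<Consumer<String>> result = new HashSet<>();
        switch (type) {
            case NodeDataChanged:
            case NodeCreated:
                moveTo(dataWatches, path, result);
                moveTo(existWatches, path, result);
                break;
            case NodeChildrenChanged:
                moveTo(childWatches, path, result);
                break;
            case NodeDeleted:
                moveTo(dataWatches, path, result);
                moveTo(existWatches, path, result);
                moveTo(childWatches, path, result);
                break;
        }
        return result;
    }

    // The caller notifies the returned watchers, as the ClientWatchManager contract requires.
    void fire(Type type, String path) {
        for (Consumer<String> w : materialize(type, path)) {
            w.accept(type + " " + path);
        }
    }
}
```

Firing NodeDataChanged twice on a path watched once notifies the watcher only the first time, because the first materialize call already removed it.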

Summary:

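1. Serialization (four interfaces: Record, InputArchive, OutputArchive, Index)

2. Persistence (five classes: TxnLog, FileTxnLog, SnapShot, FileSnap, FileTxnSnapLog)

3. Network communication (two classes: ClientCnxn, ServerCnxn)

4. Watch mechanism (Watcher, ClientWatchManager / ZKWatchManager, WatchedEvent (KeeperState, EventType))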


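Copyright notice: this is an original article by weixin_43704599, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.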