Zookeeper(五):内存数据、事务日志、数据快照,及数据恢复

DataNode
        数据节点,一个DataNode对应一个节点路径,内部包含其父节点、子节点列表、数据内容、ACL权限、节点状态等信息

public class DataNode implements Record {
    DataNode parent;
    byte data[];
    Long acl;    
    public StatPersisted stat;
    private Set<String> children = null;
...
}

DataTree
        主要负责管理DataNode及对应的相关事件,一种树结构,内部定义了根节点、临时节点容器、所有节点容器、节点事件管理器等等。其中临时节点的Map容器比较特殊,这是因为临时节点不会有子节点,为了便为清理,所以其Map中的key=sessionId,value=会话过程中创建的临时节点列表

public class DataTree {
    //所有节点列表,key=节点路径,value=节点对应对象
    private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
    //节点内容事件管理器
    private final WatchManager dataWatches = new WatchManager();
    //子节点事件管理器
    private final WatchManager childWatches = new WatchManager();
    //根节点
    private static final String rootZookeeper = "/";
    ...
    //临时节点不能有子节点,所以这里的key=sessionId,value=该会话过程中创建的所有临时节点
    private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
...
}

ZKDatabase
       ZkDatabase是zookeeper服务器中的内存数据库,负责所有会话、DataTree、事务日志的管理,以及数据快照、通过快照及事务日志恢复数据等等。从下面的源码可以看出,其中事务日志和数据快照这些磁盘持久化操作都由FileTxnSnapLog负责,而FileTxnSnapLog内部分别维护了一个FileTxnLog及FileSnap,前者负责事务日志的写读,后者负责快照文件的写读。
(在请求处理链的SyncRequestProcessor处理器中,会调用zkDb.append(rquest)进入下面的流程)

public class ZKDatabase {   
    protected DataTree dataTree;
    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;   //维护所有session,K=sessionId,V=下一个超时时间点
    protected FileTxnSnapLog snapLog;                                  //负责事务日志、快照文件的读写
    protected long minCommittedLog, maxCommittedLog;
    public static final int commitLogCount = 500;
    protected static int commitLogBuffer = 700;
    protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();  //事务提议(事务日志)
    protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
    volatile private boolean initialized = false;
...
    public boolean append(Request si) throws IOException {  //添加事务请求
        return this.snapLog.append(si);                     //事务日志记录
    }
}
public class FileTxnSnapLog {
    private final File dataDir;
    private final File snapDir;
    private TxnLog txnLog;
    private SnapShot snapLog;
    public final static int VERSION = 2;
    public final static String version = "version-";

    public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {   
        this.dataDir = new File(dataDir, version + VERSION);
        this.snapDir = new File(snapDir, version + VERSION);
        .... 
        txnLog = new FileTxnLog(this.dataDir);
        snapLog = new FileSnap(this.snapDir);
    }
    
    public boolean append(Request si) throws IOException {
        return txnLog.append(si.hdr, si.txn);              //事务日志记录
    }
...
}

事务日志
       
对于客户端的每一个事务请求,Zookeeper都会将它们记录到事务日志中(同时也会变更到内存日志中),事务日志的记录由FileTxnLog负责,在未配置dataLogDir的情况下,事务日志默认写入dataDir配置的目录,Zookeeper在运行时会在该目录下创建一个/version-2子目录用来存放事务日志文件。
        日志文件默认预分配大小为64M,预分配的设计是为了避免频繁的触发磁盘I/O为文件分配新的磁盘块,提升I/O性能。生产环境下一般通过zookeeper.preAllocSize来增大预分配大小(单位为KB),减小生成的文件个数。 文件名为log.xxx,其中xx表示该文件中的第一条事务日志的事务ID。下方是上面源码的接续部分

public class FileTxnLog implements TxnLog {
     //...添加事务日志
     public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        //...未关联事务日志文件
        if (logStream==null) {
           ...
           //创建文件并关联到logStream
           logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
           fos = new FileOutputStream(logFileWrite);
           logStream=new BufferedOutputStream(fos);
           ...
        }
        //数据追加,内部根据大小确定是否扩容
        filePadding.padFile(fos.getChannel());
        ...
    }
}

public class FilePadding {
    ...
    private static long preAllocSize = 65536 * 1024; //默认预分配6大小为4M
    private static final ByteBuffer fill = ByteBuffer.allocateDirect(1);

    static {
        LOG = LoggerFactory.getLogger(FileTxnLog.class);

        String size = System.getProperty("zookeeper.preAllocSize");
        if (size != null) {
            try {
                preAllocSize = Long.parseLong(size) * 1024;  //所以配置的单位为KB
            } catch (NumberFormatException e) {
                LOG.warn(size + " is not a valid value for preAllocSize");
            }
        }
    }

    long padFile(FileChannel fileChannel) throws IOException {
        //计算新大小
        long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
        if (currentSize != newFileSize) {
            //write时,如果已达文件末尾,会进行扩容,这点参与JDK文档 
            fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
            currentSize = newFileSize;
        }
        return currentSize;
    }
   
}



数据快照
       
数据快照本质上是将内存中的节点数据会话数据保存到磁盘。在某次事务日志记录之后,若事务记录的总次数超过了一定的次数(取决于zookeeper.snapCount配置<最小为2>),Zookeeper会将ZkDatabase中的数据全量dump到文件中,即数据快照。Zookeeper通过启动一个新线程来进行快照存储,以避免对事务请求造成阻塞,但如果前一次dump快照操作还在进行中,那么本次dump将跳过,直接该快照线程可用。
        其中次数对比采用“过半随机”的策略,当logCount > (snapCount / 2 + randRoll)时行快照,而没有采用固定的snapCount/2,这样是为了避免集群节点同时快照,影响整体性能。
       配置中的dataDir即代表快照目录,zookeeper同样会在该目录下生成一个version-2子目录来存储所在快照文件,文件格式为snapshot.xxx(xxx代表快照开始时服务器最新的事务ID),由于快照文件不像事务文件一样写入频繁,所以针对快照文件Zookeeper没有采用预分配的作用。此外由于过半随机原则及每次事务操作产生的数据量不一样,所以快照文件间大小并不一定相等。(下面是请求处理链的SyncRequestProcessor处理器的较详细代码)

public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
    ...
     @Override
    public void run() {
        try {
            int logCount = 0;
            setRandRoll(r.nextInt(snapCount/2));
            while (true) {
                //从请求队列中取出Request对象
                Request si = ...

                if (si != null) {
                    // track the number of records written to the log
                    // 事务日志写入成功
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        //过半检测
                        if (logCount > (snapCount / 2 + randRoll)) {
                            setRandRoll(r.nextInt(snapCount/2));
                            zks.getZKDatabase().rollLog();
                            // 上次快照未结束,跳过本次
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                            //新开线程
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    } ...
                }
            }
        } ...
    }
}

数据恢复    
        其实在上篇关于选举的博客中说过,集群在启动时首先做的是数据加载loadDataBase(),然后才是选举。数据加载流程大概如下

  1. 创建ZKDatabase,给定对应的快照目录与事务日志目录
  2. 从快照目录中加载最后的快照文件内容。(由于每次快照都会全量dump数据,所以最新的快照数据通常是最全的)。
  3. 通过比较事务ID(快照文件名、事务日志文件名中的后缀即最新事务ID),获取事务记录增量数据(有可能跨多个事务日志文件),迭代事务日志文件列表,到快照之后产生的事务日志,进行更新。(即更新增量数据)
  4.  将事务日志同步到committedLog中,以便集群间快速同步
public class ZKDatabase {   
    protected DataTree dataTree;
    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;   //维护所有session,K=sessionId,V=下一个超时时间点
    protected FileTxnSnapLog snapLog;     
    protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
    ...          
    public long loadDataBase() throws IOException {
        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
        initialized = true;
        return zxid;
    }
}

public class FileTxnSnapLog {
    public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
        snapLog.deserialize(dt, sessions);
        return fastForwardFromEdits(dt, sessions, listener);
    }
    public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
                                     PlayBackListener listener) throws IOException {
        //事务日志目录
        FileTxnLog txnLog = new FileTxnLog(dataDir);
        //对比事务Id,找出事务日志增量数据(有可能跨事务日志,所以Itr内部维护了一个流来指向事务日志文件)
        //其itr.next()方法会读取日志文件中的每一条记录,如果读完,则其内部的流将指向下一个日志文件,然后返回下一个文件中的第一条记录
        TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
        long highestZxid = dt.lastProcessedZxid;
        TxnHeader hdr;
        try {
            while (true) {...                
                hdr = itr.getHeader();
                ...
                try {
                    processTransaction(hdr,dt,sessions, itr.getTxn());
                } ...
                //将事务同步到committedLog中,以便集群间快速同步
                listener.onTxnLoaded(hdr, itr.getTxn());
                if (!itr.next())   //读取当前事务日志文件中的下一条记录,如果当前记录是最后一条,则读取下一事务日志文件中的第一条记录
                    break;
            }
        } ...
        return highestZxid;
    }
}

 


版权声明:本文为w1673492580原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。