Zookeeper源码解析 -- 本地事务日志持久化之FileTxnLog

 序言

  1. 在各个分布式组件中,持久化数据到本地的思想并不少见,为的是能保存内存中的数据,以及重启后能够重载上次内存状态的值。那么如何行之有效的进行,内存数据持久化到磁盘,怎么样的落盘策略合适,怎么设计持久化数据格式,如何进行记录的日志文件大小扩容,以及如何加载回磁盘数据到内存中,这么一些问题就是值得思考的问题了。

      2. 通过这篇文章的分析,我们能够看到一些持久化数据的基本原则,以及一些如何行之有效写数据到磁盘的方式方法(例如 利用稀疏文件,文件空洞,来进行文件大小的占位和填充),以及一些持久化本地数据的一些通用校验的思考,和迭代持久化数据的一些思考

 

 

正文

  1.  zookeeper 中涉及持久化的地方有两块,第一个是事务日志,第二个是快照日志, 事务日志即是每个进行更改事务的记录, 快照日志是内存中数据集体的快照. 本文我们先了解 事务日志 FileTxnLog 
  2. 设计一个框架和组件的过程中,我们需要进行的第一步是先设计接口,因为外界系统最好能只跟接口打交道,而只有通过接口来进行调用,更符合软件设计的最终目的,可替换的实现以及不变的接口。那我们先来看下,zookeeper中涉及本地持久化的接口。

 

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.persistence;

import java.io.Closeable;
import java.io.IOException;
import org.apache.jute.Record;
import org.apache.zookeeper.server.ServerStats;
import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;

/**
 * Interface for reading transaction logs.
 *
 */
public interface TxnLog extends Closeable {

    /**
     * Setter for ServerStats to monitor fsync threshold exceed
     * @param serverStats used to update fsyncThresholdExceedCount
     */
    void setServerStats(ServerStats serverStats);

    /**
     * roll the current
     * log being appended to
     * @throws IOException
     */
    void rollLog() throws IOException;
    /**
     * Append a request to the transaction log
     * @param hdr the transaction header
     * @param r the transaction itself
     * @return true iff something appended, otw false
     * @throws IOException
     */
    boolean append(TxnHeader hdr, Record r) throws IOException;

    /**
     * Append a request to the transaction log with a digset
     * @param hdr the transaction header
     * @param r the transaction itself
     * @param digest transaction digest
     * returns true iff something appended, otw false
     * @throws IOException
     */
    boolean append(TxnHeader hdr, Record r, TxnDigest digest) throws IOException;

    /**
     * Start reading the transaction logs
     * from a given zxid
     * @param zxid
     * @return returns an iterator to read the
     * next transaction in the logs.
     * @throws IOException
     */
    TxnIterator read(long zxid) throws IOException;

    /**
     * the last zxid of the logged transactions.
     * @return the last zxid of the logged transactions.
     * @throws IOException
     */
    long getLastLoggedZxid() throws IOException;

    /**
     * truncate the log to get in sync with the
     * leader.
     * @param zxid the zxid to truncate at.
     * @throws IOException
     */
    boolean truncate(long zxid) throws IOException;

    /**
     * the dbid for this transaction log.
     * @return the dbid for this transaction log.
     * @throws IOException
     */
    long getDbId() throws IOException;

    /**
     * commit the transaction and make sure
     * they are persisted
     * @throws IOException
     */
    void commit() throws IOException;

    /**
     *
     * @return transaction log's elapsed sync time in milliseconds
     */
    long getTxnLogSyncElapsedTime();

    /**
     * close the transactions logs
     */
    void close() throws IOException;

    /**
     * Sets the total size of all log files
     */
    void setTotalLogSize(long size);

    /**
     * Gets the total size of all log files
     */
    long getTotalLogSize();

    /**
     * an iterating interface for reading
     * transaction logs.
     */
    interface TxnIterator extends Closeable {

        /**
         * return the transaction header.
         * @return return the transaction header.
         */
        TxnHeader getHeader();

        /**
         * return the transaction record.
         * @return return the transaction record.
         */
        Record getTxn();

        /**
         * @return the digest associated with the transaction.
         */
        TxnDigest getDigest();

        /**
         * go to the next transaction record.
         * @throws IOException
         */
        boolean next() throws IOException;

        /**
         * close files and release the
         * resources
         * @throws IOException
         */
        void close() throws IOException;

        /**
         * Get an estimated storage space used to store transaction records
         * that will return by this iterator
         * @throws IOException
         */
        long getStorageSize() throws IOException;

    }

}

 

可以看到每个接口都有些许注释,但是整体接口不能说是很轻的,要理解这个持久化接口的实现和运行原理,需要看其实现。 首先需要明确目标,我们要了解的主要就是两个事情,第一  zookeeper持久化是怎么把内存对象存进磁盘的, 第二 磁盘持久化数据是怎么读出来,加载到内存中的。

这两件事是我们阅读这个接口的核心目标。随着目标出发,第一我们无非要了解,zk 是如何把内存对象持久化到磁盘的。

 

可以看到接口中的append方法就是插入数据到磁盘的方法了,查看其实现. TxnLog的实现类是FileTxnLog, 然后其文件头有一些注释,我们可以了解其大概。

/**
 * This class implements the TxnLog interface. It provides api's
 * to access the txnlogs and add entries to it.
 * <p>
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 *
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 *   }
 *
 * TxnList:
 *     Txn || Txn TxnList
 *
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 *
 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote>
 */
public class FileTxnLog implements TxnLog, Closeable {

// 略
}

   

可以看到,这个类的备注里面有着丰富的信息,包括这个数据格式如何在磁盘里面存储,是以怎样的方式序列化到磁盘的,然而光看这么一些落盘的存储信息,你还是会看得一脸懵逼,不知所云,这些内容只有你读完它的实现,反过来阅读的时候才会发现,这注释写得不错。

我们直接来看看这个append方法

    /**
     * append an entry to the transaction log
     * @param hdr the header of the transaction
     * @param txn the transaction part of the entry
     * returns true iff something appended, otw false
     */
    public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException {
              return append(hdr, txn, null);
    }

    @Override
    public synchronized boolean append(TxnHeader hdr, Record txn, TxnDigest digest) throws IOException {
        if (hdr == null) {
            return false;
        }
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn(
                "Current zxid {} is <= {} for {}",
                hdr.getZxid(),
                lastZxidSeen,
                hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        if (logStream == null) {
            LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));

            logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
            fos = new FileOutputStream(logFileWrite);
            logStream = new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
            fhdr.serialize(oa, "fileheader");
            // Make sure that the magic number is written before padding.
            logStream.flush();
            filePadding.setCurrentSize(fos.getChannel().position());
            streamsToFlush.add(fos);
        }
        filePadding.padFile(fos.getChannel());
        byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " + "and txn");
        }
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        Util.writeTxnBytes(oa, buf);

        return true;
    }

   可以看到,这个同步的append 方法里面,一般持久化内存数据需要这么几个参数 ,TxnHeader ,Record,TxnDigest 这么几个对象,TxnDigest这个可为null, Record在前面的序列化文章里面提供,这里就先不表了,记住它是进行序列化和反序列化的工具类即可。

然后就是这个TxhHeader了,那么这个是什么东西呢,查看其实现。

@InterfaceAudience.Public
public class TxnHeader implements Record {
  private long clientId;
  private int cxid;
  private long zxid;
  private long time;
  private int type;
  public TxnHeader() {
  }
  public TxnHeader(
        long clientId,
        int cxid,
        long zxid,
        long time,
        int type) {
    this.clientId=clientId;
    this.cxid=cxid;
    this.zxid=zxid;
    this.time=time;
    this.type=type;
  }

// 略
}

可以看到这个也是通过jute包自动生成的可序列对象,那么 里面的几个变量就是重点,变量是什么意思呢。看名字其实大概能看出不少信息,首先是clientId 这个很明显,就是client 连接zkServer的唯一性标识。 cxid(这个还没了解是什么东西,在这里先忽略,不急,因为到了后面我们肯定知道它会是什么东西,看起来也是id标识),然后这个zxid,这个是zookeeper每个操作都会生成的一个序列号,是递增的,为保证分布式情况下有序的一个序列号。   time 这个没说的,时间戳。  type这个东西在磁盘数据反序列化到内存的过程中用来标记,接下来的Record接口到底是什么对象的,因为我们虽然通过Record接口序列化数据到磁盘,但是反序列过程回来一定是要反序列成具体对象的,而不是接口。

 

看完这个旁支信息,接下去去读,这个append方法(可以回顾上文append方法代码)。首先是判断序列号的正确性

     if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn(
                "Current zxid {} is <= {} for {}",
                hdr.getZxid(),
                lastZxidSeen,
                hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }

可以看到判断插入的hdr(TxnHeader)的zxid的序列号大小,一般这个大小要大于 lastZxidSeen,否则就是状态不对了,前面说到这个zxid序列号是递增的,这个也好理解了。

 

然后,进行落盘插入数据的判断了,首先第一步,需要判断当前有没有这个日志文件,如果没有需要进行生成。

  @Override
    public synchronized boolean append(TxnHeader hdr, Record txn, TxnDigest digest) throws IOException {
        if (hdr == null) {
            return false;
        }
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn(
                "Current zxid {} is <= {} for {}",
                hdr.getZxid(),
                lastZxidSeen,
                hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        if (logStream == null) {
            LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));

            logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
            fos = new FileOutputStream(logFileWrite);
            logStream = new BufferedOutputStream(fos);
            oa = BinaryOutputArchive.getArchive(logStream);
            FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
            fhdr.serialize(oa, "fileheader");
            // Make sure that the magic number is written before padding.
            logStream.flush();
            filePadding.setCurrentSize(fos.getChannel().position());
            streamsToFlush.add(fos);
        }
        filePadding.padFile(fos.getChannel());
        byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " + "and txn");
        }
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        Util.writeTxnBytes(oa, buf);

        return true;
    }

 

可以看到 先判断了 logStream是否为空,这个logStream就是日志文件里面的 file stream流,可以看其初始化的过程,  在判断logStream为空后,说明还没有日志文件,需要先生成这个日志文件, 这个日志文件名的生成在 Util.makeLogName(hdr.getZxid()),可以看其

文件名生成有无什么讲究

    /**
     * Creates a valid transaction log file name.
     *
     * @param zxid used as a file name suffix (extension)
     * @return file name
     */
    public static String makeLogName(long zxid) {
        return FileTxnLog.LOG_FILE_PREFIX + "." + Long.toHexString(zxid);
    }

可以看到这里文件名没有大的讲究,就是通过第一次调用落盘的zxid 进行hex操作,但是因为zxid是递增的,所以日志文件也能够通过名字来判断生成的先后顺序,后面可以看到通过名字这个特性来进行日志排序的操作。

为什么需要看看文件名字的生成有没有附加信息呢,因为有些其他框架,日志文件名其实能包含比较多的信息,文件名其实就是一个索引,文件名的附带信息越多甚至能带来查找的很大的性能提升。

 

接下去阅读这个append方法,可以看到得到文件输出流后, 首先会在文件头面前加上 基本的 FileHeader,序列化到磁盘,这个算是一种校验,把一些magic number (这种特定协议或者是存储落盘加上自己特殊的校验方式的行为也是不少见,可以学习的地方),序列化到磁盘,然后把该输出流存到队列里面.   

接下来可以说道的方法就是 filePadding.padFile(fos.getChannel()); 这个是什么操作呢, 这个其实就是前面序言提到的,生成稀疏文件,文件空洞的一种技巧,这个稀疏文件和文件空洞(我在这里就不提了,因为这个也提的话,文章篇幅就会长很多)。简而言之吧,就是 通过文件 的seek方法,把

文件阅读到的position推到 超过文件尾部limit 的地方,其中文件内容 和文件尾部中间有一片空白,这样文件的逻辑大小就是一整块,但是磁盘大小却没有多占用(可以通过 类unix系统命令 ,ls -l 和 du 查看到文件的逻辑大小和实际磁盘大小并不相等)。这是unix系统允许我们这样做的一种行为。

稀疏文件的内容大概形式表现为:

内容主体  + 空白 +  EOF   

这样做的好处其实是有的,例如如果你需要利用多线程下载,你就可以通过先占用一整块的逻辑空间,然后不同线程在同一文件的不同position进行插入数据,这也就是为什么像 迅雷或者百度网盘,在下载大文件的时候都会预留一块大块的文件空间,明明文件还没下载完,但是文件的逻辑大小就已经是下载完整的一块磁盘大小了。

 

简单说完这个稀疏文件的作用看看,zk里面的filePadding是如何进行日志文件扩容的

 

    /**
     * pad the current file to increase its size to the next multiple of preAllocSize greater than the current size and position
     *
     * @param fileChannel the fileChannel of the file to be padded
     * @throws IOException
     */
    long padFile(FileChannel fileChannel) throws IOException {
        long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
        if (currentSize != newFileSize) {
            fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
            currentSize = newFileSize;
        }
        return currentSize;
    }


  /**
     * Calculates a new file size with padding. We only return a new size if
     * the current file position is sufficiently close (less than 4K) to end of
     * file and preAllocSize is > 0.
     *
     * @param position     the point in the file we have written to
     * @param fileSize     application keeps track of the current file size
     * @param preAllocSize how many bytes to pad
     * @return the new file size. It can be the same as fileSize if no
     * padding was done.
     */
    // VisibleForTesting
    public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
        // If preAllocSize is positive and we are within 4KB of the known end of the file calculate a new file size
        if (preAllocSize > 0 && position + 4096 >= fileSize) {
            // If we have written more than we have previously preallocated we need to make sure the new
            // file size is larger than what we already have
            if (position > fileSize) {
                fileSize = position + preAllocSize;
                fileSize -= fileSize % preAllocSize;
            } else {
                fileSize += preAllocSize;
            }
        }

        return fileSize;
    }

可以看到,zk作者也是怕你看不懂其含义,这个注释也是很全面的。

现在来看这个,calculateFileSizeWithPadding 方法,这个方法通过注释也可知,是当当前文件阅读的position 离 filesize不足4k byte的时候,进行扩容操作的。这里的preAllocSize 默认为64M, 这个函数当当前position 距离fileSize不足4k的时候,会计算出一个新的size,这个size是以64M大小向前移动的。

然后看到 padFile(FileChannel fileChannel) 这个方法,这个方法中,直接用fileChannel,写入 距离当前position ,64M 距离的位置,写入的fill 的定义是

private static final ByteBuffer fill = ByteBuffer.allocateDirect(1);
是只有一个byte的数组,所以目的很明显,这里是利用类unix系统允许 创建稀疏文件的特性,写入了一个逻辑大小不断向前移动64M的稀疏文件。这种稀疏文件的定义和产生,在持久化数据方式上,可能并不少见。起码用到了zk的组件应该会沿袭这一特性,例如 kafka等。



接下来的append方法的流程是

// 略 
byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " + "and txn");
        }
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        Util.writeTxnBytes(oa, buf);


}

这里的主要目的是比较核心的,就是写入主要数据到磁盘中,先用marshallTxnEntry 方法,把这几个对象输出为byte[]数组,然后可以看到其实现

   public static byte[] marshallTxnEntry(TxnHeader hdr, Record txn, TxnDigest digest)
            throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        OutputArchive boa = BinaryOutputArchive.getArchive(baos);

        hdr.serialize(boa, "hdr");
        if (txn != null) {
            txn.serialize(boa, "txn");
        }
        if (digest != null) {
            digest.serialize(boa, "digest");
        }
        return baos.toByteArray();
    }

 

可以看到序列化对象的方法是从一而终的,保持一致性。用的还是前面提到的设计的Record方式来进行序列化。

然后可以注意到,这里用了一个提取数据摘要的算法,接口是CheckSum, 这个可以提及一下。为什么很多存储或者序列化反序列,一般都要加上一个摘要呢,这是为了辨别文件或者数据流有没有被篡改,如果有篡改,那么事实算出来的摘要和后面存进去的数据摘要不一致,那么就可以直接抛异常了。

这个CheckSum算法是 Adler32算法,CheckSum是 jdk里面自带接口,实现这个接口的摘要算法不在少数,有兴趣的自己去看。

/**
* creates a checksum algorithm to be used
* @return the checksum used for this txnlog
*/
protected Checksum makeChecksumAlgorithm() {
return new Adler32();
}
 

然后算出摘要值后,把摘要值先序列化到磁盘,然后把真正的byte数组再写入磁盘

值得注意的是,为了标记序列化结尾,这里特地用了,一个结尾符标志位,反序列化时候,比较容易知道是否到达某个对象的尾部。

  /**
     * Write the serialized transaction record to the output archive.
     *
     * @param oa output archive
     * @param bytes serialized transaction record
     * @throws IOException
     */
    public static void writeTxnBytes(OutputArchive oa, byte[] bytes) throws IOException {
        oa.writeBuffer(bytes, "txnEntry");
        oa.writeByte((byte) 0x42, "EOR"); // 'B'
    }

 

 

讲完落盘序列化实现,这里再简单讲下 从磁盘反序列对象到内存的步骤。

看其接口定义

  /**
     * start reading all the transactions from the given zxid
     * @param zxid the zxid to start reading transactions from
     * @return returns an iterator to iterate through the transaction
     * logs
     */
    public TxnIterator read(long zxid) throws IOException {
        return read(zxid, true);
    }

    /**
     * start reading all the transactions from the given zxid.
     *
     * @param zxid the zxid to start reading transactions from
     * @param fastForward true if the iterator should be fast forwarded to point
     *        to the txn of a given zxid, else the iterator will point to the
     *        starting txn of a txnlog that may contain txn of a given zxid
     * @return returns an iterator to iterate through the transaction logs
     */
    public TxnIterator read(long zxid, boolean fastForward) throws IOException {
        return new FileTxnIterator(logDir, zxid, fastForward);
    }

需要传入的是想要从哪个zxid开始进行 加载,至于zxid是怎么进行获得

可以看到TxnLog接口中还有一个方法

  /**
     * the last zxid of the logged transactions.
     * @return the last zxid of the logged transactions.
     * @throws IOException
     */
    long getLastLoggedZxid() throws IOException;

    /**

获取最新的的zxid.

查看这个 read注释,里面有个 fastForward参数,从参数中,我们得知,我们要通过这个TxnIterator进行数据的迭代获取。而需不需要直接定位到最新的zxid那条记录,还是定位到zxid所在的log日志的第一条记录开始迭代读取。

然后思考下,为什么要这样设计接口,为什么要通过zxid来获取这个 TxnIterator 来进行迭代数据。

  1. 首先我们前面知道这个zxid是递增的,其实每个序列化对象的zxid都是具有唯一性的,那么这个zxid近乎是这个每个对象的id了,查找的时候需要这个东西,定位到哪个对象,序列化哪个对象
  2. 因为我们从磁盘里面要找到某个对象的时候,需要先定位文件,而前面说到了文件名其实是由zxid来进行hex命名的,所以文件名能找到对应的zxid,也就能定位到文件,至于如何做定位,直接对比文件名中的zxid比该需要找的zxid的要小的文件基本就在那,而且倒序排列这些Log文件就行
  3. 其三是,如果这个zxid开始信息不知道,看到接口中有个获取最近一次zxid的接口,那我们就恍然大悟,这个zxid 完全是可以通过已落盘的文件取到的信息。但至于效率与否,有待商榷

 

接下去就是了解这个TxnIterator的设计了,通过迭代器来读取列表数据,很经典的设计。

 

  /**
         * create an iterator over a transaction database directory
         * @param logDir the transaction database directory
         * @param zxid the zxid to start reading from
         * @param fastForward   true if the iterator should be fast forwarded to
         *        point to the txn of a given zxid, else the iterator will
         *        point to the starting txn of a txnlog that may contain txn of
         *        a given zxid
         * @throws IOException
         */
        public FileTxnIterator(File logDir, long zxid, boolean fastForward) throws IOException {
            this.logDir = logDir;
            this.zxid = zxid;
            init();

            if (fastForward && hdr != null) {
                while (hdr.getZxid() < zxid) {
                    if (!next()) {
                        break;
                    }
                }
            }
        }

这里有几个参数,logDir 日志文件地址,zxid 包含zxid的日志文件开始的地方,fastForward是否定位到zxid 对应的位置返回相应的TxnIterator,然后需要看看其init()方法,

   /**
         * initialize to the zxid specified
         * this is inclusive of the zxid
         * @throws IOException
         */
        void init() throws IOException {
            storedFiles = new ArrayList<>();
            List<File> files = Util.sortDataDir(
                FileTxnLog.getLogFiles(logDir.listFiles(), 0),
                LOG_FILE_PREFIX,
                false);
            for (File f : files) {
                if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
                    storedFiles.add(f);
                } else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
                    // add the last logfile that is less than the zxid
                    storedFiles.add(f);
                    break;
                }
            }
            goToNextLog();
            next();
        }

首先进行的是将日志文件拍个序,从这个Util.sortDataDir方法可以看出其基本含义,然后看看其实现,限于篇幅,这个比较简单的sort方法就不讲了,它的主要作用是,把具有前缀的log文件,通过其名字(名字是zxid,递增)进行升序或者降序排序。

然后找到,大于需要查找zxid的log,将其加进storedFiles  List里面,至于那个else if 是为了小于zxid 的log 的第一个,也加进storeFiles里面,因为这个文件也可能包含zxid 对应的TxnIterator。

查看goToNextLog方法

   /**
         * go to the next logfile
         * @return true if there is one and false if there is no
         * new file to be read
         * @throws IOException
         */
        private boolean goToNextLog() throws IOException {
            if (storedFiles.size() > 0) {
                this.logFile = storedFiles.remove(storedFiles.size() - 1);
                ia = createInputArchive(this.logFile);
                return true;
            }
            return false;
        }

意图还比较明显,就是从上面找到的倒序排序里面的 logs 日志文件 list里面,开始进行迭代,这里注意到,文件从尾部开始迭代,而因为这个file list又是倒序排序的,所以其实这个迭代顺序,是从 TxnIterator 传进来的zxid,最接近的那个log 开始正序迭代。就相当于是为了找到传进来的zxid,从最

靠近的位置,向前迭代,迭代过程zxid是递增的。

 

然后就是这个next()了,这个是deserialize 的主体,是看到TxnIterator是如何进行迭代的。

 /**
         * the iterator that moves to the next transaction
         * @return true if there is more transactions to be read
         * false if not.
         */
        public boolean next() throws IOException {
            if (ia == null) {
                return false;
            }
            try {
                long crcValue = ia.readLong("crcvalue");
                byte[] bytes = Util.readTxnBytes(ia);
                // Since we preallocate, we define EOF to be an
                if (bytes == null || bytes.length == 0) {
                    throw new EOFException("Failed to read " + logFile);
                }
                // EOF or corrupted record
                // validate CRC
                Checksum crc = makeChecksumAlgorithm();
                crc.update(bytes, 0, bytes.length);
                if (crcValue != crc.getValue()) {
                    throw new IOException(CRC_ERROR);
                }
                TxnLogEntry logEntry = SerializeUtils.deserializeTxn(bytes);
                hdr = logEntry.getHeader();
                record = logEntry.getTxn();
                digest = logEntry.getDigest();
            } catch (EOFException e) {
                LOG.debug("EOF exception", e);
                inputStream.close();
                inputStream = null;
                ia = null;
                hdr = null;
                // this means that the file has ended
                // we should go to the next file
                if (!goToNextLog()) {
                    return false;
                }
                // if we went to the next log file, we should call next() again
                return next();
            } catch (IOException e) {
                inputStream.close();
                throw e;
            }
            return true;
        }

 这个函数有一些精妙之处,包括异常处理,和递归调用,也算是一些小技巧。

看到这个处理流程,取得checkSum 的摘要数字,和算出来的摘要数字进行对比,判断数据是否被篡改,真正进行deserilize 的方法放在SerializeUtils里面,然后看看里面的实现

 

   public static TxnLogEntry deserializeTxn(byte[] txnBytes) throws IOException {
        TxnHeader hdr = new TxnHeader();
        final ByteArrayInputStream bais = new ByteArrayInputStream(txnBytes);
        InputArchive ia = BinaryInputArchive.getArchive(bais);

        hdr.deserialize(ia, "hdr");
        bais.mark(bais.available());
        Record txn = null;
        switch (hdr.getType()) {
        case OpCode.createSession:
            // This isn't really an error txn; it just has the same
            // format. The error represents the timeout
            txn = new CreateSessionTxn();
            break;
        case OpCode.closeSession:
            txn = ZooKeeperServer.isCloseSessionTxnEnabled()
                    ?  new CloseSessionTxn() : null;
            break;
        case OpCode.create:
        case OpCode.create2:
            txn = new CreateTxn();
            break;
        case OpCode.createTTL:
            txn = new CreateTTLTxn();
            break;
        case OpCode.createContainer:
            txn = new CreateContainerTxn();
            break;
        case OpCode.delete:
        case OpCode.deleteContainer:
            txn = new DeleteTxn();
            break;
        case OpCode.reconfig:
        case OpCode.setData:
            txn = new SetDataTxn();
            break;
        case OpCode.setACL:
            txn = new SetACLTxn();
            break;
        case OpCode.error:
            txn = new ErrorTxn();
            break;
        case OpCode.multi:
            txn = new MultiTxn();
            break;
        default:
            throw new IOException("Unsupported Txn with type=%d" + hdr.getType());
        }
        if (txn != null) {
            try {
                txn.deserialize(ia, "txn");
            } catch (EOFException e) {
                // perhaps this is a V0 Create
                if (hdr.getType() == OpCode.create) {
                    CreateTxn create = (CreateTxn) txn;
                    bais.reset();
                    CreateTxnV0 createv0 = new CreateTxnV0();
                    createv0.deserialize(ia, "txn");
                    // cool now make it V1. a -1 parentCVersion will
                    // trigger fixup processing in processTxn
                    create.setPath(createv0.getPath());
                    create.setData(createv0.getData());
                    create.setAcl(createv0.getAcl());
                    create.setEphemeral(createv0.getEphemeral());
                    create.setParentCVersion(-1);
                } else if (hdr.getType() == OpCode.closeSession) {
                    // perhaps this is before CloseSessionTxn was added,
                    // ignore it and reset txn to null
                    txn = null;
                } else {
                    throw e;
                }
            }
        }
        TxnDigest digest = null;

        if (ZooKeeperServer.isDigestEnabled()) {
            digest = new TxnDigest();
            try {
                digest.deserialize(ia, "digest");
            } catch (EOFException exception) {
                // may not have digest in the txn
                digest = null;
            }
        }

        return new TxnLogEntry(txn, hdr, digest);
    }

然后可以看到这里用到了TxnHeader 里面的参数 getType(),还记得我前面说过,Serialize 的时候可以用Record接口直接序列化,反序列化时候需要对应真正的实体对象,这时候要怎么进行反序列化,这时候就需要有标志位进行标记,这个标志位就是type,

这里有好几种不同的实体类型,TxnHeader其实是记录一些标志头的,这些实体类型从名字上可以看出一些端倪,但是限于篇幅,我这里先不讲了,后面说。

 

回到next(),可以看到把这些反序列化的TxnLogEntry,回记录在TxnIterator上面,方便外面调用。如果出异常,说明文件格式不对,在catch那里会直接跳到下一个日志文件,继续递归处理。

讲到这,基本就把这个TxnIterator迭代的过程讲完了,那么反序列化流程基本也算是讲完了。

 

然后还有一些零碎的接口,我这里就先不讲了,我这里简单地提下它们的功能,有兴趣的读者可以自行阅读。

 

public interface TxnLog extends Closeable {

  /**
     * roll the current
     * log being appended to
     * @throws IOException
     */
    void rollLog() throws IOException;

    /**
     * the last zxid of the logged transactions.
     * @return the last zxid of the logged transactions.
     * @throws IOException
     */
    long getLastLoggedZxid() throws IOException;


// 略

}

首先讲下这个rollLog,前面可能没有提到,zk会有意地控制每个log的大小的,靠的就是这个rollLog()接口,当每个log的文件大小到达一定大小后(默认67m)就会重置这个rollLog函数,使其重新走重建log日志的流程。简单看下这个函数的实现

// Roll the log file if we exceed the size limit
if (txnLogSizeLimit > 0) {
long logSize = getCurrentLogSize();

if (logSize > txnLogSizeLimit) {
LOG.debug("Log size limit reached: {}", logSize);
rollLog();
}
}
可以看到表现意图很明显,在这就先不表了。

而这个getLastLoggedZxid ,从名字上也可看到是获取最尾部的那个zxid那个函数。实现也不麻烦,通过log文件名排序即可,在此不表。

 

结束语

做一个详细的源码解析,或者说写一个文章都是不容易的,这个篇幅,就是仅仅解析一个日志序列化的功能,写得我都累了。

总而言之, 创作不易,且行且珍惜。

 


版权声明:本文为dongjijiaoxiangqu原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。