DataNode
数据节点,一个DataNode对应一个节点路径,内部包含其父节点、子节点列表、数据内容、ACL权限、节点状态等信息
public class DataNode implements Record {
DataNode parent;
byte data[];
Long acl;
public StatPersisted stat;
private Set<String> children = null;
...
}DataTree
主要负责管理DataNode及对应的相关事件,一种树结构,内部定义了根节点、临时节点容器、所有节点容器、节点事件管理器等等。其中临时节点的Map容器比较特殊,这是因为临时节点不会有子节点,为了便为清理,所以其Map中的key=sessionId,value=会话过程中创建的临时节点列表
public class DataTree {
//所有节点列表,key=节点路径,value=节点对应对象
private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
//节点内容事件管理器
private final WatchManager dataWatches = new WatchManager();
//子节点事件管理器
private final WatchManager childWatches = new WatchManager();
//根节点
private static final String rootZookeeper = "/";
...
//临时节点不能有子节点,所以这里的key=sessionId,value=该会话过程中创建的所有临时节点
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
...
}ZKDatabase
ZkDatabase是zookeeper服务器中的内存数据库,负责所有会话、DataTree、事务日志的管理,以及数据快照、通过快照及事务日志恢复数据等等。从下面的源码可以看出,其中事务日志和数据快照这些磁盘持久化操作都由FileTxnSnapLog负责,而FileTxnSnapLog内部分别维护了一个FileTxnLog及FileSnap,前者负责事务日志的写读,后者负责快照文件的写读。
(在请求处理链的SyncRequestProcessor处理器中,会调用zkDb.append(rquest)进入下面的流程)
public class ZKDatabase {
protected DataTree dataTree;
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts; //维护所有session,K=sessionId,V=下一个超时时间点
protected FileTxnSnapLog snapLog; //负责事务日志、快照文件的读写
protected long minCommittedLog, maxCommittedLog;
public static final int commitLogCount = 500;
protected static int commitLogBuffer = 700;
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>(); //事务提议(事务日志)
protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
volatile private boolean initialized = false;
...
public boolean append(Request si) throws IOException { //添加事务请求
return this.snapLog.append(si); //事务日志记录
}
}
public class FileTxnSnapLog {
private final File dataDir;
private final File snapDir;
private TxnLog txnLog;
private SnapShot snapLog;
public final static int VERSION = 2;
public final static String version = "version-";
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
this.dataDir = new File(dataDir, version + VERSION);
this.snapDir = new File(snapDir, version + VERSION);
....
txnLog = new FileTxnLog(this.dataDir);
snapLog = new FileSnap(this.snapDir);
}
public boolean append(Request si) throws IOException {
return txnLog.append(si.hdr, si.txn); //事务日志记录
}
...
}事务日志
对于客户端的每一个事务请求,Zookeeper都会将它们记录到事务日志中(同时也会变更到内存日志中),事务日志的记录由FileTxnLog负责,在未配置dataLogDir的情况下,事务日志默认写入dataDir配置的目录,Zookeeper在运行时会在该目录下创建一个/version-2子目录用来存放事务日志文件。
日志文件默认预分配大小为64M,预分配的设计是为了避免频繁的触发磁盘I/O为文件分配新的磁盘块,提升I/O性能。生产环境下一般通过zookeeper.preAllocSize来增大预分配大小(单位为KB),减小生成的文件个数。 文件名为log.xxx,其中xx表示该文件中的第一条事务日志的事务ID。下方是上面源码的接续部分
public class FileTxnLog implements TxnLog {
//...添加事务日志
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
//...未关联事务日志文件
if (logStream==null) {
...
//创建文件并关联到logStream
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
...
}
//数据追加,内部根据大小确定是否扩容
filePadding.padFile(fos.getChannel());
...
}
}
public class FilePadding {
...
private static long preAllocSize = 65536 * 1024; //默认预分配6大小为4M
private static final ByteBuffer fill = ByteBuffer.allocateDirect(1);
static {
LOG = LoggerFactory.getLogger(FileTxnLog.class);
String size = System.getProperty("zookeeper.preAllocSize");
if (size != null) {
try {
preAllocSize = Long.parseLong(size) * 1024; //所以配置的单位为KB
} catch (NumberFormatException e) {
LOG.warn(size + " is not a valid value for preAllocSize");
}
}
}
long padFile(FileChannel fileChannel) throws IOException {
//计算新大小
long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
if (currentSize != newFileSize) {
//write时,如果已达文件末尾,会进行扩容,这点参与JDK文档
fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
currentSize = newFileSize;
}
return currentSize;
}
}
数据快照
数据快照本质上是将内存中的节点数据及会话数据保存到磁盘。在某次事务日志记录之后,若事务记录的总次数超过了一定的次数(取决于zookeeper.snapCount配置<最小为2>),Zookeeper会将ZkDatabase中的数据全量dump到文件中,即数据快照。Zookeeper通过启动一个新线程来进行快照存储,以避免对事务请求造成阻塞,但如果前一次dump快照操作还在进行中,那么本次dump将跳过,直接该快照线程可用。
其中次数对比采用“过半随机”的策略,当logCount > (snapCount / 2 + randRoll)时行快照,而没有采用固定的snapCount/2,这样是为了避免集群节点同时快照,影响整体性能。
配置中的dataDir即代表快照目录,zookeeper同样会在该目录下生成一个version-2子目录来存储所在快照文件,文件格式为snapshot.xxx(xxx代表快照开始时服务器最新的事务ID),由于快照文件不像事务文件一样写入频繁,所以针对快照文件Zookeeper没有采用预分配的作用。此外由于过半随机原则及每次事务操作产生的数据量不一样,所以快照文件间大小并不一定相等。(下面是请求处理链的SyncRequestProcessor处理器的较详细代码)
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
...
@Override
public void run() {
try {
int logCount = 0;
setRandRoll(r.nextInt(snapCount/2));
while (true) {
//从请求队列中取出Request对象
Request si = ...
if (si != null) {
// track the number of records written to the log
// 事务日志写入成功
if (zks.getZKDatabase().append(si)) {
logCount++;
//过半检测
if (logCount > (snapCount / 2 + randRoll)) {
setRandRoll(r.nextInt(snapCount/2));
zks.getZKDatabase().rollLog();
// 上次快照未结束,跳过本次
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
//新开线程
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} ...
}
}
} ...
}
}数据恢复
其实在上篇关于选举的博客中说过,集群在启动时首先做的是数据加载loadDataBase(),然后才是选举。数据加载流程大概如下
- 创建ZKDatabase,给定对应的快照目录与事务日志目录
- 从快照目录中加载最后的快照文件内容。(由于每次快照都会全量dump数据,所以最新的快照数据通常是最全的)。
- 通过比较事务ID(快照文件名、事务日志文件名中的后缀即最新事务ID),获取事务记录增量数据(有可能跨多个事务日志文件),迭代事务日志文件列表,到快照之后产生的事务日志,进行更新。(即更新增量数据)
- 将事务日志同步到committedLog中,以便集群间快速同步
public class ZKDatabase {
protected DataTree dataTree;
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts; //维护所有session,K=sessionId,V=下一个超时时间点
protected FileTxnSnapLog snapLog;
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
...
public long loadDataBase() throws IOException {
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
return zxid;
}
}
public class FileTxnSnapLog {
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
snapLog.deserialize(dt, sessions);
return fastForwardFromEdits(dt, sessions, listener);
}
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
//事务日志目录
FileTxnLog txnLog = new FileTxnLog(dataDir);
//对比事务Id,找出事务日志增量数据(有可能跨事务日志,所以Itr内部维护了一个流来指向事务日志文件)
//其itr.next()方法会读取日志文件中的每一条记录,如果读完,则其内部的流将指向下一个日志文件,然后返回下一个文件中的第一条记录
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
try {
while (true) {...
hdr = itr.getHeader();
...
try {
processTransaction(hdr,dt,sessions, itr.getTxn());
} ...
//将事务同步到committedLog中,以便集群间快速同步
listener.onTxnLoaded(hdr, itr.getTxn());
if (!itr.next()) //读取当前事务日志文件中的下一条记录,如果当前记录是最后一条,则读取下一事务日志文件中的第一条记录
break;
}
} ...
return highestZxid;
}
}