监听器触发流程
客户端发送请求
这里是通过getData请求注册一个默认监听器
这里将监听器对象个监听的节点封装起来,并且存放在客户端中,同时也会把监听器的信息封装到request对象中,但是注意这里只是把是否存在监听器发送过去。
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
// 注册监听器
// 这里会将监听器对象和节点路径封装起来
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
// 这里会将监听器信息放入request中
// 这里只是将是否注册监听器传进去,并没有把监听器对象传进去
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
// 这个方法就是将请求向服务端发送过去
// 并阻塞等待响应结果
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
// 返回响应结果
return response.getData();
}
服务端接收请求
这里直接从处理器链中分析,至于服务端通过NIO发送请求就不再分析了,可以参考下面的博文
- PrepRequestProcessor
这个处理器并没有处理关于事件逻辑,下面是调用链
1.org.apache.zookeeper.server.RequestProcessor#processRequest
2.java.util.AbstractQueue#add
3.org.apache.zookeeper.server.PrepRequestProcessor#run
4.org.apache.zookeeper.server.PrepRequestProcessor#pRequest
pRequest方法就是这个处理器的真正的处理方法,这里getData请求只是作了ACL安全验证
然后就是进行下一个处理器
- SyncRequestProcessor
这个处理器主要是对数据持久化和日志的处理,也没有对事件进行处理,下面是调用链
1.org.apache.zookeeper.server.RequestProcessor#processRequest
2.java.util.AbstractQueue#add
3.org.apache.zookeeper.server.SyncRequestProcessor#run
接着是调用下一个处理器
- FinalRequestProcessor
这个处理器对事件做了真正的处理
1.org.apache.zookeeper.server.RequestProcessor#processRequest
2.org.apache.zookeeper.server.FinalRequestProcessor#processRequest
在处理请求的响应信息时会将其中的监听信息拿出来并存储在服务端的一个WatchManager对象中,调用链如下
3.org.apache.zookeeper.server.ZKDatabase#getData
4.org.apache.zookeeper.server.DataTree#getData


到最后就是将响应信息发送回客户端
客户端接收响应
接收了服务端响应这里并没有什么特殊的地方
客户端触发监听器
- 服务端触发请求
假设发送了set请求,直接分析服务端的FinalRequestProcessor处理器,这里会触发事件,触发事件的调用链如下
1.org.apache.zookeeper.server.FinalRequestProcessor#processRequest
2.org.apache.zookeeper.server.ZooKeeperServer#processTxn
3.org.apache.zookeeper.server.ZKDatabase#processTxn
4.org.apache.zookeeper.server.DataTree#processTxn
5.org.apache.zookeeper.server.DataTree#setData
6.org.apache.zookeeper.server.WatchManager#triggerWatch(java.lang.String, org.apache.zookeeper.Watcher.Event.EventType)
7.org.apache.zookeeper.server.WatchManager#triggerWatch(java.lang.String, org.apache.zookeeper.Watcher.Event.EventType, java.util.Set<org.apache.zookeeper.Watcher>)
这里会见触发的监听器对象拿出来,并且移除,这也解释了为什么原生的客户端注册的监听器时一次性的。
这里在process方法中将触发的事件发送到客户端中去了
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
// 这里会通过remove方法拿到监听器
// 同时会将监听器从map中移除
// 这也解释了为什么原生的客户端注册的监听器时一次性的
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
// 移除路径
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// 这里调用了NIOServerCnxn的process方法
// 这里将触发的事件发送到客户端去
w.process(e);
}
// 返回触发的监听器
return watchers;
}

- 客户端响应触发的事件
这里主要分析客户端的EventThread线程的run方法,至于接收响应就不再分析了,调用链如下
1.org.apache.zookeeper.ClientCnxn.EventThread#run
2.org.apache.zookeeper.ClientCnxn.EventThread#processEvent
/**
* 下面是EventThread的run方法
* 也就是线程启动的执行方法
* 它是在创建原生客户端时启动的
*/
@Override
public void run() {
try {
isRunning = true;
while (true) {
// 从队列中拿到触发的事件
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
// 处理事件
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}
在processEvent方法中直接调用了监听器的process方法触发事件
CloseSession操作
这里是通过客户端的命令行的quit命令模拟的CloseSession操作
客户端发送命令
客户端命令行发送quit命令
1.org.apache.zookeeper.ZooKeeperMain#processZKCmd
这里先是在processZKCmd方法中判断命令类型并调用相关的函数进行处理
2.org.apache.zookeeper.ZooKeeper#close
3.org.apache.zookeeper.ClientCnxn#close
最后在close方法中将closeSession请求发送给服务端
服务端处理CloseSession
同样我们直接分析处理器链到底做了什么
- PrepRequestProcessor
这里直接找到对于closeSession的处理,将临时节点封装成一个记录对象放到一个改变记录集合中去,会在后面的处理器中处理
- SyncRequestProcessor
这里并没有对CloseSession有什么处理
- FinalRequestProcessor
1.org.apache.zookeeper.server.FinalRequestProcessor#processRequest
2.org.apache.zookeeper.server.ZooKeeperServer#processTxn
3.org.apache.zookeeper.server.ZKDatabase#processTxn
4.org.apache.zookeeper.server.DataTree#processTxn
5.org.apache.zookeeper.server.DataTree#killSession
6.org.apache.zookeeper.server.DataTree#deleteNode
下面再DataTree中会删除临时节点

最后也会移除session
ACL操作
具体介绍请看ACL详细介绍
权限介绍
- CREATE, 简写为c,可以创建子节点
- DELETE,简写为d,可以删除子节点(仅下一级节点),注意不是本节点
- READ,简写为r,可以读取节点数据及显示子节点列表
- WRITE,简写为w,可设置节点数据
- ADMIN,简写为a,可以设置节点访问控制列表
添加用户或者设置用户
假设在客户端发送了如下命令
addauth digest zhangsan:12345
- 客户端
在客户端是用的addAuthInfo方法处理的命令
- 服务端
服务端得从主线程开始分析,就是NIOServerCnxnFactory得run方法
1.org.apache.zookeeper.server.NIOServerCnxnFactory#run
2.org.apache.zookeeper.server.NIOServerCnxn#doIO
3.org.apache.zookeeper.server.NIOServerCnxn#readPayload
4.org.apache.zookeeper.server.NIOServerCnxn#readRequest
5.org.apache.zookeeper.server.ZooKeeperServer#processPacket
在processPacket方法中对这个权限操作做了处理
6.org.apache.zookeeper.server.auth.DigestAuthenticationProvider#handleAuthentication
这个方法将用户真正的保存在了服务端中了
public KeeperException.Code
handleAuthentication(ServerCnxn cnxn, byte[] authData)
{
String id = new String(authData);
try {
// 将获得的id生成一个签名
// id包含名字和密码或者ip
String digest = generateDigest(id);
if (digest.equals(superDigest)) {
// 超级管理员
cnxn.addAuthInfo(new Id("super", ""));
}
// 将用户对象添加到一个集合authInfo中去(前提是不存在这个用户)
// 这样服务端也就保存了用户的信息
cnxn.addAuthInfo(new Id(getScheme(), digest));
return KeeperException.Code.OK;
} catch (NoSuchAlgorithmException e) {
LOG.error("Missing algorithm",e);
}
return KeeperException.Code.AUTHFAILED;
}
设置权限
假设客户端发送如下的命令
setAcl /parent auth:zhangsan:123456:rdwca
服务端验证权限
在setAcl的时候其实也会验证权限,所以我们还是用上面那一条命令
只看分析服务端
- PrepRequestProcessor

checkACL方法的具体验证过程如下
/**
*
* @param zks
* @param acl:当前节点存在的用户及其权限
* @param perm:当前操作需要的权限
* @param ids:客户端当前的用户
* @throws KeeperException.NoAuthException
*/
static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm,
List<Id> ids) throws KeeperException.NoAuthException {
// 是否跳过验证
if (skipACL) {
return;
}
// 如果当前节点没有任何权限用户就会直接跳过验证
if (acl == null || acl.size() == 0) {
return;
}
// 有没有超级管理员
for (Id authId : ids) {
if (authId.getScheme().equals("super")) {
return;
}
}
// 遍历当前结点所有权限用户
for (ACL a : acl) {
Id id = a.getId();
/**
* 首先会查看当前操作的权限在所有权限用户中是否有拥有权限的
* 这里是通过与操作判断的
* 权限的种类共有下面几种
*
* int READ = 1 << 0;
* int WRITE = 1 << 1;
* int CREATE = 1 << 2;
* int DELETE = 1 << 3;
* int ADMIN = 1 << 4;
* int ALL = READ | WRITE | CREATE | DELETE | ADMIN;
*
* 这样的话通过与操作需要的权限进行与操作就可以得出是否拥有操作的权限了
*/
if ((a.getPerms() & perm) != 0) {
if (id.getScheme().equals("world")
&& id.getId().equals("anyone")) {
return;
}
AuthenticationProvider ap = ProviderRegistry.getProvider(id
.getScheme());
if (ap != null) {
// 遍历客户端的用户中是否存在拥有权限的用户
for (Id authId : ids) {
if (authId.getScheme().equals(id.getScheme())
&& ap.matches(authId.getId(), id.getId())) {
return;
}
}
}
}
}
// 如果没有权限就会抛出异常
throw new KeeperException.NoAuthException();
}
下面是描述权限的结构
跳过验证的设置
这个验证权限的方法基本作了如下的判断:
- 是否跳过验证
- 当前节点是否没有任何权限用户
- 客户端用户中是否包含超级管理员
- 当前节点的权限用户中是否有可以执行当前操作的用户
- 可以执行当前操作的用户是否在客户端的用户中
如果最后没有权限用户就会抛出异常
ACL的问题
需要注意的是更改权限时会有问题,客户端在更改一个节点中的一个用户的权限时,会更改其他的用户的权限。
这样会发现此节点的所有用户的权限都会变成刚刚设置的权限。