ZkClient是由Datameer的工程师开发的开源客户端,对Zookeeper的原生API进行了包装,实现了超时重连、Watcher反复注册等功能。
ZKClient版本及源码
maven依赖
ZkClient目前有两个不同artifactId的系列。
其中最早的0.1版本maven依赖如下:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>另外一个系列为的maven依赖为:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>其中第二个系列包含了从0.1~0.10的版本。查看dubbo的源代码,我们可以看到,dubbo采用了第一个系列的0.1版本。
源代码
github源代码地址:https://github.com/sgroschupf/zkclient
ZkClient使用
以下我们以第二个系列的0.10版本为例来说明ZKClient的API和使用
创建会话
ZkClient提供了7中创建会话的方法:
public ZkClient(String serverstring)
public ZkClient(String zkServers, int connectionTimeout)
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer)
public ZkClient(final String zkServers, final int sessionTimeout, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout)
public ZkClient(IZkConnection connection)
public ZkClient(IZkConnection connection, int connectionTimeout)
public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer)
public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout)上面方法的参数如果我们熟悉原生API的话,不难理解其参数,基本上参数名都是自描述的。值得留意的是ZkClient将ZK原生API中的异步处理进行了同步化。
其中一个参数IZkConnection是一个接口的定义。查看接口的方法不难发现它是对ZK原生接口最直接的包装。在此接口下面有两个实现方法ZkConnection和InMemoryConnection。在日常中直接使用ZkConnection方法就可以解决大部分的常见业务需求。
参数ZkSerializer同样是一个接口,定义了byte数组序列化和反序列化的两个方法。如果不传递此参数,则默认使用org.I0Itec.zkclient.serialize.SerializableSerializer实现类进行序列化。某些情况下此序列化接口会出现问题,比如乱码。此时,开发者可以直接实现ZkSerializer接口,重写自己的序列化方法。比如使用Hessian或Kryo等。
举例:
String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 10000);
创建节点
ZkClient提供了15个创建节点的方法:
public void createPersistent(String path)
public void createPersistent(String path, boolean createParents)
public void createPersistent(String path, boolean createParents, List<ACL> acl)
public void createPersistent(String path, Object data)
public void createPersistent(String path, Object data, List<ACL> acl)
public String createPersistentSequential(String path, Object data)
public String createPersistentSequential(String path, Object data, List<ACL> acl)
public void createEphemeral(final String path)
public void createEphemeral(final String path, final List<ACL> acl)
public String create(final String path, Object data, final CreateMode mode)
public String create(final String path, Object data, final List<ACL> acl, final CreateMode mode)
public void create(final String path, Object data, final CreateMode mode, final AsyncCallback.StringCallback callback, final Object context)
public void createEphemeral(final String path, final Object data)
public void createEphemeral(final String path, final Object data, final List<ACL> acl)
public String createEphemeralSequential(final String path, final Object data)
public String createEphemeralSequential(final String path, final Object data, final List<ACL> acl)查看源代码可知,其实这些创建节点的方法都是对原生API的一层封装而已,底层实现基本相同。值得留意的一点是,原生API的参数通过byte[]来传递节点内容,而ZkClient支持自定义序列化,因此可以传输Object对象。
该API方法的参数说明如下表所示。
参数名 说明 path 指定数据节点的节点路径,即API调用的目的是创建该节点 data 节点的初始数据内容,可以传入null mode 节点类型,是一个枚举类型,通常有4种可选的节点类型 acl 节点的ACL策略 callback 注册一个异步回调函数 context 用于传递一个对象,可以在执行回调函数的时候使用 createParents 指定是否创建父节点
示例:
zkc.createEphemeral("/temp");
// 可以支持递归的创建,但是不能递归赋值
zkc.createPersistent("/super/c1", true);
删除节点
删除节点提供了以下方法:
public boolean delete(final String path)
public boolean delete(final String path, final int version)
public boolean deleteRecursive(String path)删除API其实很简单,重点说一下deleteRecursive接口,这个接口提供了递归删除的功能。在原生API中,如果一个节点存在子节点,那么它将无法直接删除,必须一层层遍历先删除全部子节点,然后才能将目标节点删除。
示例:
删除 /temp节点
zkc.delete("/temp");
//递归删除/super
zkc.deleteRecursive("/super");
读取节点
获取节点列表
public List<String> getChildren(String path)此接口返回子节点的相对路径列表。比如节点路径为/test/a1和/test/a2,那么当path为/test时,返回的结果为[a1,a2]。
获取节点内容
public <T extends Object> T readData(String path)
public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists)
public <T extends Object> T readData(String path, Stat stat)通过方法返回参数的定义,就可以得知,返回的结果(节点的内容)已经被反序列化成对象了。
获取节点并返回内容示例:
List<String> list = zkc.getChildren("/super");
for(String p : list){
System.out.println(p);
String rp = "/super/" + p;
String data = zkc.readData(rp);
System.out.println("节点为:" + rp + ",内容为: " + data);
}
更新操作可以通过以下接口来实现:
public void writeData(String path, Object object)
public void writeData(final String path, Object datat, final int expectedVersion)
public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion)示例:zkc.writeData("/super/c1", "新内容");
监测节点是否存在
此API比较简单,调用以下方法即可:
protected boolean exists(final String path, final boolean watch)综合以上api 写一个小栗子:
代码在 bjsxt.zkclient.base
public class ZkClientBase {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.31:2181,192.168.1.32:2181,192.168.1.33:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 10000;//ms
public static void main(String[] args) throws Exception {
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);
//1. create and delete方法
zkc.createEphemeral("/temp");
zkc.createPersistent("/super/c1", true);
Thread.sleep(10000);
zkc.delete("/temp");
zkc.deleteRecursive("/super");
//2. 设置path和data 并且读取子节点和每个节点的内容
zkc.createPersistent("/super", "1234");
zkc.createPersistent("/super/c1", "c1内容");
zkc.createPersistent("/super/c2", "c2内容");
List<String> list = zkc.getChildren("/super");
for(String p : list){
System.out.println(p);
String rp = "/super/" + p;
String data = zkc.readData(rp);
System.out.println("节点为:" + rp + ",内容为: " + data);
}
//3. 更新和判断节点是否存在
zkc.writeData("/super/c1", "新内容");
System.out.println(zkc.readData("/super/c1").toString());
System.out.println(zkc.exists("/super/c1"));
// 4.递归删除/super内容
zkc.deleteRecursive("/super");
}
}输出
c1
节点为:/super/c1,内容为: c1内容
c2
节点为:/super/c2,内容为: c2内容
新内容
true注册监听
我们发现,上述ZKClient里面并没有类似的watcher、watch参数,这也就是说我们开发人员无需关心反复注册watcher的问题, 在ZkClient中可以通过注册相关的事件监听来实现对Zookeeper服务端时间的订阅。其中ZkClient提供的监听事件接口有以下几种:
| 接口类 | 注册监听方法 | 解除监听方法 |
|---|---|---|
| IZkChildListener | ZkClient的subscribeChildChanges方法 | ZkClient的unsubscribeChildChanges方法 |
| IZkDataListener | ZkClient的subscribeDataChanges方法 | ZkClient的subscribeDataChanges方法 |
| IZkStateListener | ZkClient的subscribeStateChanges方法 | ZkClient的unsubscribeStateChanges方法 |
其中ZkClient还提供了一个unsubscribeAll方法,来解除所有监听。
示例:
1 subscribeChildChanges方法 订阅子节点变化
参数1: path路径
参数2:实现了IZKChildListener接口的类(如:实例化IZKClientListener类) 只需要重写其handleChildChanges(String parentPath, List currentChild)方法。其中
- parentPath 为监听节点全路径
- currentChilds为新的子节点列表
IZKChildListener事件说明针对于下面三个事件触发:
- 新增子节点
- 减少子节点
- 删除节点
注意: 不监听节点内容的变化
举例
public class ZkClientWatcher1 {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.31:2181,192.168.1.32:2181,192.168.1.33:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 10000;//ms
public static void main(String[] args) throws Exception {
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);
//对父节点添加监听子节点变化。
zkc.subscribeChildChanges("/super", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("parentPath: " + parentPath);
System.out.println("currentChilds: " + currentChilds);
}
});
Thread.sleep(3000);
zkc.createPersistent("/super");
Thread.sleep(1000);
zkc.createPersistent("/super" + "/" + "c1", "c1内容");
Thread.sleep(1000);
zkc.createPersistent("/super" + "/" + "c2", "c2内容");
Thread.sleep(1000);
zkc.delete("/super/c2");
Thread.sleep(1000);
zkc.deleteRecursive("/super");
Thread.sleep(Integer.MAX_VALUE);
}
}
输出
parentPath: /super
currentChilds: []
parentPath: /super
currentChilds: [c1]
parentPath: /super
currentChilds: [c1, c2]
parentPath: /super
currentChilds: [c1]
parentPath: /super
currentChilds: null
parentPath: /super
currentChilds: null2 subscribeDataChanges 订阅内容变化
和前面的subscribeChildChanges类似
- 参数1 路径
- 参数2 IZkDataListener对象,重写handleDataDeleted(String path) 方法,可以得到删除节点的path 重写handleDataChange(String path, Object data)可以得到变更的节点和变更的内容
举例
public class ZkClientWatcher2 {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.31:2181,192.168.1.32:2181,192.168.1.33:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 10000;//ms
public static void main(String[] args) throws Exception {
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);
zkc.createPersistent("/super", "1234");
//对父节点添加监听子节点变化。
zkc.subscribeDataChanges("/super", new IZkDataListener() {
@Override
public void handleDataDeleted(String path) throws Exception {
System.out.println("删除的节点为:" + path);
}
@Override
public void handleDataChange(String path, Object data) throws Exception {
System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);
}
});
Thread.sleep(3000);
zkc.writeData("/super", "456", -1);
Thread.sleep(1000);
zkc.delete("/super");
Thread.sleep(Integer.MAX_VALUE);
}
}- 输出
变更的节点为:/super, 变更内容为:456
删除的节点为:/super
3 zookeeper服务状态监听器:
public class ZkStateWatcher {
static final String CONNECT_ADDR = "172.21.121.53:2181,172.21.121.54:2181,172.21.121.55:2181";
static final int CONNECTION_OUTTIME = 5000;
public static void main(String[] args) throws InterruptedException{
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR),CONNECTION_OUTTIME);
zkc.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
if(state==KeeperState.SyncConnected){
//当我重新启动后start,监听触发
System.out.println("连接成功");
}else if(state==KeeperState.Disconnected){
System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发
}else
System.out.println("其他状态"+state);
}
@Override
public void handleNewSession() throws Exception {
System.out.println("---->重建session");
}
});
// zkc.close();
Thread.sleep(Integer.MAX_VALUE);
}
}
当我只连接zk服务集群中单独的一个zk服务时:
./zkServer.sh stop 连接断开
./zkServer.sh start 连接成功
./zkServer.sh restart 连接断开 连接成功
./zkServer.sh stop 连接断开
等待几分钟
./zkServer.sh satrt 其他状态Expired ---->重建session 连接成功
其实无需等待几分钟,在zoo.cfg配置文件中有2个参数来管理session的超时时间:minSessionTimeout和maxSessionTimeout
2个参数的默认值都为-1,这个值在日志中可以看到:
默认情况下,maxSessionTimeout = ticketTime*20; minSessionTimeout = ticketTime*2;