zookeeper api java_Zookeeper API for JAVA实战与应用

packagecom.zookeeper.watcher;importjava.util.List;importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.atomic.AtomicInteger;importorg.apache.log4j.Logger;importorg.apache.zookeeper.CreateMode;importorg.apache.zookeeper.WatchedEvent;importorg.apache.zookeeper.Watcher;importorg.apache.zookeeper.Watcher.Event.EventType;importorg.apache.zookeeper.Watcher.Event.KeeperState;importorg.apache.zookeeper.ZooDefs.Ids;importorg.apache.zookeeper.ZooKeeper;importorg.apache.zookeeper.data.ACL;importorg.apache.zookeeper.data.Stat;public class ZKClient implementsWatcher{private static final Logger logger = Logger.getLogger(ZKClient.class);//定义session失效时间

private static final int SESSION_TIMEOUT = 10000;//zookeeper服务器地址

private static final String ZOOKEEPER_ADDRESS = "192.168.56.1:2181";//ZooKeeper变量

private ZooKeeper zk = null;//定义原子变量

AtomicInteger seq = newAtomicInteger();//信号量设置,用于等待zookeeper连接建立之后,通知阻塞程序继续向下执行

private CountDownLatch connectedSemaphore = new CountDownLatch(1);public static void main(String[] args) throwsInterruptedException {

String parentPath= "/test"; //父节点

String childrenPath = "/test/children"; //子节点

ZKClient test= newZKClient();//创建链接

test.createConnection();boolean isSuccess = test.createNode(parentPath, "abc", Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);if(isSuccess) {//读取数据

String result = test.getNodeData(parentPath, true);

logger.info("更新前数据:" +result);//更新数据

isSuccess =test.updateNode(parentPath, String.valueOf(System.currentTimeMillis()));if(isSuccess){

logger.info("更新后数据:" + test.getNodeData(parentPath, true));

}//创建子节点

isSuccess =test.createNode(childrenPath, String.valueOf(System.currentTimeMillis()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);if(isSuccess){

test.updateNode(childrenPath, String.valueOf(System.currentTimeMillis()));

}//读取子节点

List childrenList = test.getChildren(parentPath, true);if(childrenList!=null && !childrenList.isEmpty()){for(String children : childrenList){

System.out.println("子节点:" +children);

}

}

}

Thread.sleep(1000);//创建临时有序子节点

test.createNode(childrenPath, String.valueOf(System.currentTimeMillis()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

test.createNode(childrenPath, String.valueOf(System.currentTimeMillis()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

test.createNode(childrenPath, String.valueOf(System.currentTimeMillis()), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);//读取子节点,并删除

List childrenList = test.getChildren(parentPath, true);if (childrenList != null && !childrenList.isEmpty()) {for(String children : childrenList) {

System.out.println("子节点:" +children);

test.deleteNode(parentPath+ "/" +children);

}

}//删除父节点

if (test.exists(childrenPath, false) != null) {

test.deleteNode(childrenPath);

}//释放链接

Thread.sleep(1000);

test.releaseConnection();

}/*** 创建节点

*

*@parampath 节点路径

*@paramdata 数据内容

*@paramacl 访问控制列表

*@paramcreateMode znode创建类型

*@return

*/

public boolean createNode(String path, String data, Listacl, CreateMode createMode) {try{//设置监控(由于zookeeper的监控都是一次性的,所以每次必须设置监控)

exists(path, true);

String resultPath= this.zk.create(path, data.getBytes(), acl, createMode);

logger.info(String.format("节点创建成功,path: %s,data: %s", resultPath, data));

}catch(Exception e) {

logger.error("节点创建失败", e);return false;

}return true;

}/*** 更新指定节点数据内容

*

*@parampath 节点路径

*@paramdata 数据内容

*@return

*/

public booleanupdateNode(String path, String data) {try{

Stat stat= this.zk.setData(path, data.getBytes(), -1);

logger.info("更新节点数据成功,path:" + path + ", stat: " +stat);

}catch(Exception e) {

logger.error("更新节点数据失败", e);return false;

}return true;

}/*** 删除指定节点

*

*@parampath

* 节点path*/

public voiddeleteNode(String path) {try{this.zk.delete(path, -1);

logger.info("删除节点成功,path:" +path);

}catch(Exception e) {

logger.error("删除节点失败", e);

}

}/*** 读取节点数据

*

*@parampath 节点路径

*@paramneedWatch 是否监控这个目录节点,这里的 watcher是在创建ZooKeeper实例时指定的watcher

*@return

*/

public String getNodeData(String path, booleanneedWatch) {try{

Stat stat=exists(path, needWatch);if(stat != null){return new String(this.zk.getData(path, needWatch, stat));

}

}catch(Exception e) {

logger.error("读取节点数据内容失败", e);

}return null;

}/*** 获取子节点

*

*@parampath 节点路径

*@paramneedWatch 是否监控这个目录节点,这里的 watcher是在创建ZooKeeper实例时指定的watcher

*@return

*/

public List getChildren(String path, booleanneedWatch) {try{return this.zk.getChildren(path, needWatch);

}catch(Exception e) {

logger.error("获取子节点失败", e);return null;

}

}/*** 判断znode节点是否存在

*

*@parampath 节点路径

*@paramneedWatch 是否监控这个目录节点,这里的 watcher是在创建ZooKeeper实例时指定的watcher

*@return

*/

public Stat exists(String path, booleanneedWatch) {try{return this.zk.exists(path, needWatch);

}catch(Exception e) {

logger.error("判断znode节点是否存在发生异常", e);

}return null;

}/*** 创建ZK连接

*

*@paramconnectAddr

*@paramsessionTimeout*/

public voidcreateConnection() {this.releaseConnection();try{

zk= new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, this);

logger.info("开始连接ZK服务器...");//zk连接未创建成功进行阻塞

connectedSemaphore.await();

}catch(Exception e) {

logger.error("ZK连接创建失败", e);

}

}/*** 关闭ZK连接*/

public voidreleaseConnection() {if (this.zk != null) {try{this.zk.close();

logger.info("ZK连接关闭成功");

}catch(InterruptedException e) {

logger.error("ZK连接关闭失败", e);

}

}

}

@Overridepublic voidprocess(WatchedEvent event) {

logger.info("进入process()方法...event = " +event);if (event == null) {return;

}

KeeperState keeperState= event.getState(); //连接状态

EventType eventType = event.getType(); //事件类型

String path = event.getPath(); //受影响的path

String logPrefix= "【Watcher-" + this.seq.incrementAndGet() + "】";

logger.info(String.format("%s收到Watcher通知...", logPrefix));

logger.info(String.format("%s连接状态:%s", logPrefix, keeperState));

logger.info(String.format("%s事件类型:%s", logPrefix, eventType));

logger.info(String.format("%s受影响的path:%s", logPrefix, path));if (KeeperState.SyncConnected ==keeperState) {if (EventType.None ==eventType) {//成功连接上ZK服务器

logger.info(logPrefix + "成功连接上ZK服务器...");

connectedSemaphore.countDown();

}else if (EventType.NodeCreated ==eventType) {//创建节点

logger.info(logPrefix + "节点创建");this.exists(path, true);

}else if (EventType.NodeDataChanged ==eventType) {//更新节点

logger.info(logPrefix + "节点数据更新");

logger.info(logPrefix+ "数据内容: " + this.getNodeData(path, true));

}else if (EventType.NodeChildrenChanged ==eventType) {//更新子节点

logger.info(logPrefix + "子节点变更");

logger.info(logPrefix+ "子节点列表:" + this.getChildren(path, true));

}else if (EventType.NodeDeleted ==eventType) {//删除节点

logger.info(logPrefix + "节点 " + path + " 被删除");

}

}else if (KeeperState.Disconnected ==keeperState) {

logger.info(logPrefix+ "与ZK服务器断开连接");

}else if (KeeperState.AuthFailed ==keeperState) {

logger.info(logPrefix+ "权限检查失败");

}else if (KeeperState.Expired ==keeperState) {

logger.info(logPrefix+ "会话失效");

}

}

}


版权声明:本文为weixin_36384501原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。