一、服务器动态上下线监听案例
1. 简介
<1> 启动Zookeeper集群
<2> 在服务器启动时在Zookeeper创建临时节点
<3> 获取当前在线服务器列表,并注册监听
<4> 服务器下线
<5> 服务器节点上的下线事件通知
2. 实现
<1>并创建/servers节点
create /servers "servers"
<2> DistributeServer
package com.zookeeper.case2;
import com.zookeeper.utils.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
/**
* @author : hechaogao
* @createTime : 2022/9/15 15:04
*/
public class DistributeServer {
/**
* zookeeper客户端
*/
private static ZooKeeper zkClient;
public static void main(String[] args) {
//1. 获取连接
zkClient = ZkUtils.getConnect();
//2. 注册服务器(创建路径)
regist(args[0]);
//3. 启动具体业务(sleep)
business();
}
public static void regist(String hostname) {
/**
* param1 创建的节点的目录
* param2 创建的节点的值
* param3 ACL访问权限
* param4 创建的节点类型,这里选择的持久类型
*
*/
try {
zkClient = ZkUtils.getConnect();
zkClient.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void business() {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
<3> DistributeClient
package com.zookeeper.case2;
import com.zookeeper.utils.ZkUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.List;
/**
* @author : hechaogao
* @createTime : 2022/9/15 15:25
*/
public class DistributeClient {
/**
* zookeeper客户端
*/
private static ZooKeeper zkClient;
public static void main(String[] args) {
//1. 获取连接
zkClient = ZkUtils.getConnect();
//2. 监听/servers 下面的子节点
getServerList();
//3. 业务逻辑 sleep
business();
}
public static void getServerList() {
try {
List<String> children = zkClient.getChildren("/servers", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
getServerList();
}
});
System.out.println("-------------------------------------");
for (String child : children) {
byte[] data = zkClient.getData("/servers/" + child, false, null);
System.out.println("servser name : " + new String(data));
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void business() {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
<4> ZkUtils
package com.zookeeper.utils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class ZkUtils {
/**
* 逗号前后不能有空格
*/
private static final String CONNECTSTR = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
/**
* 单位毫秒
*/
private static final int SESSIONTIMEOUT = 2 * 1000;
public static ZooKeeper getConnect() {
try {
ZooKeeper zkClient = new ZooKeeper(CONNECTSTR, SESSIONTIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
return zkClient;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
二、分布式锁案例
1. 简介
<1> 启动Zookeeper集群,并创建/locks节点
<2> 接收到请求后,在/locks节点下创建一个临时顺序节点
<3> 判断自己是不是当前节点下最小的节点,如果是,则获取到锁,如果不是则对前一个节点进行监听
<4> 获取到锁,处理完业务后,delete节点释放锁,然后下面的节点将收到通知,重复判断第二步判断
2. 实现
<1> DistributeLock
package com.zookeeper.case3;
import com.zookeeper.utils.ZkUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author : hechaogao
* @createTime : 2022/9/15 21:16
*/
public class DistributeLock {
/**
* zookeeper客户端
*/
private ZooKeeper zkClient;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private String currentNode;
public DistributeLock() {
try {
//1. 获取连接
zkClient = ZkUtils.getConnect();
//2. 判断根节点/locks是否存在
Boolean exists = exists("/locks");
//3. 不存在则创建
if (!exists) {
String nodeCreasted = zkClient.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 对zk加锁
*/
public void zklock() throws InterruptedException, KeeperException {
//1. 创建临时带序号的节点
currentNode = zkClient.create("/locks/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//2. 判断创建的节点是否是最小序号的节点 如果是 则获取到锁 如果不是 就监听比当前节点序号小的节点
List<String> children = zkClient.getChildren("/locks", true);
//2.1 进行排序
Collections.sort(children);
//2.2 获取节点名称
String currentNodeName = currentNode.substring("/locks/".length());
//2.3 获取当前节点 在children集合中的位置
int index = children.indexOf(currentNodeName);
if (index == -1) {
//2.4 不存在则返回异常
System.out.println("currentNode not exists on children list waring...");
} else if (index == 0) {
//2.5 就一个节点,可以直接获取锁
return;
} else {
//2.6 需要监听当前节点的前一个节点
String waitPath = "/locks/" + children.get(index - 1);
zkClient.getData(waitPath, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//2.7 如果触发监听器,并且是删除了上一个节点 ,则获取锁
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
countDownLatch.countDown();
}
}
}, null);
//2.7 等在监听结束再往下走
countDownLatch.await();
return;
}
}
/**
* 对zk解锁
*/
public void unZklock() throws InterruptedException, KeeperException {
//1. 删除节点
zkClient.delete(currentNode, -1);
}
/**
* 判断 路径是否存在
*
* @param path 要判断的路径
* @return true 存在,false 不存在
*/
public Boolean exists(String path) throws InterruptedException, KeeperException {
/**
* param1 路径
* param2 是否监听
*/
Stat stat = null;
stat = zkClient.exists(path, false);
return stat != null ? true : false;
}
}
<2> DistributeLockTest
package com.zookeeper.case3;
/**
* @author : hechaogao
* @createTime : 2022/9/15 22:31
*/
public class DistributeLockTest {
public static void main(String[] args) {
final DistributeLock lock1 = new DistributeLock();
final DistributeLock lock2 = new DistributeLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zklock();
System.out.println("lock1 get lock");
Thread.sleep(5 * 1000);
System.out.println("lock1 remove lock");
lock1.unZklock();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.zklock();
System.out.println("lock2 get lock");
Thread.sleep(5 * 1000);
System.out.println("lock2 remove lock");
lock2.unZklock();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
<3> ZkUtils
package com.zookeeper.utils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
/**
* @author : hechaogao
* @createTime : 2022/9/15 15:06
*/
public class ZkUtils {
/**
* 逗号前后不能有空格
*/
private static final String CONNECTSTR = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
/**
* 单位毫秒
*/
private static final int SESSIONTIMEOUT = 200 * 1000;
public static ZooKeeper getConnect() {
try {
ZooKeeper zkClient = new ZooKeeper(CONNECTSTR, SESSIONTIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
return zkClient;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
版权声明:本文为qq_42000631原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。