zookeeper的特性
zk包含一系列的节点,叫做znode,就好像文件系统一样每个znode表示一个目录。
有序节点:假如当前有一个父节点为/lock,我们可以在这个父节点下面创建子节点:
zookeeper提供了一个可选的有序特性,例如我们可以创建子节点“/lock/node-”并且指明有序,那么zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号。
也就是说,如果是第一个创建的子节点,那么生成的子节点为/lock/node-0000000000,下一个节点则为/lock/node-0000000001,依次类推。
临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点。
事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,zookeeper会通知客户端。当前zookeeper有如下四种事件:节点创建、节点删除、节点数据修改、子节点变更。
两种实现方式
zk分布式锁的实现有两种方式
原生方式,使用java和zk api书写以上逻辑实现分布式锁,操作zookeeper使用的是apache提供的zookeeper的包。通过实现Watch接口,实现process(WatchedEvent event)方法来实施监控,使CountDownLatch来完成监控,在等待锁的时候使用CountDownLatch来计数,等到后进行countDown,停止等待,继续运行。
使用Curator框架来实现分布式锁,Curator是Netflix公司一个开源的zookeeper客户端,在原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。同时内部实现了诸如Session超时重连,Watcher反复注册等功能,实现了Fluent风格的API接口,是使用最广泛的zookeeper客户端之一。
两种方式对比来说,原生方式自己实现逻辑比较灵活,个性化高但是开发量比较大,使用Curator实现分布式锁非常简单,几行代码就可以搞定,隐藏了很多实现细节。
zk实现分布式锁的落地方案
使用zk的临时节点和有序节点,每个线程获取锁就是在zk创建一个临时有序的节点,比如在/lock/目录下。
创建节点成功后,获取/lock目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点
如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。
如果当前线程创建的节点不是所有节点序号最小的节点,则对节点序号的前一个节点添加一个事件监听。
比如当前线程获取到的节点序号为/lock/003,然后所有的节点列表为[/lock/001,/lock/002,/lock/003],则对/lock/002这个节点添加一个事件监听器。
如果锁释放了,会唤醒下一个序号的节点,然后重新执行第3步,判断是否自己的节点序号是最小。
比如/lock/001释放了,/lock/002监听到时间,此时节点集合为[/lock/002,/lock/003],则/lock/002为最小序号节点,获取到锁。
整个过程如下:
Curator实现ZooKeeper分布式锁
/**
* Created by ErNiu on 2018/1/29.
*/
public class testlock {
private static final String ZK_ADDRESS = "10.2.1.1:2181";
private static final String ZK_LOCK_PATH = "/zktest/lock0";
/**
* 下面的程序会启动几个线程去争夺锁,拿到锁的线程会占用5秒
*/
public static void main(String[] args) throws InterruptedException {
// 1.Connect to zk
final CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new RetryNTimes(10, 5000));
client.start();
System.out.println(client.getState());
System.out.println("zk client start successfully!");
final InterProcessMutex mutex = new InterProcessMutex(client, ZK_LOCK_PATH);
for (int i = 0; i < 3; i++) {
Runnable myRunnable = new Runnable() {
public void run() {
doWithLock(client, mutex);
}
};
Thread thread = new Thread(myRunnable, "Thread-" + i);
thread.start();
}
}
private static void doWithLock(CuratorFramework client, InterProcessMutex mutex) {
try {
String name = Thread.currentThread().getName();
if (mutex.acquire(1, TimeUnit.SECONDS)) {
System.out.println(name + " hold lock");
System.out.println(client.getChildren().forPath(ZK_LOCK_PATH));
Thread.sleep(5000L);
System.out.println(name + " release lock");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
可以看到通过Curator实现分布式锁,只需要两行代码 mutex.acquire() 和 mutex.release(),上面的代码同时测试了锁超时时间的作用,启动三个线程去获取锁,线程2获取到锁sleep5秒,超时时间设置为1s,线程0和线程1阻塞等待1s后,便会抛出异常。这里仍然存在一个问题,假如线程2获取到锁,并且线程2因为自身原因,一直不释放锁。这就会导致其他线程无法正常运行,所以需要对获取到锁的线程设置一个超时时间,超过规定时间仍未执行完,则强制释放锁,并抛出异常,来保证程序不会阻塞。