zookeeper 创建临时顺序节点_分布式锁 模拟——zookeeper

最近刚听过老师的分布式锁的解决方案,就说到了zookeeper,虽然现在生产环境已经不建议使用zookeeper来扮演分布式锁了,但是作为了解zookeeper,还是一个很好的借鉴。

分布式锁----目的是用于解决不同服务器之间更新同一条记录的时候,保证更新的状况,不允许同一时间有多个线程去更新数据。

那么zookeeper怎么来扮演分布式锁呢。

就是利用zookeeper创建临时带序号的节点的特性来实现分布式锁。

1:锁就是zookeeper指定目录下序号最小的临时序列节点,多个系统的多个线程都要在此目录下创建临时的顺序节点,因为zk的特性,可以保证创建的临时顺序节点具有顺序性,接下来就是通过节点的顺序来进行锁的判断。

2:模拟的情况是这样的,每个线程都创建一个临时顺序节点,然后获取当前目录下最新的节点序号,如果这个最小的节点序号就是自己刚才创建的,那么就获取到锁了,然后过一会儿就删除这个节点,如果不是,说明此时已经有别的线程抢到锁了,那么进入步骤3

3:获取当前节点的上一个临时顺序节点,对这个节点进行监听,如果监听到这个节点被删除了,不见了,说明锁此时已经释放掉了,那么就开始去抢锁了。

592ff51fcb04252c01706ec358cd4122.png

总体来说,就是一个循环判断。

每个任务如果要处理逻辑之前,先去创建节点,抢锁,如果抢到了,就去执行逻辑,执行完删除自己的节点。如果没有抢到,去监听比自己早的节点,监听到如果比自己早的节点不见了,就不需要等待,开始去执行任务,因为zookeeper的创建节点有顺序性,完全不需要考虑顺序的问题,可以保证过来的任务一个一个处理。

上代码

package com.zookeeper.dislock;import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;public class DisClient {    final String disLock = "/distrilock";    String beforeNodePath;    String currentNodePath;    CountDownLatch countDownLatch = null;    String threadNname;    //获取到zkClient    private ZkClient zkClient = new ZkClient("xxxxx:2185,xxxxxx:2185,xxxxxx:2185");    //把抢锁过程为量部分,一部分是创建节点,比较序号,另一部分是等待锁    //完整获取锁方法    public DisClient() {        threadNname = Thread.currentThread().getName();        synchronized (DisClient.class) {            if (!zkClient.exists(disLock)) {                zkClient.createPersistent(disLock);            }        }    }    public void deleteLock() {        if (zkClient != null && currentNodePath != null && currentNodePath.equals("") == false) {            zkClient.delete(currentNodePath);            System.out.println(threadNname + "释放锁");        } else {            System.out.println(threadNname + "释放锁,失败" + currentNodePath);        }    }//这个方法就是抢锁的入口,需要不停迭代使用    public void getDisLock() {        //第一步:肯定是去创建临时节点,如果创建后的节点的第一个节点就是自己,      //说明锁就抢到了        if (tryGetDisLock()) {            //说明获取到锁            System.out.println(threadNname + ":获取到了锁,成功");        } else {            System.out.println(threadNname + ": 获取到了锁,失败,等待");            waitForLock();            //递归获取锁            getDisLock();        }    }    //尝试获取锁    public Boolean tryGetDisLock() {      //判断节点释放创建过,如果第一次调用,则去创建        if (currentNodePath == null || currentNodePath.equals("")) {            //创建临时节点成功,返回 节点的路径            currentNodePath = zkClient.createEphemeralSequential(disLock + "/", "lock");        }        //获取到 /distrilock下面所有的子节点        final List childs = zkClient.getChildren(disLock);        //对节点信息进行排序        Collections.sort(childs);//默认是升序        String minNode = childs.get(0);//如果此时自己是最早的节点,就抢到锁了,可以执行自己的业务逻辑了        if (currentNodePath.equals(disLock + "/" + minNode)) {            return true;        } else {            //说明最小节点不是自己创建的,要监控自己当前节点序号前一个节点            final int i = Collections.binarySearch(childs, currentNodePath.substring((disLock + "/").length()));            beforeNodePath = disLock + "/" + childs.get(i - 1);        }        return false;    }    /**     * 等待之前节点释放锁,如何判断锁被释放,     * 需要唤醒线程继续尝试 tryGetDisLock     */    public void waitForLock() {      //注册一个监听器        IZkDataListener izk = new IZkDataListener() {            public void handleDataChange(String s, Object o) throws Exception {            }            public void handleDataDeleted(String s) throws Exception {                System.out.println(threadNname + " 监控到了," + beforeNodePath + "节点发生变化了");                countDownLatch.countDown();//把值减1变为0,唤醒之前await线程            }        };        //监控前一个节点        zkClient.subscribeDataChanges(beforeNodePath, izk);        //在监听的通知没来之前,该线程应该处于等待        if (zkClient.exists(beforeNodePath)) {            countDownLatch = new CountDownLatch(1);            try {                countDownLatch.await();//阻塞            } catch (InterruptedException e) {                e.printStackTrace();            }        }        zkClient.unsubscribeDataChanges(beforeNodePath, izk);    }}
public class DisLockTest {    public static void main(String[] args) {        //使用3个线程模拟分布式环境        for (int i = 0; i < 3; i++) {            new Thread(new DisLockRunnable()).start();        }    }    static class DisLockRunnable implements Runnable {        public void run() {            //每个线程就是去抢锁            final DisClient client = new DisClient();            client.getDisLock();            try {                Thread.sleep(2000);            } catch (InterruptedException e) {                e.printStackTrace();            }            client.deleteLock();        }    }}----------执行结果如下Thread-1:获取到了锁,成功Thread-0: 获取到了锁,失败,等待Thread-2: 获取到了锁,失败,等待Thread-1释放锁Thread-0 监控到了,/distrilock/0000000081节点发生变化了Thread-0:获取到了锁,成功Thread-0释放锁Thread-2 监控到了,/distrilock/0000000082节点发生变化了Thread-2:获取到了锁,成功Thread-2释放锁Process finished with exit code 0多线程情况下:Thread-1创建的临时顺序节点最小,那么这个时候,它抢到了锁,此时 Thread-0 和 Thread-2没有获取到锁,失败,等待,其中 Thread-0 监听 Thread-1 创建的目录,其中 Thread-2 监听 Thread-0 创建的目录,那么等其中 Thread-1 运行完,将自己删除的时候,Thread-0就监听到了,开始执行自己的逻辑,然后删除自己,此时Thread-2就监听到Thread-0创建的目录被删除了,Thread-2也开始执行。这样就做到了多线程情况下 模拟 分布式锁的效果了。感谢拉勾老师,讲的那么透彻。