基于ZooKeeper或Curator实现分布式锁

zookeeper的特性

  • zk包含一系列的节点,叫做znode,就好像文件系统一样每个znode表示一个目录。

  • 有序节点:假如当前有一个父节点为/lock,我们可以在这个父节点下面创建子节点:

    • zookeeper提供了一个可选的有序特性,例如我们可以创建子节点“/lock/node-”并且指明有序,那么zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号。

    • 也就是说,如果是第一个创建的子节点,那么生成的子节点为/lock/node-0000000000,下一个节点则为/lock/node-0000000001,依次类推。

  • 临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点。

  • 事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,zookeeper会通知客户端。当前zookeeper有如下四种事件:节点创建、节点删除、节点数据修改、子节点变更。

两种实现方式

zk分布式锁的实现有两种方式

  1. 原生方式,使用java和zk api书写以上逻辑实现分布式锁,操作zookeeper使用的是apache提供的zookeeper的包。通过实现Watch接口,实现process(WatchedEvent event)方法来实施监控,使CountDownLatch来完成监控,在等待锁的时候使用CountDownLatch来计数,等到后进行countDown,停止等待,继续运行。

  2. 使用Curator框架来实现分布式锁,Curator是Netflix公司一个开源的zookeeper客户端,在原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。同时内部实现了诸如Session超时重连,Watcher反复注册等功能,实现了Fluent风格的API接口,是使用最广泛的zookeeper客户端之一。

两种方式对比来说,原生方式自己实现逻辑比较灵活,个性化高但是开发量比较大,使用Curator实现分布式锁非常简单,几行代码就可以搞定,隐藏了很多实现细节。

zk实现分布式锁的落地方案

  1. 使用zk的临时节点和有序节点,每个线程获取锁就是在zk创建一个临时有序的节点,比如在/lock/目录下。

  2. 创建节点成功后,获取/lock目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点

  3. 如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。

  4. 如果当前线程创建的节点不是所有节点序号最小的节点,则对节点序号的前一个节点添加一个事件监听。
    比如当前线程获取到的节点序号为/lock/003,然后所有的节点列表为[/lock/001,/lock/002,/lock/003],则对/lock/002这个节点添加一个事件监听器。

如果锁释放了,会唤醒下一个序号的节点,然后重新执行第3步,判断是否自己的节点序号是最小。

比如/lock/001释放了,/lock/002监听到时间,此时节点集合为[/lock/002,/lock/003],则/lock/002为最小序号节点,获取到锁。

整个过程如下:
在这里插入图片描述

Curator实现ZooKeeper分布式锁

/**
 * Created by ErNiu on 2018/1/29.
 */
public class testlock {
    private static final String ZK_ADDRESS = "10.2.1.1:2181";
    private static final String ZK_LOCK_PATH = "/zktest/lock0";

    /**
     * 下面的程序会启动几个线程去争夺锁,拿到锁的线程会占用5秒
     */
    public static void main(String[] args) throws InterruptedException {
        // 1.Connect to zk
        final CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new RetryNTimes(10, 5000));
        client.start();

        System.out.println(client.getState());

        System.out.println("zk client start successfully!");

        final InterProcessMutex mutex = new InterProcessMutex(client, ZK_LOCK_PATH);

        for (int i = 0; i < 3; i++) {
            Runnable myRunnable = new Runnable() {
                public void run() {
                    doWithLock(client, mutex);
                }
            };
            Thread thread = new Thread(myRunnable, "Thread-" + i);
            thread.start();
        }

    }

    private static void doWithLock(CuratorFramework client, InterProcessMutex mutex) {
        try {
            String name = Thread.currentThread().getName();
            if (mutex.acquire(1, TimeUnit.SECONDS)) {

                System.out.println(name + " hold lock");

                System.out.println(client.getChildren().forPath(ZK_LOCK_PATH));

                Thread.sleep(5000L);
                System.out.println(name + " release lock");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                mutex.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

可以看到通过Curator实现分布式锁,只需要两行代码 mutex.acquire() 和 mutex.release(),上面的代码同时测试了锁超时时间的作用,启动三个线程去获取锁,线程2获取到锁sleep5秒,超时时间设置为1s,线程0和线程1阻塞等待1s后,便会抛出异常。这里仍然存在一个问题,假如线程2获取到锁,并且线程2因为自身原因,一直不释放锁。这就会导致其他线程无法正常运行,所以需要对获取到锁的线程设置一个超时时间,超过规定时间仍未执行完,则强制释放锁,并抛出异常,来保证程序不会阻塞。


版权声明:本文为qq_16570607原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。