zookeeper使用篇(三):分布式锁

1、什么是分布式锁

单机版中,当多个线程同时对共享数据做写操作的时候,会产生数据冲突即线程安全问题。对此,Java提供了Synchronized、Lock 等锁机制,保证同一时刻只有一个线程处理共享数据。
分布式场景下呢?应运而生了分布式锁

2、实现思路

实现分布式锁目前有三种流行方案,即基于数据库、Redis、ZooKeeper 的方案。
本文主要基于ZooKeeper实现,思路如下:

  1. 创建临时有序节点,判断是否序号是否最小,是则获取到锁资源,无需阻塞;
  2. 不是序号最小节点,则监听前一节点是否被删除,是则无需阻塞,否则进入等待;
  3. 执行完业务代码,释放锁资源。

3、实现步骤

/**
 * @author: Lanrriet
 * @description: 分布式锁
 */
@Slf4j
public class ZkLock {

    @Autowired
    private ZkDiscovery zkDiscovery;

    //zk连接
    private ZooKeeper zk = null;

    //根节点名称
    private final String ROOT = "/zk_lock";

    //默认锁名称
    private String lockName = "/tmp_lock";

    @PostConstruct
    private void init() {
        try {
            if (zk == null) {
                zk = zkDiscovery.getZooKeeper();
            }
            //创建根节点
            Stat stat = zk.exists(ROOT, false);
            if (stat == null) {
                zk.create(ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
        }
    }

    /**
     * 获取锁
     *
     * @param name 节点名称
     * @return 节点完整路径
     */
    public String getLock(String name) {
        //节点名称
        String nodePath = null;
        try {
            //节点路径
            String path = ROOT + (!StringUtils.isEmpty(name) ? name : lockName);
            //创建临时有序节点
            nodePath = zk.create(path, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            //获取根节点下所有临时节点,升序排列
            List<String> nodes = zk.getChildren(ROOT, false);
            Collections.sort(nodes);
            if (nodes.size() == 0) {
                throw new InterruptedException("创建节点失败");
            }
            if (!nodePath.contains(nodes.get(0))) {
                String root = ROOT + "/";
                String preNode = root + nodes.get(Collections.binarySearch(nodes, nodePath.replace(root, "")) - 1);
                //监听前一节点,
                CountDownLatch countDownLatch = new CountDownLatch(1);
                Stat stat = zk.exists(preNode, getWatcher(preNode, countDownLatch));
                if (stat != null) {
                    //阻塞
                    countDownLatch.await();
                }
            }
        } catch (KeeperException | InterruptedException e) {
        }
        return nodePath;
    }

    /**
     * 解锁(删除节点)
     *
     * @param node 节点路径
     */
    public void removelock(String node) {
        try {
            zk.delete(node, -1);
            log.info("删除节点,释放锁资源");
        } catch (KeeperException | InterruptedException e) {
        }
    }

    /**
     * 监听前一节点事件
     *
     * @param preNode
     * @return
     */
    private Watcher getWatcher(String preNode, CountDownLatch countDownLatch) {
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(preNode)) {
                    if (countDownLatch != null) {
                        //计数器减1,countDownLatch = 0 唤醒阻塞的业务线程
                        countDownLatch.countDown();
                    }
                }
            }
        };
        return watcher;
    }

}

/**
 * @author: Lanrriet
 * @description: 自定义注解
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistrubateLock {

    /**
     * 节点路径
     */
    String path() default "";

}

/**
 * @author: Lanrriet
 * @description: 切面类
 */
@Aspect
@Slf4j
public class DistrubateLockAspect {

    @Autowired
    private ZkLock zkLock;

    @Pointcut("@annotation(com.api.DistrubateLock)")
    public void pointcut() {
    }

    @Around("pointcut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        // 1.获取当前运行的方法
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        String path = "/" + packagePath(method);
        // 2.获取DistrubateLock注解参数
        DistrubateLock dl = method.getDeclaredAnnotation(DistrubateLock.class);
        String extPath = dl.path();
        if (!("".equals(extPath) || null == extPath)) {
            path = "/" + extPath;
        }
        log.info(path);
        // 3.上锁
        String node = zkLock.getLock(path);
        // 4.处理业务,并透传业务异常
        Object result = null;
        try {
            result = point.proceed();
        } catch (Exception ex) {
            throw ex;
        } finally {
            // 5.释放锁
            zkLock.removelock(node);
        }
        return result;
    }

    private String packagePath(Method method) {
        // 1.获取类名
        String className = method.getDeclaringClass().getName();
        // 2.获取方法名
        String methodName = method.getName();
        // 3.获取参数名
        Parameter[] parameters = method.getParameters();
        // 4.获取返回结果
        Class<?> returnType = method.getReturnType();
        String path = className + "--" + methodName + "--" + returnType.getName();
        for (Parameter parameter : parameters) {
            path = path + "--" + parameter.getName();
        }
        return path;
    }

}

4、具体使用

注解方式:给接口添加锁资源,只需加上@DistrubateLock注解即可,可不配置path属性,该属性为锁资源名称,需唯一标识,默认采用的是:文件路径+文件名+方法名+返回类型+参数名称
@DistrubateLock(path = "lockTest")
@RequestMapping("lockTest")
public String lockTest(String name) {
    log.info(Thread.currentThread().getName() + " 执行业务代码,age=" + age++);
    return "success";
}

代码方式:存在并发问题处前后分别进行手动加锁、释放锁
public String lockTest(String name) {
    String lock = zkLock.getLock("lockTest");  --加锁,传入锁资源名称,需唯一标识
    log.info(Thread.currentThread().getName() + " 执行业务代码,age=" + age++);
    zkLock.removelock(lock);  --释放锁
    return "success";
}

版权声明:本文为qq_35928318原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。