1、什么是分布式锁
单机版中,当多个线程同时对共享数据做写操作的时候,会产生数据冲突即线程安全问题。对此,Java提供了Synchronized、Lock 等锁机制,保证同一时刻只有一个线程处理共享数据。
分布式场景下呢?应运而生了分布式锁
2、实现思路
实现分布式锁目前有三种流行方案,即基于数据库、Redis、ZooKeeper 的方案。
本文主要基于ZooKeeper实现,思路如下:
- 创建临时有序节点,判断是否序号是否最小,是则获取到锁资源,无需阻塞;
- 不是序号最小节点,则监听前一节点是否被删除,是则无需阻塞,否则进入等待;
- 执行完业务代码,释放锁资源。
3、实现步骤
/**
* @author: Lanrriet
* @description: 分布式锁
*/
@Slf4j
public class ZkLock {
@Autowired
private ZkDiscovery zkDiscovery;
//zk连接
private ZooKeeper zk = null;
//根节点名称
private final String ROOT = "/zk_lock";
//默认锁名称
private String lockName = "/tmp_lock";
@PostConstruct
private void init() {
try {
if (zk == null) {
zk = zkDiscovery.getZooKeeper();
}
//创建根节点
Stat stat = zk.exists(ROOT, false);
if (stat == null) {
zk.create(ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
}
}
/**
* 获取锁
*
* @param name 节点名称
* @return 节点完整路径
*/
public String getLock(String name) {
//节点名称
String nodePath = null;
try {
//节点路径
String path = ROOT + (!StringUtils.isEmpty(name) ? name : lockName);
//创建临时有序节点
nodePath = zk.create(path, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//获取根节点下所有临时节点,升序排列
List<String> nodes = zk.getChildren(ROOT, false);
Collections.sort(nodes);
if (nodes.size() == 0) {
throw new InterruptedException("创建节点失败");
}
if (!nodePath.contains(nodes.get(0))) {
String root = ROOT + "/";
String preNode = root + nodes.get(Collections.binarySearch(nodes, nodePath.replace(root, "")) - 1);
//监听前一节点,
CountDownLatch countDownLatch = new CountDownLatch(1);
Stat stat = zk.exists(preNode, getWatcher(preNode, countDownLatch));
if (stat != null) {
//阻塞
countDownLatch.await();
}
}
} catch (KeeperException | InterruptedException e) {
}
return nodePath;
}
/**
* 解锁(删除节点)
*
* @param node 节点路径
*/
public void removelock(String node) {
try {
zk.delete(node, -1);
log.info("删除节点,释放锁资源");
} catch (KeeperException | InterruptedException e) {
}
}
/**
* 监听前一节点事件
*
* @param preNode
* @return
*/
private Watcher getWatcher(String preNode, CountDownLatch countDownLatch) {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(preNode)) {
if (countDownLatch != null) {
//计数器减1,countDownLatch = 0 唤醒阻塞的业务线程
countDownLatch.countDown();
}
}
}
};
return watcher;
}
}
/**
* @author: Lanrriet
* @description: 自定义注解
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistrubateLock {
/**
* 节点路径
*/
String path() default "";
}
/**
* @author: Lanrriet
* @description: 切面类
*/
@Aspect
@Slf4j
public class DistrubateLockAspect {
@Autowired
private ZkLock zkLock;
@Pointcut("@annotation(com.api.DistrubateLock)")
public void pointcut() {
}
@Around("pointcut()")
public Object around(ProceedingJoinPoint point) throws Throwable {
// 1.获取当前运行的方法
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
String path = "/" + packagePath(method);
// 2.获取DistrubateLock注解参数
DistrubateLock dl = method.getDeclaredAnnotation(DistrubateLock.class);
String extPath = dl.path();
if (!("".equals(extPath) || null == extPath)) {
path = "/" + extPath;
}
log.info(path);
// 3.上锁
String node = zkLock.getLock(path);
// 4.处理业务,并透传业务异常
Object result = null;
try {
result = point.proceed();
} catch (Exception ex) {
throw ex;
} finally {
// 5.释放锁
zkLock.removelock(node);
}
return result;
}
private String packagePath(Method method) {
// 1.获取类名
String className = method.getDeclaringClass().getName();
// 2.获取方法名
String methodName = method.getName();
// 3.获取参数名
Parameter[] parameters = method.getParameters();
// 4.获取返回结果
Class<?> returnType = method.getReturnType();
String path = className + "--" + methodName + "--" + returnType.getName();
for (Parameter parameter : parameters) {
path = path + "--" + parameter.getName();
}
return path;
}
}
4、具体使用
注解方式:给接口添加锁资源,只需加上@DistrubateLock注解即可,可不配置path属性,该属性为锁资源名称,需唯一标识,默认采用的是:文件路径+文件名+方法名+返回类型+参数名称
@DistrubateLock(path = "lockTest")
@RequestMapping("lockTest")
public String lockTest(String name) {
log.info(Thread.currentThread().getName() + " 执行业务代码,age=" + age++);
return "success";
}
代码方式:存在并发问题处前后分别进行手动加锁、释放锁
public String lockTest(String name) {
String lock = zkLock.getLock("lockTest"); --加锁,传入锁资源名称,需唯一标识
log.info(Thread.currentThread().getName() + " 执行业务代码,age=" + age++);
zkLock.removelock(lock); --释放锁
return "success";
}
版权声明:本文为qq_35928318原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。