分布式锁初探
为什么需要分布式锁?
需要保证在分布式场景下只有一个线程同时在执行一个代码片段或者一个资源同时只有一个线程在使用。
常见的场景: 分布式任务,秒杀。
需要什么样的分布式锁?
高可用
有失效时间(避免死锁)
可重入(结合具体业务)
阻塞(结合具体业务)
性能高
公平锁(需要?)
实现
基于数据库实现
- 基于唯一索引
我们知道,数据库可以通过唯一索引来保证数据的唯一性,我们可以利用数据库的这个特点来设计我们的锁。
通过向数据库中插入数据,插入成功即获得锁,如果已经存在记录,则插入失败,无法获得锁。
解锁操作删除对应的记录即可。
通过以上的思路,我们可以实现一个最简单的分布式锁。
表结构。
CREATE TABLE `simple_lock` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`lock_key` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_key` (`lock_key`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=930 DEFAULT CHARSET=utf8
通过lock_key来标识锁。
核心代码:
加锁操作:
public boolean tryLock() {
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = getConnection();
preparedStatement = connection.prepareStatement(LOCK_SQL);
preparedStatement.setString(1, key);
int i = preparedStatement.executeUpdate();
return i != 0;
} catch (Exception e) {
…
} finally {
…
}
return false;
}
解锁操作:
@Override
public void unlock() {
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = getConnection();
preparedStatement = connection.prepareStatement(UNLOCK_SQL);
preparedStatement.setString(1, key);
preparedStatement.executeUpdate();
} catch (Exception e) {
…
} finally {
…
}
}
·虽然简单实现了,但是存在很多问题
- 没有做到高可用,可以通过主备数据库来解决。
- 没有失效时间,可以通过添加定时任务定时扫描数据库来使数据失效。这样又引入新的问题,假如某个线程持有锁时间很长,可能存在这个线程还未执行完锁已经被清除掉,那么将会有两个线程同时持有锁。为了解决这个问题,可以通过建立一个守护线程定时的去“补时”——对于还在运行中的锁,不断去设置失效时间来防止锁失效。
- 不可重入,可以通过增加锁标识字段来解决。
- 阻塞锁,可以通过不断尝试加锁来解决。
- 公平锁?
基于缓存
- 基于redis实现的分布式锁
基于redis实现的分布式锁实现思路和上面类似,但不同的是redis本身可以设置失效时间,这样不用设置定时任务去清理过期的锁。
核心命令set key value ex nx 即key 不存在是才能设置成功。简单实现:
public abstract class RedisLock implements Lock{
/**
* 此方法redis lock 不支持
*/
@Override
public Condition newCondition(){
throw new IllegalArgumentException("不支持此方法");
}
/**
* 此方法redis lock 不支持
*/
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
throw new RuntimeException("lock error", e);
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
throw new IllegalArgumentException("不支持此方法");
}
///
private static final Map<String, DefaultRedisLock> LOCK_MAP = new ConcurrentHashMap<String, DefaultRedisLock>();
/**
* 构建锁实例
* @param lockKey
* @return
*/
public static RedisLock instanceLock(String lockKey){
return instanceLock(getDefaultRedisClient(), lockKey);
}
/**
* 构建锁实例
* @param cacheProxy
* @param lockKey
* @return
*/
public static RedisLock instanceLock(CacheProxy cacheProxy, String lockKey){
DefaultRedisLock lock = LOCK_MAP.get(lockKey);
if(null == lock){
synchronized (LOCK_MAP) {
if(null == lock){
lock = new DefaultRedisLock(cacheProxy, lockKey);
LOCK_MAP.put(lockKey, lock);
}
}
}
return lock;
}
/**
* 构建锁实例
* @param lockKey
* @param maxTime 最长有效时间(单位:ms)
* @return
*/
public static RedisLock instanceLock(String lockKey, Long maxTime){
return instanceLock(getDefaultRedisClient(), lockKey, maxTime);
}
/**
* 构建锁实例
* @param cacheProxy
* @param lockKey
* @param maxTime 最长有效时间(单位:ms)
* @return
*/
public static RedisLock instanceLock(CacheProxy cacheProxy, String lockKey, Long maxTime){
RedisLock lock = instanceLock(cacheProxy, lockKey);
((DefaultRedisLock)lock).setMaxTime(maxTime);
return lock;
}
//
//
/**
* redis缓存采用hash数据结构
*/
private static CacheProxy cacheProxy;
/**
* 如果没有设置redisClient,系统自动从spring中取名字为marketCenterClient的redisClient
* @return
*/
public static CacheProxy getDefaultRedisClient() {
return cacheProxy;
}
public static void setDefaultCacheProxy(CacheProxy cacheProxy){
RedisLock.cacheProxy = cacheProxy;
}
/
private static final String LOCK_KEY = "xlock.";//锁Key前缀
private static final int DEFAULT_LOCK_TIMEOUT = 60;//90s
private static final long DEFAULT_MAX_LOCK_TIMEOUT = 300000;//5分钟(ms)
private static class DefaultRedisLock extends RedisLock{
private String id = UUID.randomUUID().toString();
private final long createTime = DateUtils.currentTimeMillis();
// private String field;
private String key;
private CacheProxy cacheProxy;
private Long maxTime = DEFAULT_MAX_LOCK_TIMEOUT;
private boolean updateTag = false;
public DefaultRedisLock(CacheProxy cacheProxy, String lockKey) {
if(null == cacheProxy || null == lockKey){
throw new NullPointerException("All parameters can't be empty!");
}
this.cacheProxy = cacheProxy;
// this.field = lockKey;
this.key = LOCK_KEY+lockKey;
}
public void setMaxTime(Long maxTime) {
this.maxTime = maxTime;
}
@Override
public void lockInterruptibly() throws InterruptedException {
boolean tryLock = tryLock(maxTime, TimeUnit.MILLISECONDS);
if(!tryLock) throw new InterruptedException("获取锁失败,timeout max:"+maxTime);
}
@Override
public boolean tryLock() {
boolean result = false;
try {
result = cacheProxy.setnx(key, id);
if (result) {
cacheProxy.expire(key, DEFAULT_LOCK_TIMEOUT, TimeUnit.SECONDS);
updateTag = true;
} else if (cacheProxy.ttl(key) == -1) {
cacheProxy.expire(key, DEFAULT_LOCK_TIMEOUT, TimeUnit.SECONDS);
}
} catch (Exception e) {
// e.printStackTrace();
}
return result;
}
/**
* 更新锁时间
* @return 0:不需要更新,1:更新成功,2:更新失败、已过期,3:更新失败、redis过期
*/
protected int updateLockTime() {
if(updateTag){
if(maxTime < (DateUtils.currentTimeMillis() - createTime)){
// removeLockMap(field);
return 2;
}else if(!cacheProxy.expire(key, DEFAULT_LOCK_TIMEOUT, TimeUnit.SECONDS)){
// removeLockMap(field);
return 3;
}else{
return 1;
}
}
return 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
boolean b = tryLock();
if(!b){
long ms = unit.toMillis(time);
if(ms>3000){
long count = ms/1000;
for(int i=0; !b && i<count; i++){
Thread.sleep(1000);
b = tryLock();
}
}else if(ms>60){
int i=0;
long unitMs = ms / 6;
while(!b && i++<6){
Thread.sleep(unitMs);
b = tryLock();
}
}else if(ms>0){
Thread.sleep(ms);
b = tryLock();
}
}
return b;
}
@Override
public void unlock() {
cacheProxy.del(key);
// removeLockMap(field);
}
// private static void removeLockMap(String key) {
// LOCK_MAP.remove(key);
// }
}
//timer
private static int TIMER_TIME = DEFAULT_LOCK_TIMEOUT*3/5 * 1000;//60s/ms
private static final int MAX_CACHE_LOCK_NUM = 1000;//缓存LOCK长度安全值
static{
Thread t = new Thread(){
@Override
public void run() {
long l, temp, idx;
int resultCode;
DefaultRedisLock lock;
while(true){
temp = 0;
l = DateUtils.currentTimeMillis();
idx = 0;
lock = null;
try{
Iterator<DefaultRedisLock> iterator = LOCK_MAP.values().iterator();
while(iterator.hasNext()){
lock = iterator.next();
resultCode = lock.updateLockTime();
if(idx++ > MAX_CACHE_LOCK_NUM){
autoRemove(resultCode, lock, iterator);
}
}
// for(DefaultRedisLock lock : LOCK_MAP.values()){
// lock.updateLockTime();
// }
temp = TIMER_TIME - (DateUtils.currentTimeMillis()-l);
}catch(Throwable e){
}
if(temp>0){
try {
Thread.sleep(temp);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
private void autoRemove(int resultCode, DefaultRedisLock lock, Iterator<DefaultRedisLock> iterator) {
switch (resultCode) {
case 0:
//不需要更新的,判断时效
if(lock.maxTime < (DateUtils.currentTimeMillis() - lock.createTime)){
iterator.remove();
}
break;
case 1:
//更新成功的,不做任何操作
break;
default:
iterator.remove();
break;
}
}
};
t.setDaemon(true);
t.start();
}
}
- Redisson实现
https://github.com/redisson/redisson/wiki/8.-分布式锁和同步器
基于zookeeper
ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:
1创建一个目录mylock;
2线程A想获取锁就在mylock目录下创建临时顺序节点;
3获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
4线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
5 线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。
优点:具备高可用、可重入、阻塞锁特性,可解决失效死锁问题。
缺点:因为需要频繁的创建和删除节点,性能上不如Redis方式。
开源实现这里Curator,它是一个ZooKeeper客户端,Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。
版权声明:本文为asdasd3418原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。