redis分布式锁RedissonLock
简单使用
String key = "key-lock";
RLock lock = redisson.getLock(key);
lock.lock();
try {// TODO} catch (Exception e){log.error(e.getMessage(), e);
} finally {lock.unlock();
}
String key = "key-tryLock";
long maxWaitTime = 3_000;
RLock lock = redisson.getLock(key);
if (lock.tryLock(maxWaitTime, TimeUnit.MILLISECONDS)){try {// TODO} catch (Exception e){log.error(e.getMessage(), e);
} finally {lock.unlock();
}} else {log.debug("redis锁竞争失败");
}
流程图
多个线程节点锁竞争的正常流程如下图:
文章图片
多个线程节点锁竞争,并出现节点下线的异常流程如下图:
文章图片
源码解析
RedissonLock是可重入锁,使用redis的hash结构作为锁的标识存储,锁的名称作为hash的key,UUID + 线程ID作为hash的field,锁被重入的次数作为hash的value。如图所示:
文章图片
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();
// 尝试获取锁,锁获取成功则ttl为null;
获取失败则返回锁的剩余过期时间Long ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {return;
}// 锁被其他线程占用而索取失败,使用线程通知而非自旋的方式等待锁// 使用redis的发布订阅pub/sub功能来等待锁的释放通知RFuture future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {while (true) {ttl = tryAcquire(leaseTime, unit, threadId);
// 尝试获取锁,锁获取成功则ttl为null;
获取失败则返回锁的剩余过期时间if (ttl == null) {break;
}if (ttl >= 0) {// 使用LockSupport.parkNanos方法线程休眠try {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {if (interruptibly) {throw e;
}getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}} else {if (interruptibly) {getEntry(threadId).getLatch().acquire();
} else {getEntry(threadId).getLatch().acquireUninterruptibly();
}}}} finally {// 退出锁竞争(锁获取成功或者放弃获取锁),则取消锁的释放订阅unsubscribe(future, threadId);
}}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {return true;
}time -= System.currentTimeMillis() - current;
if (time <= 0) {acquireFailed(threadId);
return false;
}current = System.currentTimeMillis();
RFuture subscribeFuture = subscribe(threadId);
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);
}});
}acquireFailed(threadId);
return false;
}try {time -= System.currentTimeMillis() - current;
if (time <= 0) {acquireFailed(threadId);
return false;
}while (true) {long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquiredif (ttl == null) {return true;
}time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {acquireFailed(threadId);
return false;
}currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {acquireFailed(threadId);
return false;
}}} finally {unsubscribe(subscribeFuture, threadId);
}}
RedissonLock实现的是可重入锁,通过redis的hash结构实现,而非加单的set nx ex。为了实现原子性的复杂的加锁逻辑,而通过lua脚本实现。获取锁会有如下三种状态:
1、锁未被任何线程占用,则锁获取成功,返回null
2、锁被当前线程占用,则锁获取成功并进行锁的重入,对锁的重入计数+1,返回null
3、锁被其他线程占用,则锁获取失败,返回该锁的自动过期时间ttl
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1);
" +"redis.call('pexpire', KEYS[1], ARGV[1]);
" +"return nil;
" +"end;
" +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1);
" +"redis.call('pexpire', KEYS[1], ARGV[1]);
" +"return nil;
" +"end;
" +"return redis.call('pttl', KEYS[1]);
",Collections.