Redis|Redis Redisson 分布式锁的应用和源码
1. 前言
之前写过一篇《Redis分布式锁的实现》的文章,主要介绍的Redis分布式锁的原始性实现,核心是基于setnx
来加锁,以及使用lua
保障事务的原子性等。但毕竟比较原始,需要根据不同的应用场景做不同的代码实现,也容易考虑不周。当时文章中就有提到 Redisson
框架,刚好最近工作中又用的比较多,这次就着重介绍。
Redisson 是架设在 Redis基础上的一个Java开发框架,底层基于 Netty框架,为使用者提供了一系列具有分布式特性的常用工具类。Redisson的功能非常丰富,具体可参考 github中文wiki,但本文只介绍 Redisson分布式锁的功能。
2. 普通可重入锁
2.1. 使用示例
在SpringBoot项目通过Redisson来加锁非常容易,不需要像之前文章中一样写一大堆代码,框架屏蔽掉了很多细节。如下例:
Config config = new Config();
config.useSingleServer().setAddress("redis://ip:port").setPassword("password").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
RLock lock = redissonClient.getLock("LOCK_KEY");
long waitTime=500L;
long leaseTime=15000L;
boolean isLock;
try {
isLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
if (isLock) {
// do something ...
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
注意代码中 Config 并无限制,示例中是Redis单节点连接,但实际上可以是哨兵模式、集群模式、主从模式等。
2.2. 源码讲解
前面例子中加锁用到了
RLock
接口,这里贴一下源码:org.redisson.api.RLock.java
public interface RLock extends Lock, RLockAsync {
String getName();
void lockInterruptibly(long var1, TimeUnit var3) throws InterruptedException;
boolean tryLock(long var1, long var3, TimeUnit var5) throws InterruptedException;
void lock(long var1, TimeUnit var3);
boolean forceUnlock();
boolean isLocked();
boolean isHeldByThread(long var1);
boolean isHeldByCurrentThread();
int getHoldCount();
long remainTimeToLive();
}
对于可重入锁,接口对应的实现方法在
org.redisson.RedissonLock
类里面,源码就不贴了,可以看到落到Redis时,实际的“加锁”和“解锁”过程也是一段lua脚本。1、加锁lua
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end
return redis.call('pttl', KEYS[1]);
参数解释:
KEYS[1]
:被锁资源名。ARGV[1]
:过期时间。ARGV[2]
:当前程序标识(UUID + 当前threadId)。
2、解锁lua
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end
return nil;
参数解释:
KEYS[1]
:被锁资源名。KEYS[2]
:解锁时广播通道名。ARGV[1]
:解锁时广播通道消息(值为0L)。ARGV[2]
:过期时间。ARGV[3]
:当前程序标识(UUID + 当前threadId)。
广播通道的设计很有亮点,当多个线程同时竞争锁时,未抢到锁的线程无需无效轮询,只需订阅一个通道。当锁释放时,在通道中广播消息,通知那些等待获取锁的线程现在可以获得锁了,那些线程再去竞争锁,避免性能资源的浪费。
3、看门狗机制如果拿到分布式锁的节点宕机,且这个锁正好处于锁住的状态时,会出现锁死的状态,为了避免这种情况的发生,锁都会设置一个过期时间。即前面在加锁时传入的
leaseTime
。某些应用场景中,如果在指定时间中,我们尚未完成业务,此时就需要给锁“续期”。如果整个过程完全可控,可以在程序中手动给锁续期。但如果希望能自动续期,就可以用到Redisson的Wath Dog(看门狗)机制。Redisson提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期,也就是说,如果一个拿到锁的线程一直没有完成逻辑,那么看门狗会帮助线程不断的延长锁超时时间,锁不会因为超时而被释放。默认情况下,看门狗的续期时间是30s,也可以通过修改Config.lockWatchdogTimeout来另行指定。
【Redis|Redis Redisson 分布式锁的应用和源码】下面就是加锁的源码,注意,在调用加锁方法时,如果想用看门狗,则传
leaseTime
值为-1L
。如果给leaseTime设置了有效值,那么看门狗就不会生效,锁不会自动续期,而是在你指定的时间后自动解锁。private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
RFuture ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
3. RedLock 红锁 3.1. 概念说明
也是在之前的那一篇文章中,也提到了RedLock,中文直译“红锁”。其实那篇文章已经介绍过了,这里再介绍一下。前面用redis实现分布式锁时存在漏洞,具体场景:
客户端A在Redis master节点申请锁。但master在将存储的key同步到slave上之前崩溃了,然后slave晋升为master。而客户端B申请一个客户端A已经持有的资源的锁。然后呢?然后呢?出问题啦,客户端A和B都能申请到同一个锁。
RedLock是Redis官方提出的算法,具体流程包括:
- 获取当前时间。
- 依次N个节点获取锁,并设置响应超时时间,防止单节点获取锁时间过长。
- 锁有效时间=锁过期时间-获取锁耗费时间,如果第2步骤中获取成功的节点数大于
N/2+1,且锁有效时间大于0,则获得锁成功。 - 若获得锁失败,则向所有节点释放锁。
3.2. 使用示例
Redisson关于RedLock的使用代码上及其简单,只是将几个锁组合成一个“大锁”,然后再正常使用“大锁”的加锁/解锁。
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://ip1:port1")
.setPassword("password1").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://ip2:port2")
.setPassword("password2").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://ip3:port3")
.setPassword("password3").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
String lockKey = "REDLOCK_KEY";
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
long waitTime=500L;
long leaseTime=15000L;
try {
isLock = redLock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
if (isLock) {
// do something ...
}
} catch (Exception e) {
... ...
} finally {
redLock.unlock();
}
注意代码中 Config 并无限制,示例中是Redis单节点连接,但实际上可以是哨兵模式、集群模式、主从模式等。
3.3. 源码讲解
在讲Redisson的
RedLock
(红锁)之前,先讲 MultiLock
(联锁),原因先看 RedissonRedLock
源码,完全是继承 RedissonMultiLock
的所有功能。RedissonRedLock.java
public class RedissonRedLock extends RedissonMultiLock {
public RedissonRedLock(RLock... locks) {
super(locks);
}protected int failedLocksLimit() {
return this.locks.size() - this.minLocksAmount(this.locks);
}protected int minLocksAmount(List locks) {
return locks.size() / 2 + 1;
}protected long calcLockWaitTime(long remainTime) {
return Math.max(remainTime / (long)this.locks.size(), 1L);
}public void unlock() {
this.unlockInner(this.locks);
}
}
RedissonMultiLock.java核心代码
// 加锁
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1L;
if (leaseTime != -1L) {
if (waitTime == -1L) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
newLeaseTime = unit.toMillis(waitTime) * 2L;
}
}long time = System.currentTimeMillis();
long remainTime = -1L;
if (waitTime != -1L) {
remainTime = unit.toMillis(waitTime);
}long lockWaitTime = this.calcLockWaitTime(remainTime);
int failedLocksLimit = this.failedLocksLimit();
List acquiredLocks = new ArrayList(this.locks.size());
ListIterator iterator = this.locks.listIterator();
while(iterator.hasNext()) {
RLock lock = (RLock)iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1L && leaseTime == -1L) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException var21) {
this.unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception var22) {
lockAcquired = false;
}if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) {
break;
}if (failedLocksLimit == 0) {
this.unlockInner(acquiredLocks);
if (waitTime == -1L) {
return false;
}failedLocksLimit = this.failedLocksLimit();
acquiredLocks.clear();
while(iterator.hasPrevious()) {
iterator.previous();
}
} else {
--failedLocksLimit;
}
}if (remainTime != -1L) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0L) {
this.unlockInner(acquiredLocks);
return false;
}
}
}if (leaseTime != -1L) {
List> futures = new ArrayList(acquiredLocks.size());
Iterator var24 = acquiredLocks.iterator();
while(var24.hasNext()) {
RLock rLock = (RLock)var24.next();
RFuture future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}var24 = futures.iterator();
while(var24.hasNext()) {
RFuture rFuture = (RFuture)var24.next();
rFuture.syncUninterruptibly();
}
}return true;
}// 解锁
public void unlock() {
List> futures = new ArrayList(this.locks.size());
Iterator var2 = this.locks.iterator();
while(var2.hasNext()) {
RLock lock = (RLock)var2.next();
futures.add(lock.unlockAsync());
}var2 = futures.iterator();
while(var2.hasNext()) {
RFuture future = (RFuture)var2.next();
future.syncUninterruptibly();
}}
RedissonRedLock.java
中重写了 RedissonMultiLock.java
里的几个方法:- failedLocksLimit:MultiLock中返回
0
,RedLock中返回locks.size() / 2 - 1
。 - calcLockWaitTime:MultiLock中返回
remainTime
,RedLock中返回Math.max(remainTime / (long)this.locks.size(), 1L)
。
推荐阅读
- 深入浅出谈一下有关分布式消息技术(Kafka)
- springboot使用redis缓存
- (1)redis集群原理及搭建与使用(1)
- KubeDL HostNetwork(加速分布式训练通信效率)
- springboot结合redis实现搜索栏热搜功能及文字过滤
- Redis——发布订阅/消息队列
- redis|redis 常见问题一
- 实操Redission|实操Redission 分布式服务
- 分布式|《Python3网络爬虫开发实战(第二版)》内容介绍
- 二、Redis的五种常用数据类型