Redis分布式实现原理


一、使用 1、pom.xml导入依赖

org.springframework.boot spring-boot-starter-integration org.springframework.integration spring-integration-redis org.springframework.boot spring-boot-starter-data-redis

Redis分布式实现原理
文章图片
2、配置文件
@Configuration public class RedissonConfig { // 自定义在yml或properties文件中 @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; @Value("${spring.redis.password}") private String password; @Bean public RedissonClient getRedisson() { Config config = new Config(); if (StringUtils.isBlank(password)) { config.useSingleServer().setAddress("redis://" + host + ":" + port); } else { config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password); } return Redisson.create(config); } }

Redis分布式实现原理
文章图片
3、使用类
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.redis.util.RedisLockRegistry; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; /** * @author Forlan * @since 2022-04-03 */ public class ForlanTest { private static final Logger logger = LoggerFactory.getLogger(ForlanTest.class); @Autowired private RedisLockRegistry redisLockRegistry; public void test() { String lockKey = "key_id"; Lock loginLock = redisLockRegistry.obtain(lockKey.intern()); boolean getLock = false; try { // tryLock()底层调用this.tryLock(0L, TimeUnit.MILLISECONDS) getLock = loginLock.tryLock(5, TimeUnit.SECONDS); if (getLock) { //获得锁执行业务 } } catch (Exception e) { logger.error("异常信息...", e); } finally { if (getLock) { //释放锁 loginLock.unlock(); } } } }

Redis分布式实现原理
文章图片
二、原理 obtain方法
private final Map locks; private final class RedisLock implements Lock { private final String lockKey; private final ReentrantLock localLock; private volatile long lockedAt; private RedisLock(String path) { this.localLock = new ReentrantLock(); this.lockKey = this.constructLockKey(path); } } public Lock obtain(Object lockKey) { Assert.isInstanceOf(String.class, lockKey); String path = (String)lockKey; return (Lock)this.locks.computeIfAbsent(path, (x$0) -> { return new RedisLockRegistry.RedisLock(x$0); }); }

Redis分布式实现原理
文章图片
主要是根据lockKey去查locks这个map中是否已经存在这个key
如果存在就返回内部类RedisLock
如果不存在就创建一个RedisLock,以lockKey为key,RedisLock为value放入map中
备注:每个分布式应用自己都会创建一个RedisLockRegistry实例,同一个应用的多个线程共享RedisLock类
tryLock方法
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { long now = System.currentTimeMillis(); // 尝试拿取本地锁 if (!this.localLock.tryLock(time, unit)) { return false; } else { try { long expire = now + TimeUnit.MILLISECONDS.convert(time, unit); boolean acquired; // 当前时间还没过期并且还未获得redis锁,睡眠100ms继续重试 while(!(acquired = this.obtainLock()) && System.currentTimeMillis() < expire) { Thread.sleep(100L); } if (!acquired) { this.localLock.unlock(); } return acquired; } catch (Exception var9) { this.localLock.unlock(); this.rethrowAsLockException(var9); return false; } } }

Redis分布式实现原理
文章图片
主要过程
先获得本地锁,拿不到直接返回失败
当前时间还没过期并且还没拿到redis锁,睡眠100ms继续重试
如果拿到redis锁,结束循环,返回成功
如果超时了还没拿到,释放锁,返回失败
拿redis锁的过程
private boolean obtainLock() { boolean success = (Boolean)RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript, Collections.singletonList(this.lockKey), new Object[]{RedisLockRegistry.this.clientId, String.valueOf(RedisLockRegistry.this.expireAfter)}); if (success) { this.lockedAt = System.currentTimeMillis(); } return success; }

Redis分布式实现原理
文章图片
通过obtainLock方法,执行lua脚本来获取
redisTemplate.execute()参数说明:
第一个obtainLockScript参数就是要执行的lua脚本;
local lockClientId = redis.call('GET', KEYS[1]) if lockClientId == ARGV[1] then redis.call('PEXPIRE', KEYS[1], ARGV[2]) return true elseif not lockClientId then redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2]) return true end return false

Redis分布式实现原理
文章图片
第二个参数就是表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,用1为基址的形式访问( KEYS[1] , KEYS[2] ,以此类推);
【Redis分布式实现原理】第三个参是附加参数 arg [arg …] ,可以在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似( ARGV[1] 、 ARGV[2] ,诸如此类)
为什么要用本地锁
  • 为了可重入
  • 为了减轻redis服务器的压力
为什么要用lua脚本
  • 保证原子性
  • 减少网络开销
  • 替代redis的事务功能
unlock方法
public void unlock() { if (!this.localLock.isHeldByCurrentThread()) { throw new IllegalStateException("You do not own lock at " + this.lockKey); } else if (this.localLock.getHoldCount() > 1) { this.localLock.unlock(); } else { try { if (Thread.currentThread().isInterrupted()) { RedisLockRegistry.this.executor.execute(this::removeLockKey); } else { this.removeLockKey(); } if (RedisLockRegistry.logger.isDebugEnabled()) { RedisLockRegistry.logger.debug("Released lock; " + this); } } catch (Exception var5) { ReflectionUtils.rethrowRuntimeException(var5); } finally { this.localLock.unlock(); } } }

Redis分布式实现原理
文章图片
释放锁的过程
1、判断是否是当前线程持有锁,如果不是,抛异常(本地锁)
2、判断当前线程持有锁的计数
如果当前线程持有锁的计数 > 1,说明本地锁被当前线程多次获取,这时只会释放本地锁,释放之后当前线程持有锁的计数-1。
否则,释放本地锁和redis锁。
?

    推荐阅读