后端和服务器|redis分布式锁的实现总结

在Java中,关于锁我想大家都很熟悉。在并发编程中,我们通过锁,来避免由于竞争而造成的数据不一致问题。通常我们以进程锁synchronized 、Lock来实现它,对于分布式程序,就不能用进程锁了,这时候常用的是分布式锁。
什么是分布式锁 分布式锁,是一种思想,它的实现方式有很多。比如,我们将沙滩当做分布式锁的组件,那么它看起来应该是这样的:
加锁 【后端和服务器|redis分布式锁的实现总结】在沙滩上踩一脚,留下自己的脚印,就对应了加锁操作。其他进程或者线程,看到沙滩上已经有脚印,证明锁已被别人持有,则等待。
解锁 把脚印从沙滩上抹去,就是解锁的过程。
锁超时 为了避免死锁,我们可以设置一阵风,在单位时间后刮起,将脚印自动抹去。
分布式锁的实现有很多,比如基于数据库、memcached、Redis、系统文件、zookeeper等。它们的核心的理念跟上面的过程大致相同。基于数据库可以用乐观锁和悲观锁处理分页式锁,乐观锁使用对比记录version号来实现,悲观锁使用类似“select * where * for update”行锁实现。
本文讨论的是基于redis实现分页式锁的问题,别的方面不做详说,有相关需求可以参考和查阅别的资料。
Redis分布式锁原理 加锁 加锁实际上就是在redis中,给Key键设置一个值,为避免死锁,并给定一个过期时间。

SET lock_key random_value NX PX 5000
值得注意的是:
random_value 是客户端生成的唯一的字符串。
NX 代表只在键不存在时,才对键进行设置操作。
PX 5000 设置键的过期时间为5000毫秒。
这样,如果上面的命令执行成功,则证明客户端获取到了锁。
解锁 解锁的过程就是将Key键删除。但也不能乱删,不能说客户端1的请求将客户端2的锁给删除掉。这时候random_value的作用就体现出来。
为了保证解锁操作的原子性,我们用LUA脚本完成这一操作。先判断当前锁的字符串是否与传入的值相等,是的话就删除Key,解锁成功。
if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end

jedis实现(单节点)
/** * 获取分布式锁:一分命令,保证事务的一致性。 * @param lockKey * @param requestId * @param expireTime * @return */ public static boolean getDistributeLock(String lockKey, String requestId, long expireTime) { Jedis jedis = null; try { jedis = getResource(); String result = jedis.set(lockKey,requestId,"NX","PX",expireTime); if ("OK".equals(result)) { return true; } } catch (Exception e) { logger.error("getDistributeLock {}", lockKey, e); } finally { returnResource(jedis); } return false; }/** * 释放分布式锁:使用lua脚本,一个命令实现对带有标志的锁的释放 * @param lockKey * @param requestId * @return */ public static boolean releaseDistributeLock(String lockKey, String requestId) { Jedis jedis = null; try { jedis = getResource(); String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); Long RELEASE_SUCCESS = 1L; if (RELEASE_SUCCESS.equals(result)) { return true; } } catch (Exception e) { logger.error("releaseDistributeLock {}", lockKey, e); } finally { returnResource(jedis); } return false; }

注意:这里的requestId,类似客户端口请求id,每次请求都是不同的可以使用uuid,测试和使用可以参考后面的”测试和说明“部分。
缺点:在集群包括主从、哨兵模式、集群模式不可用; 锁不具有可重入性。
redisson实现(通用) Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。它里面也实现了分布式锁,而且包含多种类型的锁:可重入锁,公平锁等。
具体实现如下:
JedisUtil提供
//从配置类中获取redisson对象 private static Redisson redisson = JedisConfig.getRedisson(); //加锁 Redisson:适用单机、主从、哨兵和集群 //同步方法,等待锁返回执行 所以涉及锁使用的,可以放在线程池中进行 public static boolean acquire(String lockName){ //声明key对象 String key = lockName; //获取锁对象 RLock mylock = redisson.getLock(key); //加锁,并且设置锁过期时间,防止死锁的产生 mylock.lock(2, TimeUnit.MINUTES); // 分钟 //加锁成功 returntrue; } //锁的释放 Redisson:适用单机、主从、哨兵和集群 //同步方法,等待锁返回执行所以涉及锁使用的,可以放在线程池中进行 public static void release(String lockName){ //必须是和加锁时的同一个key String key =lockName; //获取所对象 RLock mylock = redisson.getLock(key); //释放锁(解锁) mylock.unlock(); }

JedisConfig提供
private static Config config = new Config(); //声明redisso对象 private static Redisson redisson = null; static{//可以用"redis://"来启用SSL连接 if (IS_CLUSTER.equals(CLUSTER_USED)) {//集群 log.info("Redisson redis lock init cluster config:"+server1+"; "+server2+"; "+server3+"; "+server4+"; "+server5+"; "+server6); config.useClusterServers().addNodeAddress( "redis://".concat(server1),"redis://".concat(server2), "redis://".concat(server3), "redis://".concat(server4),"redis://".concat(server5), "redis://".concat(server6)).setScanInterval(5000); } else {//单机 log.info("Redisson redis lock init single node config:"+server1+"; "+server2+"; "+server3+"; "+server4+"; "+server5+"; "+server6); config.useSingleServer().setAddress("redis://".concat(poolHost).concat(":").concat(poolPort)); }//得到redisson对象 redisson = (Redisson) Redisson.create(config); } /** * Redisson redis分布式锁处理对象 * @return */ public static Redisson getRedisson() { return redisson; }

测试和说明 测试和使用,可以参考下面的junit测试用例。
@Slf4j public class JedisUtilTest extends SpringTxTestCase { private static Logger logger = LoggerFactory.getLogger(JedisUtils.class); /** * 单机版本:加解锁功能 */ @Test public void testSingleRedisLockAndUnlock(){ JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE); { for (int i = 0; i < 5; i++) { boolean result = JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE); System.out.println(Thread.currentThread().getName()+":lock result:"+result); JedisUtils.releaseDistributeLock("lockKey","requestId"); boolean result1 = JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE); System.out.println(Thread.currentThread().getName()+":unlock result1:"+result1); } } } /** * 单机版本:锁测试 */ @Test public void testSingleRedisLock(){ { final CyclicBarrier cbRef = new CyclicBarrier(10); final ReentrantLock reentrantLock=new ReentrantLock(); for(int i=0; i<10; i++){ Thread t=new Thread(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + "准备"); cbRef.await(); //10个线程等待在这里 才开始执行下面的 //reentrantLock.lock(); //tryGetDistributedLock("hello","hello",10000); boolean result = JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE); System.out.println(Thread.currentThread().getName()+"===lock result:"+result); JedisUtils.releaseDistributeLock("lockKey",UUID.randomUUID().toString()); boolean result1 = JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE); System.out.println(Thread.currentThread().getName()+"===lock result1:"+result); } catch (Exception e) { e.printStackTrace(); }finally { //reentrantLock.unlock(); } } }); t.start(); } //这一段可以不要 try { Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + "起跑"); System.out.println( cbRef.getParties()+"--" +cbRef.getNumberWaiting()); } catch (Exception e){ e.printStackTrace(); } } }/** * 单机版本redis:测试分布式锁的使用方法 */ @Test public void testUseOfSingleRedisLock() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(10); String data2Deal = "data to deal"; final CyclicBarrier cbRef = new CyclicBarrier(10); for(int i=0; i<10; i++){ Thread t=new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "准备"); try { cbRef.await(); //10个线程等待在这里 才开始执行下面的+ } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } final ReentrantLock reentrantLock=new ReentrantLock(); reentrantLock.lock(); try { String data2Deal = "data to deal:" + Thread.currentThread().getName(); useOfSingleRedisLock(data2Deal); } catch (Exception e){ e.printStackTrace(); } finally { reentrantLock.unlock(); } countDownLatch.countDown(); } }); t.start(); } countDownLatch.await(); System.out.println("所有线程都执行完了……"); }/** * 分布式锁的使用方法:单机redis cluster包括(集群和哨兵)不适用。 * @param data2Deal */ public void useOfSingleRedisLock(String data2Deal){ String requestId = UUID.randomUUID().toString(); if(JedisPoolUtils.getDistributeLock("lock_key", requestId, 1000*60*5)){ try { methonNeedDisLock(data2Deal); } catch (Exception e) { logger.error("分布式锁业务处理失败!",e); e.printStackTrace(); } finally { JedisPoolUtils.releaseDistributeLock("lock_key",requestId); } } else { try { Thread.sleep(1000); useOfSingleRedisLock(data2Deal); } catch (InterruptedException e) { logger.error(e.getMessage()); } } }/* *需要分布式锁的业务代码 */ public void methonNeedDisLock(String data2Deal){ System.out.println("分布式锁业务处理方法:"+data2Deal); } /** * 测试分布式锁(Redisson)的使用方法:redis单机和哨兵、集群都适用 * 测试说明:开启1000个线程,对count进行累加 */int count = 0; @Test public void testRedisLock() throws InterruptedException { int clientcount =1000; final CountDownLatch countDownLatch = new CountDownLatch(clientcount); ExecutorService executorService = Executors.newFixedThreadPool(clientcount); long start = System.currentTimeMillis(); for (int i = 0; i

参考 https://blog.csdn.net/u014353343/article/details/88921212
https://www.jianshu.com/p/828aa3b44564
https://www.jianshu.com/p/47fd7f86c848
臭味相投的朋友们,我在这里:

猿in小站:http://www.yuanin.net

csdn博客:https://blog.csdn.net/jiabeis

简书:https://www.jianshu.com/u/4cb7d664ec4b

微信免费订阅号“猿in”
后端和服务器|redis分布式锁的实现总结
文章图片

    推荐阅读