分布式场景Redis客户端最优解决方案-lettuce|分布式场景Redis客户端最优解决方案-lettuce + Redisson

Redis 官方推荐的 Java 客户端有Jedis、lettuce 和 Redisson。三种客户端各有优缺点,在我们业务实现中选择合适的客户端,有助于提高Redis的使用性能。要选择合适的客户端我们应该大概清楚Redis客户端的通讯方式和各种客户端的优缺点。
Redis概述 Redis支持多种语言客户端,支持的客户端可以从官网查看,这里列举一系列常见的客户端:
  • C语言
  • C++
  • C#
  • Java
  • Python
  • Node.js
  • PHP
Redis采用单线程方式处理多个客户端连接,作为程序开发者应该了解 Redis 服务端和客户端的通信协议,以及主流编程语言的 Redis 客户端使用方法,同时还需要了解客户端管理的相应 API 以及开发运维中可能遇到的问题。
Redis客户端通讯协议 Redis制定RESP(Redis Serializable Protocol, RESP)协议,实现客户端和服务端的正常交互,这种协议简单高效,既能够被机器解析,又容易被人类识别。
RESP可以序列号不同数据类型数据(整型、字符串、数组以及特殊类型Error)。需要执行的Redis命令会封装成类似字符串数组的请求通过客户端发送到服务端。Redis会基于制定的命令类型选择对应的数据类型进行回复。
RESP发送命令格式 在RESP中,发送数据的类型取决于数据报文的首个字节:
  • 单行字符串:单行字符串的第一个字节为+ ,最后两个字节是\r\n,其他字节是字符串内容+OK\r\nError;
  • 错误消息:错误消息的第一个字节为-,最后两个字节是\r\n,其他字节是异常消息的文本内容-ERR\r\nInteger;
  • 整型数字:整型数字的第一个字节是:,最后两个字节是\r\n,其他字节是数字的文本内容:100\r\nBulk;
  • 定长字符串:定长字符串的String定长字符串第一个字节是美元符号 ,紧接着的字节是内容字符串长度\r\n,最后两个字节是\r\n,其他字节是字符串内容$4\r\ndoge\r\n;
  • RESP数组:RESP数组的第一个字节是,紧接着的字节是元素个数\r\n,最后两个字节是\r\n,其他字节是各个元素的内容,每个元素可以是任意一种数据类型2\r\n:100\r\n$4\r\ndoge\r\n。
发送的命令格式如下,CRLF代表"\r\n":
Copy*<参数数量> CRLF $<参数1的字节数量> CRLF <参数1> CRLF ... $<参数N的字节数量> CRLF <参数N> CRLF

以set hello world这个命令为例,发送的内容就是这样的:
Copy*3 $3 SET $5 hello $5 world

第一行*3表示有3个参数,5表示下一个参数有5个字节,接下来是参数,$5表示下一个参数有5个字节,接下来是参数。
所以set hello world最终发送给redis服务器的命令是:
Copy*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n

RESP响应内容 CopyRedis的返回结果类型分为以下五种:
正确回复:在RESP中第一个字节为"+"
错误回复:在RESP中第一个字节为"-"
整数回复:在RESP中第一个字节为":"
字符串回复:在RESP中第一个字节为"$"
多条字符串回复:在RESP中第一个字节为"*"
(+) 表示一个正确的状态信息,具体信息是当前行+后面的字符。
(-) 表示一个错误信息,具体信息是当前行-后面的字符。
(* ) 表示消息体总共有多少行,不包括当前行,*后面是具体的行数。
(美元符合) 表示下一行数据长度,不包括换行符长度\r\n,$后面则是对应的长度的数据。
(:) 表示返回一个数值,:后面是相应的数字节符。

分布式场景Redis客户端最优解决方案-lettuce|分布式场景Redis客户端最优解决方案-lettuce + Redisson
文章图片
Java客户端 Jedis 传统老牌Java客户端,一直在更新,对redis命令有比较全面的支持。
优点
  • 支持全面的Redis命令,具有全面的API。
缺点
  • 阻塞式I/O,其方法都是同步调用,程序流需要等到 sockets 处理完 I/O 才能执行,不支持异步;
  • Jedis 客户端实例不是线程安全的,所以需要通过连接池来使用 Jedis。
lettuce lettuce是可扩展的线程安全的客户端,支持异步模式。如果避免阻塞和事务操作,如BLPOP和MULTI/EXEC,多个线程就可以共享一个连接。lettuce 底层基于 Netty,支持高级的 Redis 特性,比如哨兵,集群,管道,自动重新连接和Redis数据模型。
优点:
  • 支持同步异步通信模式;
  • Lettuce 的 API 是线程安全的,如果不是执行阻塞和事务操作,如BLPOP和MULTI/EXEC,多个线程就可以共享一个连接。
  • SpringBoot 2.x默认Redis客户端,对Lettuce 提供完美支持
Redisson Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。其中包括( BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson 提供了使用Redis 的最简单和最便捷的方法。Redisson 的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
优点:
  • 使用者对 Redis 的关注分离,可以类比 Spring 框架,这些框架搭建了应用程序的基础框架和功能,提升开发效率,让开发者有更多的时间来关注业务逻辑;
  • 提供很多分布式相关操作服务,例如,分布式锁,分布式集合,可通过Redis支持延迟队列等。
    缺点:
  • Redisson 对字符串的操作支持比较差。
Redis Java客户端技术选型 文章基于SpringBoot 2.x进行选型
  • 仅使用Redis进行字符串的数据进行缓存处理,可以使用默认的lettuce。
  • 需要使用Redis做分布式事务,可以选择lettuce + Redisson。
  • 需要缓存分布式集合数据并对集合进行频繁读写操作,可以选择lettuce + Redisson。
  • 使用Redis 做消息队列,可以选择lettuce + Redisson。
SpringBoot 集成lettuce + Redisson 在大多数分布式互联网应用中,redis经常用来处理复杂业务而不是单纯的字符串存储,比如使用Redis集合、分布式锁、队列等,因此集成lettuce + Redisson,按照不同业务而使用不同组件是比较优秀的方案。
集成lettuce
  • 引入依赖
org.springframework.session spring-session-data-redis

SpringBoot 2.X默认使用lettuce ,因此提供了比较好的支持,RedisAutoConfiguration完成了对JedisConnectionFactory和LettuceConnectionFactory 的自动配置。同时RedisProperties封装了redis配置。
  • 添加配置
    application.yml
spring: redis: host: 192.168.128.20 #单机配置 port: 6379 #单机配置 #cluster: #nodes: xxxurl集群配置 # Redis默认情况下有16个分片,配置具体使用的分片,默认为0 database: 0 lettuce: pool: # 连接池最大连接数 默认8 ,负数表示没有限制 max-active: 8 # 连接池最大阻塞等待时间(使用负值表示没有限制) 默认-1 max-wait: -1 # 连接池中的最大空闲连接 默认8 max-idle: 8 # 连接池中的最小空闲连接 默认0 min-idle: 0

集成Redission
  • 引入依赖
org.redisson redisson ${redisson.version}

  • 添加配置
    redisson.yml
redission: pool: # 连接池最大连接数 默认8 ,负数表示没有限制 max-active: 8 # 连接池最大阻塞等待时间(使用负值表示没有限制) 默认-1 max-wait: -1 # 连接池中的最大空闲连接 默认8 max-idle: 8 # 连接池中的最小空闲连接 默认0 min-idle: 0 connTimeout: 10000 timeout: 3000 soTimeout: 3000 size: 32 #集群配置 cluster: #集群扫描间隔时间 单位毫秒 scanInterval: 1000 # 只在从服务节点里读取 readMode: "SLAVE" slaveSubscriptionConnectionPoolSize: 50 # 从节点最小空闲连接数 slaveConnectionMinimumIdleSize: 32 # 从节点连接池大小 slaveConnectionPoolSize: 64 # 主节点最小空闲连接数 masterConnectionMinimumIdleSize: 32 # 主节点连接池大小 masterConnectionPoolSize: 64 # 命令失败重试次数 retryAttempts: 3 # 命令重试发送时间间隔 retryInterval: 1500 # failedAttempts failedAttempts: 3

配置客户端
@EnableConfigurationProperties({RedissionProperties.class, RedisProperties.class}) public class RedisConfig {@Value("${spring.profiles.active}") private String env; @Value("${global-conf.active-profiles.prod}") private String activeProd; @Autowired private RedisProperties redisProperties; @Autowired private RedissionProperties redissionProperties; /** * springboot2.x 使用LettuceConnectionFactory 代替 RedisConnectionFactory * 在application.yml配置基本信息后,springboot2.xRedisAutoConfiguration能够自动装配 LettuceConnectionFactory 和 * RedisConnectionFactory 及其 RedisTemplate 单机模式和集群模式配置都自动注入,因为在定义Lettuce客户端时无需显示指定模式 * * @param redisConnectionFactory * @return */ @Bean public RedisTemplate redisTemplate( LettuceConnectionFactory redisConnectionFactory) {RedisTemplate redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(redisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setHashValueSerializer(redisSerializer()); return redisTemplate; }@Bean @ConditionalOnProperty(name = "spring.redis.model", havingValue = "https://www.it610.com/article/single") public RedissonClient getSingleRedisson() { Config config = new Config(); String node = redisProperties.getHost(); //用"rediss://"来启用SSL连接 node = node.startsWith("redis://") ? node : "redis://" + node; SingleServerConfig serverConfig = config.useSingleServer(); serverConfig.setAddress(node) .setConnectionPoolSize(redissionProperties.getPool().getSize()) .setTimeout(redissionProperties.getPool().getTimeout()) .setConnectionMinimumIdleSize(redissionProperties.getPool().getMinIdle()); if (StringUtils.isNotBlank(redisProperties.getPassword())) { serverConfig.setPassword(redisProperties.getPassword()); }return Redisson.create(config); }@Bean @ConditionalOnProperty(name = "spring.redis.model", havingValue = "https://www.it610.com/article/cluster") public RedissonClient getClusterRedisson() { Config config = new Config(); //用"rediss://"来启用SSL连接 List newNodes = redisProperties.getCluster().getNodes() .stream() .map(node -> node.startsWith("redis://") ? node : "redis://" + node) .collect( Collectors.toList()); ClusterServersConfig serverConfig = config.useClusterServers() .addNodeAddress(newNodes.toArray(new String[0])) .setScanInterval( redissionProperties.getCluster().getScanInterval()) .setIdleConnectionTimeout( redissionProperties.getPool().getSoTimeout()) .setConnectTimeout( redissionProperties.getPool().getConnTimeout()) .setRetryAttempts( redissionProperties.getCluster().getRetryAttempts()) .setRetryInterval( redissionProperties.getCluster().getRetryInterval()) .setMasterConnectionPoolSize(redissionProperties.getCluster() .getMasterConnectionPoolSize()) .setSlaveConnectionPoolSize(redissionProperties.getCluster() .getSlaveConnectionPoolSize()) .setTimeout(redissionProperties.getPool().getTimeout()); if (StringUtils.isNotBlank(redisProperties.getPassword())) { serverConfig.setPassword(redisProperties.getPassword()); } return Redisson.create(config); }@Bean public RedisSerializer redisSerializer() { //创建JSON序列化器 Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer<>( Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); //必须设置,否则无法将JSON转化为对象,会转化成Map类型 objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(objectMapper); return serializer; }@Bean public RedisCacheManager redisCacheManager(LettuceConnectionFactory redisConnectionFactory) { RedisCacheWriter redisCacheWriter = RedisCacheWriter .nonLockingRedisCacheWriter(redisConnectionFactory); //设置Redis缓存有效期为1天 RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig() .serializeValuesWith( RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer())).entryTtl( Duration.ofDays(1)); return new RedisCacheManager(redisCacheWriter, redisCacheConfiguration); } }
自定义Redis工具类 通常情况key一般是String类型,而value可能不一样,一般有两种:String和Object。
如果k-v都是String,我们直接使用StringRedisTemplate
  • SimpleStringRedisUtil
@Component public class SimpleStringRedisUtil {@Autowired private StringRedisTemplate redisTemplate; /** * 删除key * * @param key */ public void delete(String key) { redisTemplate.delete(key); }/** * 序列化key * * @param key * @return */ public byte[] dump(String key) { return redisTemplate.dump(key); }/** * 是否存在key * * @param key * @return */ public Boolean exists(String key) { return redisTemplate.hasKey(key); }/** * 设置指定 key 的值 * * @param key * @param value */ public void set(String key, String value) { redisTemplate.opsForValue().set(key, value); }/** * 写入redis缓存(设置expire存活时间) * @param key * @param value * @param expire * @return */ public void set(final String key, String value, Long expire){ ValueOperations operations = redisTemplate.opsForValue(); operations.set(key, value); redisTemplate.expire(key, expire, TimeUnit.SECONDS); }/** * 获取指定 key 的值 * * @param key * @return */ public String get(String key) { return redisTemplate.opsForValue().get(key); } }

如果k-是String,v-是Object,比如Json对象、可序列化对象以及固定不变的集合等,那就需要需要RedisTemplate接口
  • SimpleObjectRedisUtil
@Component public class SimpleObjectRedisUtil {@Autowired private RedisTemplate redisTemplate; public Set keys(String keys){ try { return redisTemplate.keys(keys); }catch (Exception e){ e.printStackTrace(); return null; } }/** * 指定缓存失效时间 * @param key 键 * @param time 时间(秒) * @return */ public boolean expire(String key, long time) { try { if (time > 0) { redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }/** * 指定缓存失效时间 * @param key 键 * @param time 时间 * @param time timeUnit * @return */ public boolean expire(String key, long time, TimeUnit timeUnit) { try { if (time > 0) { redisTemplate.expire(key, time, timeUnit); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }/** * 根据key 获取过期时间 * @param key 键 不能为null * @return 时间(秒) 返回0代表为永久有效 */ public long getExpire(String key) { return redisTemplate.getExpire(key, TimeUnit.SECONDS); }/** * 根据key 获取过期时间 * @param key 键 不能为null * @param timeUnit * @return 时间(秒) 返回0代表为永久有效 */ public long getExpire(String key, TimeUnit timeUnit) { return redisTemplate.getExpire(key, timeUnit); }/** * 判断key是否存在 * @param key 键 * @return true 存在 false不存在 */ public boolean hasKey(String key) { try { return redisTemplate.hasKey(key); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除缓存 * @param key 可以传一个值 或多个 */ @SuppressWarnings("unchecked") public void del(String... key) { if (key != null && key.length > 0) { if (key.length == 1) { redisTemplate.delete(key[0]); } else { redisTemplate.delete(CollectionUtils.arrayToList(key)); } } } /** * 普通缓存获取 * @param key 键 * @return 值 */ public Object get(String key) { return key == null ? null : redisTemplate.opsForValue().get(key); } /** * 普通缓存放入 * @param key 键 * @param value 值 * @return true成功 false失败 */ public boolean set(String key, Object value) { try { redisTemplate.opsForValue().set(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } }/** * 普通缓存放入并设置时间 * @param key 键 * @param value 值 * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 * @return true成功 false 失败 */ public boolean set(String key, Object value, long time) { try { if (time > 0) { redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); } else { set(key, value); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }/** * 普通缓存放入并设置时间 * @param key 键 * @param value 值 * @param time 时间 time要大于0 如果time小于等于0 将设置无限期 * @param timeUnit * @return true成功 false 失败 */ public boolean set(String key, Object value, long time, TimeUnit timeUnit) { try { if (time > 0) { redisTemplate.opsForValue().set(key, value, time, timeUnit); } else { set(key, value); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }/** * 递增 * @param key 键 * @param delta 要增加几(大于0) * @return */ public long incr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递增因子必须大于0"); } return redisTemplate.opsForValue().increment(key, delta); } /** * 递减 * @param key 键 * @param delta 要减少几(小于0) * @return */ public long decr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递减因子必须大于0"); } return redisTemplate.opsForValue().increment(key, -delta); } /** * HashGet * @param key 键 不能为null * @param item 项 不能为null * @return 值 */ public Object hget(String key, String item) { return redisTemplate.opsForHash().get(key, item); } /** * 获取hashKey对应的所有键值 * @param key 键 * @return 对应的多个键值 */ public Map hmget(String key) { return redisTemplate.opsForHash().entries(key); }/** * HashSet * @param key 键 * @param map 对应多个键值 * @return true 成功 false 失败 */ public boolean hmset(String key, Map map) { try { redisTemplate.opsForHash().putAll(key, map); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * HashSet 并设置时间 * @param key 键 * @param map 对应多个键值 * @param time 时间(秒) * @return true成功 false失败 */ public boolean hmset(String key, Map map, long time) { try { redisTemplate.opsForHash().putAll(key, map); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * @param key 键 * @param item 项 * @param value 值 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value) { try { redisTemplate.opsForHash().put(key, item, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * @param key 键 * @param item 项 * @param value 值 * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value, long time) { try { redisTemplate.opsForHash().put(key, item, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除hash表中的值 * @param key 键 不能为null * @param item 项 可以使多个 不能为null */ public void hdel(String key, Object... item) { redisTemplate.opsForHash().delete(key, item); } /** * 判断hash表中是否有该项的值 * @param key 键 不能为null * @param item 项 不能为null * @return true 存在 false不存在 */ public boolean hHasKey(String key, String item) { return redisTemplate.opsForHash().hasKey(key, item); } /** * hash递增 如果不存在,就会创建一个 并把新增后的值返回 * @param key 键 * @param item 项 * @param by 要增加几(大于0) * @return */ public double hincr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, by); } /** * hash递减 * @param key 键 * @param item 项 * @param by 要减少记(小于0) * @return */ public double hdecr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, -by); }// ===============================Set================================= /** * 根据key获取Set中的所有值 * @param key 键 * @return */ public Set sGet(String key) { try { return redisTemplate.opsForSet().members(key); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 根据value从一个set中查询,是否存在 * @param key 键 * @param value 值 * @return true 存在 false不存在 */ public boolean sHasKey(String key, Object value) { try { return redisTemplate.opsForSet().isMember(key, value); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将数据放入set缓存 * @param key 键 * @param values 值 可以是多个 * @return 成功个数 */ public long sSet(String key, Object... values) { try { return redisTemplate.opsForSet().add(key, values); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 将set数据放入缓存 * @param key 键 * @param time 时间(秒) * @param values 值 可以是多个 * @return 成功个数 */ public long sSetAndTime(String key, long time, Object... values) { try { Long count = redisTemplate.opsForSet().add(key, values); if (time > 0) expire(key, time); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 获取set缓存的长度 * @param key 键 * @return */ public long sGetSetSize(String key) { try { return redisTemplate.opsForSet().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 移除值为value的 * @param key 键 * @param values 值 可以是多个 * @return 移除的个数 */ public long setRemove(String key, Object... values) { try { Long count = redisTemplate.opsForSet().remove(key, values); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } // ===============================list================================= /** * 获取list缓存的内容 * @param key 键 * @param start 开始 * @param end 结束 0 到 -1代表所有值 * @return */ public List lGet(String key, long start, long end) { try { return redisTemplate.opsForList().range(key, start, end); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 获取list缓存的长度 * @param key 键 * @return */ public long lGetListSize(String key) { try { return redisTemplate.opsForList().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 通过索引 获取list中的值 * @param key 键 * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推 * @return */ public Object lGetIndex(String key, long index) { try { return redisTemplate.opsForList().index(key, index); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @return */ public boolean lSet(String key, T value) { try { redisTemplate.opsForList().rightPush(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, T value, long time) { try { redisTemplate.opsForList().rightPush(key, value); if (time > 0) expire(key, time); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @return */ public boolean lSet(String key, List value) { try { redisTemplate.opsForList().rightPushAll(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, List value, long time) { try { redisTemplate.opsForList().rightPushAll(key, value); if (time > 0) expire(key, time); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据索引修改list中的某条数据 * @param key 键 * @param index 索引 * @param value 值 * @return */ public boolean lUpdateIndex(String key, long index, T value) { try { redisTemplate.opsForList().set(key, index, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 移除N个值为value * @param key 键 * @param count 移除多少个 * @param value 值 * @return 移除的个数 */ public long lRemove(String key, long count, T value) { try { Long remove = redisTemplate.opsForList().remove(key, count, value); return remove; } catch (Exception e) { e.printStackTrace(); return 0; } }/**------------------zSet相关操作--------------------------------*//** * 添加元素,有序集合是按照元素的score值由小到大排列 * * @param key * @param value * @param score * @return */ public Boolean zAdd(String key, String value, double score) { return redisTemplate.opsForZSet().add(key, value, score); }/** * * @param key * @param values * @return */ public Long zAdd(String key, Set values) { return redisTemplate.opsForZSet().add(key, values); }/** * * @param key * @param values * @return */ public Long zRemove(String key, Object... values) { return redisTemplate.opsForZSet().remove(key, values); }/** * 增加元素的score值,并返回增加后的值 * * @param key * @param value * @param delta * @return */ public Double zIncrementScore(String key, String value, double delta) { return redisTemplate.opsForZSet().incrementScore(key, value, delta); }/** * 返回元素在集合的排名,有序集合是按照元素的score值由小到大排列 * * @param key * @param value * @return 0表示第一位 */ public Long zRank(String key, Object value) { return redisTemplate.opsForZSet().rank(key, value); }/** * 返回元素在集合的排名,按元素的score值由大到小排列 * * @param key * @param value * @return */ public Long zReverseRank(String key, Object value) { return redisTemplate.opsForZSet().reverseRank(key, value); }/** * 获取集合的元素, 从小到大排序 * * @param key * @param start *开始位置 * @param end *结束位置, -1查询所有 * @return */ public Set zRange(String key, long start, long end) { return redisTemplate.opsForZSet().range(key, start, end); }/** * 获取集合元素, 并且把score值也获取 * * @param key * @param start * @param end * @return */ public Set zRangeWithScores(String key, long start, long end) { return redisTemplate.opsForZSet().rangeWithScores(key, start, end); }/** * 根据Score值查询集合元素 * * @param key * @param min *最小值 * @param max *最大值 * @return */ public Set zRangeByScore(String key, double min, double max) { return redisTemplate.opsForZSet().rangeByScore(key, min, max); }/** * 根据Score值查询集合元素, 从小到大排序 * * @param key * @param min *最小值 * @param max *最大值 * @return */ public Set zRangeByScoreWithScores(String key, double min, double max) { return redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max); }/** * * @param key * @param min * @param max * @param start * @param end * @return */ public Set zRangeByScoreWithScores(String key, double min, double max, long start, long end) { return redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max, start, end); }/** * 获取集合的元素, 从大到小排序 * * @param key * @param start * @param end * @return */ public Set zReverseRange(String key, long start, long end) { return redisTemplate.opsForZSet().reverseRange(key, start, end); }/** * 获取集合的元素, 从大到小排序, 并返回score值 * * @param key * @param start * @param end * @return */ public Set zReverseRangeWithScores(String key, long start, long end) { return redisTemplate.opsForZSet().reverseRangeWithScores(key, start, end); }/** * 根据Score值查询集合元素, 从大到小排序 * * @param key * @param min * @param max * @return */ public Set zReverseRangeByScore(String key, double min, double max) { return redisTemplate.opsForZSet().reverseRangeByScore(key, min, max); }/** * 根据Score值查询集合元素, 从大到小排序 * * @param key * @param min * @param max * @return */ public Set zReverseRangeByScoreWithScores( String key, double min, double max) { return redisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, min, max); }/** * * @param key * @param min * @param max * @param start * @param end * @return */ public Set zReverseRangeByScore(String key, double min, double max, long start, long end) { return redisTemplate.opsForZSet().reverseRangeByScore(key, min, max, start, end); }/** * 根据score值获取集合元素数量 * * @param key * @param min * @param max * @return */ public Long zCount(String key, double min, double max) { return redisTemplate.opsForZSet().count(key, min, max); }/** * 获取集合大小 * * @param key * @return */ public Long zSize(String key) { return redisTemplate.opsForZSet().size(key); }/** * 获取集合大小 * * @param key * @return */ public Long zZCard(String key) { return redisTemplate.opsForZSet().zCard(key); }/** * 获取集合中value元素的score值 * * @param key * @param value * @return */ public Double zScore(String key, Object value) { return redisTemplate.opsForZSet().score(key, value); }/** * 移除指定索引位置的成员 * * @param key * @param start * @param end * @return */ public Long zRemoveRange(String key, long start, long end) { return redisTemplate.opsForZSet().removeRange(key, start, end); }/** * 根据指定的score值的范围来移除成员 * * @param key * @param min * @param max * @return */ public Long zRemoveRangeByScore(String key, double min, double max) { return redisTemplate.opsForZSet().removeRangeByScore(key, min, max); }/** * 获取key和otherKey的并集并存储在destKey中 * * @param key * @param otherKey * @param destKey * @return */ public Long zUnionAndStore(String key, String otherKey, String destKey) { return redisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey); }/** * * @param key * @param otherKeys * @param destKey * @return */ public Long zUnionAndStore(String key, Collection otherKeys, String destKey) { return redisTemplate.opsForZSet() .unionAndStore(key, otherKeys, destKey); }/** * 交集 * * @param key * @param otherKey * @param destKey * @return */ public Long zIntersectAndStore(String key, String otherKey, String destKey) { return redisTemplate.opsForZSet().intersectAndStore(key, otherKey, destKey); }/** * 交集 * * @param key * @param otherKeys * @param destKey * @return */ public Long zIntersectAndStore(String key, Collection otherKeys, String destKey) { return redisTemplate.opsForZSet().intersectAndStore(key, otherKeys, destKey); }/** * * @param key * @param options * @return */ public Cursor zScan(String key, ScanOptions options) { return redisTemplate.opsForZSet().scan(key, options); }}
对于锁、分布式集合操作、队列等分布式对象或服务,使用Redisson
RedissonClient对锁、集合、队列等分布式操作进行了封装,开发者只需要简单的调用其接口进行操作:
  • 使用Redisson实现Redis分布式锁
    Redisson RLock实现了java.util.concurrent.locks.Lock接口,开发者可以显示调用Lock接口进行相关锁操作
//根据key获取分布式锁对象-普通可重入锁 RLock lock = redissonClient.getLock("redisKey"); //根据key获取分布式锁对象-公平可重入锁 RLock fairLock = redissonClient.getFairLock("redisKey"); //可重入锁 //不指定时间,当加锁成功后会创建一个锁监听看门狗的监听器, // Redisson实例被关闭前,不断的延长锁的有效期。 // 默认情况下,看门狗的检查锁的超时时间是30秒钟 //fairLock.lock(); lock.lock(); try { //... } finally { //fairLock.unlock(); lock.unlock(); //需要显示释放锁 }//2. 显示指定加锁时间 // 加锁以后10秒钟自动解锁 // 无需调用unlock方法手动解锁 //fairLock.lock(10, TimeUnit.SECONDS); lock.lock(10, TimeUnit.SECONDS); try { //... } finally { //fairLock.unlock(); lock.unlock(); //需要显示释放锁 }//3. 尝试获取锁 // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = false; try { //res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) {} } catch (InterruptedException e) { e.printStackTrace(); } finally { if (res) { //fairLock.unlock(); lock.unlock(); //需要显示释放锁 }}

  • 使用Redisson实现集合操作
@Autowired private RedissonClient redissonClient; //List操作 //RList Java对象在实现了java.util.List,同时确保了插入顺序 RList nameList = client.getList("redisKey"); nameList.clear(); nameList.add("bingo"); nameList.add("yanglbme"); nameList.add("https://github.com/redisson"); nameList.remove(-1); boolean contains = nameList.contains("redisson"); nameList.forEach(System.out::println); //Hash表 //RMap对象实现了`java.util.concurrent.ConcurrentMap`接口和`java.util.Map`接/口。与HashMap不同的是,RMap保持了元素的插入顺序。 RMap map = redisson.getMap("redisKey"); SomeObject prevObject = map.put("redisson1", new SomeObject()); SomeObject currentObject = map.putIfAbsent("redisson2", new SomeObject()); SomeObject obj = map.remove("redisson1"); map.fastPut("redisson1", new SomeObject()); map.fastRemove("redisson2"); //锁定map内元素 RLock keyLock = map.getLock("mapKey"); keyLock.lock(); try { SomeObjectv = map.get("mapKey"); // 其他业务逻辑 } finally { keyLock.unlock(); } //读写锁 RReadWriteLock rwLock = map.getReadWriteLock("mapKey"); rwLock.readLock().lock(); try { MyValue v = map.get("mapKey"); // 其他业务逻辑 } finally { keyLock.readLock().unlock(); }

  • 使用Redisson实现延迟队列
//延迟队列工具类 @Component @Slf4j public class RedisDelayedQueue {@Autowired RedissonClient redissonClient; /** * 任务回调监听 * * @param */ public abstract static class TaskEventListener {/** * 执行方法 * * @param t */ public abstract void invoke(T t); }/** * 添加队列 * * @param tDTO传输类 * @param delay时间数量 * @param timeUnit 时间单位 */ public void addQueue(T t, long delay, TimeUnit timeUnit) { RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue(t.getClass().getName()); RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); delayedQueue.offer(t, delay, timeUnit); delayedQueue.destroy(); }/** * 获取队列 * * @param zClassDTO泛型 * @param taskEventListener 任务回调监听 * @return */ public void getQueue(Class zClass, TaskEventListener taskEventListener) { RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue(zClass.getName()); ((Runnable) () -> { while (true) { try { T t = (T) blockingFairQueue.take(); taskEventListener.invoke(t); } catch (InterruptedException e) { log.error("获取队列异常,原因:{}", e); } } }).run(); } } //添加延迟队列监听 @Component @Slf4j public class NotifySubscribeListener implements CommandLineRunner {@Autowired private RedisDelayedQueue redisDelayedQueue; @Autowired private EmsEventQueueHelper emsEventQueueHelper; @Override public void run(String... args) throws Exception {//监听延迟队列 RedisDelayedQueue.TaskEventListener taskEventListener = new RedisDelayedQueue.TaskEventListener() {@Override public void invoke(Object o) { EmsEventInfo info = (EmsEventInfo) o; log.info("延迟队列执行任务:{}", info.toString()); emsEventQueueHelper.publishEvent(info); }}; redisDelayedQueue.getQueue(EmsEventInfo.class, taskEventListener); } }//队列入列 //省略...... @Autowired private RedisDelayedQueue redisDelayedQueue; //省略...... //入列 redisDelayedQueue.addQueue(info, emsTmpProperties.getNotifyTime(), TimeUnit.MINUTES);

【分布式场景Redis客户端最优解决方案-lettuce|分布式场景Redis客户端最优解决方案-lettuce + Redisson】更多用法参考Redisson官网https://github.com/redisson/redisson

    推荐阅读