kotiln|Redis Lettuce在Cluster模式下使用multi

背景 在redis cluster模式下,基于某些redis业务为了保证redis command的原子性又要提高程序的高并发性能,从而直接使用redis事务可能会更好的满足的我们的要求,但是直接使用redis cluster的连接是不支持事务的,也就是multi模式,单是4.0以上的lettuce官网说是支持multi模式的,经过多次实践研究终于得出了一种可以在redis cluster模式下使用multi的方案。
引用jar

implementation 'io.lettuce:lettuce-core:5.2.2.RELEASE' implementation 'org.apache.commons:commons-pool2:2.8.0'

使用 redis config file,YMAL格式
#redis config redis: timeout: 3000 password: password cluster: nodes: - 172.168.0.1:7001 - 172.168.0.1:7002 - 172.168.0.2:7001 - 172.168.0.2:7002 - 172.168.0.3:7001 - 172.168.0.3:7002 #pool config lettuce: maxIdle: 8 minIdle: 0 maxTotal: 10

初始化redis cluster:代码我们选择kotlin,转换成java也可以
class RedisLettuceClient( private val redisConfig: RedisConfig = RedisConfig(), ) { private val redisCluster = clusterClient()private val redisPool = createPool()/** * load redis cluster config */ private fun clusterClient(): RedisClusterClient? { val nodes = ArrayList().apply { redisConfig.cluster.nodes.forEach { it.split(":").let { url -> if (url.size == 2) add(RedisURI.builder() .withHost(url[0]) .withPort(Integer.valueOf(url[1])) .withPassword(redisConfig.password) .build()) } } }val clusterClient = RedisClusterClient.create(nodes)val topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .enableAdaptiveRefreshTrigger( ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT, ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS ) .adaptiveRefreshTriggersTimeout(Duration.ofMinutes(3)) .build()clusterClient.setOptions(ClusterClientOptions.builder() .topologyRefreshOptions(topologyRefreshOptions) .autoReconnect(true) .pingBeforeActivateConnection(true) .build())clusterClient.connect().async().set("neuralyzer", "").get()return clusterClient }/** * create redis connection pool */ private fun createPool() = AsyncConnectionPoolSupport.createBoundedObjectPool( { redisCluster?.connectAsync(StringCodec.UTF8) }, BoundedPoolConfig.builder() .maxIdle(redisConfig.lettuce.maxIdle) .maxTotal(redisConfig.lettuce.maxTotal) .minIdle(redisConfig.lettuce.minIdle) .build() ) }

使用multi的分析
在我们根据redisPool.acquire().get().async()拿到RedisAdvancedClusterAsyncCommands ,他就是lettuce对async提供的使用redis的命令集,看源码后发现这个类及其父类中根本没有使用multi的方式,但是luttuce管网说已经支持,那到底是什么情况呢?既然直接连接cluster不可以那么我们直接连接到单台的redis总可以了吧?的确是可以的,但是又会出现一种问题,先看代码:
private fun test() { val commands =redisCluster?.connect()?.getConnection("127.0.0.1", 7001)?.async() commands?.let { command -> command.multi() command.setex("test", 10, "123") command.get("test") val result = command.exec() } }

看代码我们直接取一个node的连接看来的确是可以使用multi模式的,我们测试下:
(error) MOVED 5798 127.0.0.1:7002
出现了这样的异常。原来是redis cluster模式下,对每个key存的节点它内部都有自己的算法,所以你的这个可以不一定是落在你连接的这个节点上,而且cluster只有master才能写,针对这样的情况我们分析出在cluster模式下要是用multi的步骤:
  1. 计算出我们的key要落在那一个节点上
  2. 然后在在pool中获取当前节点的连接
  3. 最后我们就可以使用multi了
成功使用
/** * redis command interface */ private inline fun commands(block: RedisAdvancedClusterAsyncCommands.() -> R): R { val con = redisPool.acquire().get() con.use { con -> return block(con.async()) }} private fun getNodeByKey(key: String): RedisURI { commands { //获取key所落的slot val slot = clusterKeyslot(key).get() //获取cluster所有的slot val slots = clusterSlots().get() slots.forEach { val slotInfo = it as List<*> if (slot > slotInfo[0] as Long && slot < slotInfo[1] as Long) { //获取slot的master host return try { val nodeInfo = slotInfo[2] as List<*> clusterSlaves(nodeInfo[2].toString())RedisURI.create(nodeInfo[0].toString(), nodeInfo[1].toString().toInt()) } catch (e: Exception) { val nodeInfo = slotInfo[3] as List<*> RedisURI.create(nodeInfo[0].toString(), nodeInfo[1].toString().toInt()) } } } } //自定义exception throw RedisException() }//根据key的slot使用redis事务 private fun setNXByMulti(node: RedisURI, key: String, value: String, ttl: Long): RedisFuture { val command = redisPool.acquire().get().getConnectionAsync(node.host, node.port).get().async() command.multi() command.set(key, value, SetArgs.Builder.nx().ex(ttl)) command.decr(key) return command.exec() } }

【kotiln|Redis Lettuce在Cluster模式下使用multi】Test
val node = getNodeByKey(key) setNXByMulti(node, key, value, ttl)


    推荐阅读