背景 在redis cluster模式下,基于某些redis业务为了保证redis command的原子性又要提高程序的高并发性能,从而直接使用redis事务可能会更好的满足的我们的要求,但是直接使用redis cluster的连接是不支持事务的,也就是multi模式,单是4.0以上的lettuce官网说是支持multi模式的,经过多次实践研究终于得出了一种可以在redis cluster模式下使用multi的方案。
引用jar
implementation 'io.lettuce:lettuce-core:5.2.2.RELEASE'
implementation 'org.apache.commons:commons-pool2:2.8.0'
使用 redis config file,YMAL格式
#redis config
redis:
timeout: 3000
password: password
cluster:
nodes:
- 172.168.0.1:7001
- 172.168.0.1:7002
- 172.168.0.2:7001
- 172.168.0.2:7002
- 172.168.0.3:7001
- 172.168.0.3:7002
#pool config
lettuce:
maxIdle: 8
minIdle: 0
maxTotal: 10
初始化redis cluster:代码我们选择kotlin,转换成java也可以
class RedisLettuceClient(
private val redisConfig: RedisConfig = RedisConfig(),
) {
private val redisCluster = clusterClient()private val redisPool = createPool()/**
* load redis cluster config
*/
private fun clusterClient(): RedisClusterClient? {
val nodes = ArrayList().apply {
redisConfig.cluster.nodes.forEach {
it.split(":").let { url ->
if (url.size == 2)
add(RedisURI.builder()
.withHost(url[0])
.withPort(Integer.valueOf(url[1]))
.withPassword(redisConfig.password)
.build())
}
}
}val clusterClient = RedisClusterClient.create(nodes)val topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.enableAdaptiveRefreshTrigger(
ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,
ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS
)
.adaptiveRefreshTriggersTimeout(Duration.ofMinutes(3))
.build()clusterClient.setOptions(ClusterClientOptions.builder()
.topologyRefreshOptions(topologyRefreshOptions)
.autoReconnect(true)
.pingBeforeActivateConnection(true)
.build())clusterClient.connect().async().set("neuralyzer", "").get()return clusterClient
}/**
* create redis connection pool
*/
private fun createPool() =
AsyncConnectionPoolSupport.createBoundedObjectPool(
{ redisCluster?.connectAsync(StringCodec.UTF8) },
BoundedPoolConfig.builder()
.maxIdle(redisConfig.lettuce.maxIdle)
.maxTotal(redisConfig.lettuce.maxTotal)
.minIdle(redisConfig.lettuce.minIdle)
.build()
)
}
使用multi的分析
在我们根据redisPool.acquire().get().async()拿到RedisAdvancedClusterAsyncCommands
private fun test() {
val commands =redisCluster?.connect()?.getConnection("127.0.0.1", 7001)?.async()
commands?.let { command ->
command.multi()
command.setex("test", 10, "123")
command.get("test")
val result = command.exec()
}
}
看代码我们直接取一个node的连接看来的确是可以使用multi模式的,我们测试下:
(error) MOVED 5798 127.0.0.1:7002出现了这样的异常。原来是redis cluster模式下,对每个key存的节点它内部都有自己的算法,所以你的这个可以不一定是落在你连接的这个节点上,而且cluster只有master才能写,针对这样的情况我们分析出在cluster模式下要是用multi的步骤:
- 计算出我们的key要落在那一个节点上
- 然后在在pool中获取当前节点的连接
- 最后我们就可以使用multi了
/**
* redis command interface
*/
private inline fun commands(block: RedisAdvancedClusterAsyncCommands.() -> R): R {
val con = redisPool.acquire().get()
con.use { con ->
return block(con.async())
}} private fun getNodeByKey(key: String): RedisURI {
commands {
//获取key所落的slot
val slot = clusterKeyslot(key).get()
//获取cluster所有的slot
val slots = clusterSlots().get()
slots.forEach {
val slotInfo = it as List<*>
if (slot > slotInfo[0] as Long && slot < slotInfo[1] as Long) {
//获取slot的master host
return try {
val nodeInfo = slotInfo[2] as List<*>
clusterSlaves(nodeInfo[2].toString())RedisURI.create(nodeInfo[0].toString(), nodeInfo[1].toString().toInt())
} catch (e: Exception) {
val nodeInfo = slotInfo[3] as List<*>
RedisURI.create(nodeInfo[0].toString(), nodeInfo[1].toString().toInt())
}
}
}
}
//自定义exception
throw RedisException()
}//根据key的slot使用redis事务
private fun setNXByMulti(node: RedisURI, key: String, value: String, ttl: Long): RedisFuture {
val command = redisPool.acquire().get().getConnectionAsync(node.host, node.port).get().async()
command.multi()
command.set(key, value, SetArgs.Builder.nx().ex(ttl))
command.decr(key)
return command.exec()
}
}
【kotiln|Redis Lettuce在Cluster模式下使用multi】Test
val node = getNodeByKey(key)
setNXByMulti(node, key, value, ttl)
推荐阅读
- Java|Java基础——数组
- 人工智能|干货!人体姿态估计与运动预测
- java简介|Java是什么(Java能用来干什么?)
- Java|规范的打印日志
- Linux|109 个实用 shell 脚本
- 程序员|【高级Java架构师系统学习】毕业一年萌新的Java大厂面经,最新整理
- Spring注解驱动第十讲--@Autowired使用
- SqlServer|sql server的UPDLOCK、HOLDLOCK试验
- jvm|【JVM】JVM08(java内存模型解析[JMM])
- 技术|为参加2021年蓝桥杯Java软件开发大学B组细心整理常见基础知识、搜索和常用算法解析例题(持续更新...)