《微服务项目相关》|使用canal解决Mysql和Redis数据同步问题

前言 千呼万唤始出来,停了好个月,终于又开始动手写文章了,今天带给大家的是阿里的一个工具Canal,这个工具是企业做数据同步使用的比较多的方案,希望对你有所帮助,喜欢的话请给个好评
工作原理分析 我们在面试的时候常常听面试官问这么一个问题:你们的Mysql和Redis怎么做数据同步的,根据不同的业务场景又很多方案,你可能会说先写库再删缓存,或者延迟双删或其他方案。今天我要给大家分享的就是比较成熟的方案-使用Canal实现Mysql和Redis数据的同步。
我不知道你是否了解Mysql主从,根据2/8原则,80%的性能问题都在读上面,当我们数据库的读并发较大的时候,我们可以使用Mysql主从来分担读的压力。它的原理是所有的写操作在主库上,读操作在从库上,当然主库也可以承担读请求,而从库的数据则通过主库复制而来,Mysql自带主从复制的功能。如下图
《微服务项目相关》|使用canal解决Mysql和Redis数据同步问题
文章图片

主从复制步骤:

  1. 将Master的binary-log日志文件打开,mysql会把所有的DDL,DML,TCL写入BinaryLog日志文件中
  2. Master会生成一个 log dump 线程,用来给从库的 i/o线程传binlog
  3. 从库的i/o线程去请求主库的binlog,并将得到的binlog日志写到中继日志(relaylog)中
  4. 从库的sql线程,会读取relaylog文件中的日志,并解析成具体操作,通过主从的操作一致,而达到最终数据一致
而Canal的原理就是伪装成Slave从Binlog中复制SQL语句或者数据。
Mysql和Redis数据同步方案 根据上面所说,我们就可以通过Canal去自动同步数据库的binlog数据日志文件,然后再把数据同步到Redis,从而达到Mysql和Redis自动同步的功能。很遗憾的是Canal没办法直接把数据库同步到Redis,它支持的是组件有 : mysql、Kafka、ElasticSearch、Hbase、RocketMQ等
《微服务项目相关》|使用canal解决Mysql和Redis数据同步问题
文章图片

当然 canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑
  • canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
  • canal c# 客户端: https://github.com/dotnetcore/CanalSharp
  • canal go客户端: https://github.com/CanalClient/canal-go
  • canal Python客户端: https://github.com/haozi3156666/canal-python
canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ,可以借助于 MQ 的多语言能力,因此我们可以使用下面这种方案来同步数据
《微服务项目相关》|使用canal解决Mysql和Redis数据同步问题
文章图片

这里对流程做一个解释
  1. 首选我们需要开启Mysql的bin-log日志文件
  2. 接着安装好Canal去同步bin-log日志,同时配置RocketMQ地址,Canal会把消息推送给MQ
  3. 需要编写Java客户端去监听MQ中的消息,然后往Redis中进行同步
开启Mysql bin-log日志 找到Mysql安装目录中的my.ini 配置文件,我以mysql 5.5为例,在 mysqld 下做如下配置
[mysqld] #开启bInlog log-bin=mysql-bin #给mysql服务指定一个唯一的ID server-id=1 #以数据的方式写binlog日志 :statement 是记录SQL,row是记录数据 binlog-format=ROW #同步的数据库名 binlog-do-db=canaldb #忽略的表 binlog-ignore-db=mysql # 启动mysql时不启动grant-tables授权表 skip-grant-tables

修改好之后,重启Mysql服务。注意:我这里指定了需要同步的数据库为canaldb,所以需要创建一个数据库,同时创建了一个employee表作为演示
《微服务项目相关》|使用canal解决Mysql和Redis数据同步问题
文章图片

然后创建一个用户提供给canal来链接Mysql做数据同步
flush privileges; #创建用户cannal CREATE USER canal IDENTIFIED BY 'canal'; #把所有权限赋予canal,密码也是canal GRANT ALL PRIVILEGES ON canaldb.user TO 'canal'@'%' identified by "canal"; //GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' identified by "canal"; #刷新权限 flush privileges;

到这,Mysql部分就搞定了
安装Canal 去官网下载 Canal : https://github.com/alibaba/canal/releases ,我使用的是canal.deployer-1.1.5.tar.gz版本《微服务项目相关》|使用canal解决Mysql和Redis数据同步问题
文章图片

下载好之后解压,目录结构如下
《微服务项目相关》|使用canal解决Mysql和Redis数据同步问题
文章图片

接下来修改instance 配置文件 : conf/example/instance.properties
#按需修改成自己的数据库信息 ################################################# ... #我的端口是3307 canal.instance.master.address=192.168.1.20:3307 # username/password,数据库的用户名和密码 ... #刚才开通的mysql的账户密码 canal.instance.dbUsername = canal canal.instance.dbPassword = canal ... ################################################## mq config 数据同步到MQ中的topic名字 canal.mq.topic=example # 针对库名或者表名发送动态topic #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #库名.表名: 唯一主键,多个表之间用逗号分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id

这里注意如下几个东西,其他的不用管
  • master.address :Mysql的地址,我的端口是3307,默认是3306
  • dbUsername :上面开通的Mysql用户
  • dbPassword : 密码
  • canal.mq.topic=example : 数据同步到MQ中的topic名字
接着修改canal 配置文件 conf/canal.properties
# ... # 可选项: tcp(默认), kafka, RocketMQ # 这里使用RocketMQ canal.serverMode = rocketMQ # ... ################################################## #########RocketMQ############# ################################################## rocketmq.producer.group = test rocketmq.enable.message.trace = false rocketmq.customized.trace.topic = rocketmq.namespace = # RocketMQ的地址 rocketmq.namesrv.addr = 127.0.0.1:9876 rocketmq.retry.times.when.send.failed = 0 rocketmq.vip.channel.enabled = false rocketmq.tag =

【《微服务项目相关》|使用canal解决Mysql和Redis数据同步问题】这里需要注意2个东西
  • canal.serverMode = rocketMQ : 我这里以RocketMQ为例
  • rocketmq.namesrv.addr = 127.0.0.1:9876 : 指向RocketMQ的地址
配置好之后,找到 canal 安装目录下 bin目录下的 startup.bat 双击启动,linux上启动:startup.sh
《微服务项目相关》|使用canal解决Mysql和Redis数据同步问题
文章图片

对于RocketMQ没使用过的童鞋可以看我《RocketMQ极简入门专题》,启动Canal和RocketMQ之后,尝试往employee表中增加数据,观察MQ控制台是否有数据同步,访问http://localhost:8080/#/message 如下
《微服务项目相关》|使用canal解决Mysql和Redis数据同步问题
文章图片

点击 MESSAGE DETAIL 如下
《微服务项目相关》|使用canal解决Mysql和Redis数据同步问题
文章图片

数据是JSON格式存储到MQ中的 , 这里可以看到数据同步于哪个数据库,哪个表,以及数据内容以及old老数据。注意这里的内容格式,后面我们需要封装对象
Java程序把数据同步到Redis 接下来需要编写一个Java程序来消费MQ中的消息同步到Redis ,创建项目,导入依赖
org.springframework.boot spring-boot-starter-parent 2.2.5.RELEASE org.springframework.boot spring-boot-starter-web org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 org.springframework.boot spring-boot-starter-data-redis io.lettuce lettuce-core redis.clients jedis

编写yml配置MQ和Redis
#..其他省略.. spring: redis: database: 0 host: 127.0.0.1 port: 6379 password: 123456 jedis: pool: max-wait: 2000ms min-idle: 2 max-idle: 8 rocketmq: name-server: 127.0.0.1:9876 # 是否开启自动配置 producer: enable-msg-trace: true group: "service-producer" # 消息最大长度 默认 1024 * 4 (4M) max-message-size: 4096 # 发送消息超时时间,默认 3000 send-message-timeout: 3000 # 发送消息失败重试次数,默认2 retry-times-when-send-failed: 2 retry-times-when-send-async-failed: 2

编写实体类来封装MQ中的消息 , 注意: 下面的 CanalSynDto对象是根据MQ中的消息内容进行封装的,你可以把数据复制到在线JSON转换工具中进行分析。
@Data @NoArgsConstructor @AllArgsConstructor public class Employee { private Long id; private String username; }@AllArgsConstructor @NoArgsConstructor @Data public class CanalSynDto { private List data; private String database; private String table; private String type; //省略了一些不重要的内容 }

编写MQ消费者代码,把MQ消息封装成 CanalSynDto 对象,然后取到data数据,再根据SQL的类型(insert,delete,update)对Redis进行数据同步
@Slf4j @Component //对应了canal的instance.properties 中的canal.mq.topic=example @RocketMQMessageListener(topic = "example", //TOPIC主题, selectorExpression="*" //tag标签 ,consumerGroup = "canal-syn-consumer" ,messageModel = MessageModel.CLUSTERING ) public class CanalSynListennerimplements RocketMQListener { //注入Redis API @Autowired private RedisTemplate redisTemplate; @Override public void onMessage(MessageExt message) { try { //拿到MQ中的消息内容 String json = new String(message.getBody(), "utf-8"); //把数据转为实体类 CanalSynDto canalSynDto = JSON.parseObject(json, CanalSynDto.class); log.info("canal同步 {}", canalSynDto); //如果是INSERT或者UPDATE,直接往Redis添加 if(canalSynDto.getType().equals("INSERT") || canalSynDto.getType().equals("UPDATE")){ //insert就添加,update就覆盖 canalSynDto.getData().forEach(employee -> { //以ID为key,把对象存储到Redis中 redisTemplate.opsForValue().set("ID:"+employee.getId(),employee); }); //删除命令 }else if (canalSynDto.getType().equals("DELETE")){ canalSynDto.getData().forEach(employee -> { //以ID为key,把对象从Redis中删除 redisTemplate.delete("ID:"+employee.getId()); }); }} catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }

到这里代码就写完了,启动SpringBoot程序,对employee表中数据进行修改或者删除,观察Redis中的数据变化 :Redis始终会随着Mysql变化而变化
《微服务项目相关》|使用canal解决Mysql和Redis数据同步问题
文章图片

我们的效果达到了,弱弱的问一句:Mysql实现ElasticSearch的数据同步你有方案吗?可以在评论区说出你的见解
文章结束,喜欢的话请一定给个好评哦,你的鼓励是我最大的动力.

    推荐阅读