Canal|Canal adapter 同步 ElasticSearch 记录

之前写过一篇介绍 canal 的文章《mysql增量同步 - canal》,在数据同步的部分,主要着重演示了在代码中通过 canal.client 来同步。当时也有提到 canal adapter,但并未详述。最近愈多地接触 elasticsearch 项目的开发,趁着假期试着做了个Demo,顺便记下笔记。
1. canal 介绍 1.1. 三兄弟简介
canal 对应包的下载和安装的教程,都直接看 canal官方github,安装包目前有三兄弟:

  • canal deployer:又称 canal server,是真正监听 mysql 日志的服务端。
  • canal adapter:顾名思义“适配器”,搭配 canal server,目前能实现mysql 数据到 hbase、rdb、es的增量同步,妥妥的 ETL 工具。
  • canal admin:也是为 canal server 服务的,为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面。如果 canal server 要搭建集群环境,必少不了 canal admin 这样专业的运维工具。
对于不太逛github的人,把文档也贴上:
  • wiki 文档
  • release下载包
1.2. canal adapter
它既然是适配器,那么就得介绍“源头”和“目标”这两个部位数据的对接:
  • 源头:(1)canal adapter 可以直连 canal server ,消费 instance的数据;(2)也可以在让 canal server 将数据投递到 MQ,然后 cancal adapter 消费 MQ 中的数据。
  • 目标:目前支持 hbase、rdb、es,后续将支持 mongodb、redis等。
本文实现的较简单,数据流向包括:mysql -> canal server -> canal adapter -> es
2. 数据准备 2.1. mysql 建表
开启 binlog 日志的部分查看之前的文章。准备两张表:
-- 员工表 CREATE TABLE `hr_user` ( `id` char(32) NOT NULL COMMENT '主键', `username` varchar(50) DEFAULT NULL COMMENT '账号', `fullname` varchar(50) DEFAULT NULL COMMENT '姓名', `sex` tinyint DEFAULT NULL COMMENT '性别 0-男/1-女', `birthday` date DEFAULT NULL COMMENT '生日', `dept_id` char(32) DEFAULT NULL COMMENT '所属部门ID', `deleted` tinyint DEFAULT NULL COMMENT '是否已删除 0-否/1-是', `created_by` char(32) DEFAULT NULL COMMENT '创建人ID', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `updated_by` char(32) DEFAULT NULL COMMENT '更新人ID', `updated_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- 部门表 CREATE TABLE `hr_dept` ( `id` char(32) NOT NULL COMMENT '主键', `dept_name` varchar(50) DEFAULT NULL COMMENT '部门名称', `manager_name` varchar(50) DEFAULT NULL COMMENT '部门经理姓名', `parent_id` char(32) DEFAULT NULL COMMENT '父级部门ID', `dept_path` varchar(1000) DEFAULT NULL COMMENT '部门路径', `deleted` tinyint DEFAULT NULL COMMENT '是否已删除 0-否/1-是', `created_by` char(32) DEFAULT NULL COMMENT '创建人ID', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `updated_by` char(32) DEFAULT NULL COMMENT '更新人ID', `updated_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.2. 安装 es、kibana
es docker shell
docker run -d \ --name elasticsearch \ --restart=on-failure:3 \ -p 9200:9200 \ -p 9300:9300 \ -e "discovery.type=single-node" \ -v /Volumes/elasticsearch/data/:/usr/share/elasticsearch/data/ \ -v /Volumes/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \ -v /Volumes/elasticsearch/plugins/:/usr/share/elasticsearch/plugins/ \ elasticsearch:7.9.3

kibana docker shell
docker run -d \ --name kibana \ --link elasticsearch:es \ -p 5601:5601 \ -e ELASTICSEARCH_URL=es:9200 \ kibana:7.9.3

2.3. 创建索引
在 kibana -> Management -> Dev Tools 执行创建索引 user:
PUT user
{ "mappings":{ "properties":{ "birthday":{ "type":"date", "format":"yyyy-MM-dd" }, "dept_id":{ "type":"keyword" }, "dept_name":{ "type":"text", "analyzer":"ik_max_word" }, "dept_updated_time":{ "type":"date" }, "fullname":{ "type":"text", "analyzer":"ik_max_word" }, "sex":{ "type":"byte" }, "user_id":{ "type":"keyword" }, "user_updated_time":{ "type":"date" }, "username":{ "type":"text" } } } }

3. canal 配置 目前最新的 release 版本是 1.1.6-alpha-1,这里都只下载该版本的 canal-deployer 、canal-adapter 两个压缩包,在本地解压下来各自对应一个目录。canal-admin 就不安装了,前面两个暂时就够用了。
3.1. canal server
canal server 的安装配置其实 《mysql增量同步 - canal》 文章中就有了,还是简单列一下。
因为不做 canal server 将数据投递到 MQ,所以关注conf/example/instance.properties 的下面参数即可:
canal.instance.master.address = 127.0.0.1:3306 canal.instance.dbUsername = canal canal.instance.dbPassword = canal

配置里面默认启动的是叫 example 的 instance,所以启动脚本和查看日志对应下面:
# 启动 sh bin/startup.sh # 关闭 sh bin/stop.sh # 查看具体实例日志 tail -500f logs/example/example.log

3.2. canal adapter
所有 adapter 的配置参考 adapter同步es的wiki
1. 修改 conf/application.yml
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_nullcanal.conf: mode: tcp #tcp kafka rocketMQ rabbitMQ flatMessage: true zookeeperHosts: syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: consumerProperties: # canal tcp consumer canal.tcp.server.host: 127.0.0.1:11111 canal.tcp.zookeeper.hosts: canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: srcDataSources: defaultDS: url: jdbc:mysql://127.0.0.1:3306/es?useUnicode=true username: canal password: canal canalAdapters: - instance: example groups: - groupId: g1 outerAdapters: - name: es7 hosts: http://127.0.0.1:9200 properties: mode: rest cluster.name: docker-cluster

因为是直连 canal server,所以 mode: tcp,没有选择其他 mq。其他的就是配置 canal server、mysql、es 的连接信息。
2. 新增 conf/es7/user.yml
因为需要做 mysql 往 es中 user 索引的同步,就在 es7中添加一个 user.yml 文件,因为前面 conf/application.yml 中配置了适配器加载路径 es7,所以默认会加载这个目录下所有 yml 文件。
user.yml
dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: user _id: user_idsql: "SELECT u.id AS user_id, u.username, u.fullname, u.sex, u.birthday, u.dept_id, d.dept_name, u.updated_time as user_updated_time, d.updated_time as dept_updated_time FROM hr_user u LEFT JOIN hr_dept d ON u.dept_id = d.id"etlCondition: "where u.deleted = 0 AND d.deleted = 0" commitBatch: 3000

还是比较一目了然的,配置了往 es 中 user 索引同步的数据来源sql。不过是有一定规范要求的,具体规范要求,还是要看前面发的官方wiki文档。
3. 启动
# 启动 sh bin/startup.sh # 关闭 sh bin/stop.sh # 查看适配器日志 tail -500f logs/adapter/adapter.log

如果能看到 canal server、canal adapter 的日志都没报错信息,那就可以了。
4. 验证 为了方便查看 es 中的数据,在 kibana 中将 user索引添加到 Discover 中。在 Kibana -> Management -> Stack Management -> Kibana -> Index patterns -> Create index pattern,添加 user索引。然后回到 Discover 就能看到对应索引中的数据了。
在 mysql 中对应表中各自新增一条数据:
-- hr_dept INSERT INTO hr_dept (id,dept_name,manager_name,parent_id,dept_path,deleted,created_by,create_time,updated_by,updated_time) VALUES ('9ef57211ca3311ec8fe00242ac110004','中台研发部','罗永浩','66ab59dbcabf11ec8fe00242ac110004','研发中心>平台架构部>TPaaS研发部',0,NULL,now(),NULL,now()); -- hr_user INSERT INTO hr_user (id,username,fullname,sex,birthday,dept_id,deleted,created_by,create_time,updated_by,updated_time) VALUES ('b7205315cac811ec8fe00242ac110004','zhangsan','张三',0,'1995-02-18','9ef57211ca3311ec8fe00242ac110004',0,NULL,now(),NULL,now());

在 kibana 中就能看到对应 es索引中也新增了一条数据,对应的日志在 canal adapter adapter.log 日志中也能看到。
然后无论我们单独修改 hr_user 表,还是只是修改了 hr_dept 表中的 dept_name 字段,es 中对应那条的文档也会随之修改。
【Canal|Canal adapter 同步 ElasticSearch 记录】Canal|Canal adapter 同步 ElasticSearch 记录
文章图片

    推荐阅读