elasticsearch|使用canal实时同步MySQL数据到es


使用canal实时同步MySQL数据到es

  • 软件版本信息
  • 安装 elasticsearch
  • 安装 kibana
  • 下载和安装canal
    • 1、下载canal
    • 2、配置MySQL
    • 3、配置 canal-server (canal-deploy)
    • 4、配置 canal-adapter
    • 5、配置canal-admin
  • 同步MySQL到es
    • 1、创建es索引
    • 2、在MySQL数据库中创建数据
    • 3、修改MySQL数据
    • 4、对MySQL数据进行删除操作
  • 常规错误解决

软件版本信息 由于不同版本的MySQLElasticsearchcanal会有兼容性问题,所以我们先对其使用版本做个约定。
软件 版本
操作系统 windows 10
elasticsearch 7.16.2
kibana 7.16.2
canal 1.1.5
MySQL 5.7
jdk 1.8.0
elasticsearch下载地址:https://www.elastic.co/cn/downloads/past-releases#elasticsearch
kibana下载地址:https://www.elastic.co/cn/downloads/past-releases#kibana
canal下载地址:https://github.com/alibaba/canal/releases
安装 elasticsearch
  1. 根据自己所需下载对应 elasticsearch 版本,我这里使用的是 7.16.2 版本
    下载地址:https://www.elastic.co/cn/downloads/past-releases#elasticsearch
    elasticsearch|使用canal实时同步MySQL数据到es
    文章图片
  2. 下载完成后解压 elasticsearch-7.16.2-windows-x86_64.zip 压缩包到自己本地路径上,解压后目录如下:
    elasticsearch|使用canal实时同步MySQL数据到es
    文章图片
  3. 进入\elasticsearch-7.16.2\config 路径,修改 elasticsearch.yml 文件
    #配置es集群名称:es cluster.name: es #配置es端口:9200 http.port: 9200 #在文件中末尾处添加下面参数,开启x-pack验证 xpack.security.enabled: true xpack.license.self_generated.type: basic xpack.security.transport.ssl.enabled: true #关闭geoip数据库的更新,要不然启动es会报错 ingest.geoip.downloader.enabled: false

  4. 设置 elasticsearch 账号密码
    打开cmd命令符,进入目录 \elasticsearch-7.16.2\bin ,执行设置用户名和密码的命令,这里需要为6个用户分别设置密码,elastic, apm_system, kibana_system, logstash_system, beats_system, remote_monitoring_user
    #首先启动elasticsearch,执行命令 elasticsearch.bat #另外打开命令符进入 \bin 路径,执行下面命令设置用户密码:123456 elasticsearch-setup-passwords interactive

    下图表示密码设置成功,重启 elasticsearch
    elasticsearch|使用canal实时同步MySQL数据到es
    文章图片
  5. 测试启动是否成功
    在浏览器输入以下地址:http://127.0.0.1:9200/,输入用户:elastic,密码:123456
    看到以下内容表示启动成功:
    { "name" : "DESKTOP-M1CKQ7R", "cluster_name" : "es", "cluster_uuid" : "8gUkj244QD2gkTcrRNhbyw", "version" : { "number" : "7.16.2", "build_flavor" : "default", "build_type" : "zip", "build_hash" : "2b937c44140b6559905130a8650c64dbd0879cfb", "build_date" : "2021-12-18T19:42:46.604893745Z", "build_snapshot" : false, "lucene_version" : "8.10.1", "minimum_wire_compatibility_version" : "6.8.0", "minimum_index_compatibility_version" : "6.0.0-beta1" }, "tagline" : "You Know, for Search" }

安装 kibana
  1. 根据自己所需下载 kibana 版本,我这里使用的是 7.16.2 版本
    下载地址:https://www.elastic.co/cn/downloads/past-releases#kibana
    elasticsearch|使用canal实时同步MySQL数据到es
    文章图片
  2. 解压 kibana-7.16.2-windows-x86_64.zip 压缩包到本地目录下:elasticsearch|使用canal实时同步MySQL数据到es
    文章图片
  3. 进入路径 \kibana-7.16.2\config 修改 kibana.yml 文件
    #打开kibana端口:5601 server.port: 5601 #设置kibana服务host:127.0.0.1 server.host: "127.0.0.1" #配置kibana连接es的地址和端口 elasticsearch.hosts: ["http://127.0.0.1:9200"] #配置kibana连接es的验证用户和密码 elasticsearch.username: "elastic" elasticsearch.password: "123456" #设置中文编码 i18n.locale: "zh-CN" #在末尾处添加以下属性,开启es对kibana的xpack认证 xpack.monitoring.ui.container.elasticsearch.enabled: true

  4. 启动 kibana
    #打开cmd命令符,进入路径\kibana-7.16.2\bin,输出命令: kibana.bat

    浏览器访问:http://127.0.0.1:5601/
    如果进入以下界面,表示 kibana 启动成功
    elasticsearch|使用canal实时同步MySQL数据到es
    文章图片

    输入用户:elastic,密码:123456 登陆就可以进入使用 kbana
下载和安装canal 1、下载canal 首先我们需要下载canal的各个组件canal-servercanal-adaptercanal-admin,大家可以根据自己需要下载版本,我这里使用的版本是1.1.5版本
下载地址:https://github.com/alibaba/canal/releases
elasticsearch|使用canal实时同步MySQL数据到es
文章图片

2、配置MySQL 配置数据库 my.ini 文件
[mysqld] #设置serveri_id server_id=1 #开启二进制日志功能 log-bin=mall-mysql-bin #设置使用的二进制日志格式(mixed,statement,row) binlog_format=row

配置成功后重启 MySQL ,查看 binlog 是否启用:
mysql> show variables like '%log_bin%'; +---------------------------------+------------------------------------------+ | Variable_name| Value| +---------------------------------+------------------------------------------+ | log_bin| ON| | log_bin_basename| D:\MySQL5.7.26\data\mall-mysql-bin| | log_bin_index| D:\MySQL5.7.26\data\mall-mysql-bin.index | | log_bin_trust_function_creators | OFF| | log_bin_use_v1_row_events| OFF| | sql_log_bin| ON| +---------------------------------+------------------------------------------+ 6 rows in set, 1 warning (0.00 sec)

查看binlog模式:
mysql> show variables like '%binlog_format%'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW| +---------------+-------+ 1 row in set, 1 warning (0.01 sec)

拥有从库权限的账号,用于订阅binlog,这里创建的账号为canal:root
mysql> create user canal identified by 'root'; mysql> grant select, replication slave, replication client on *.* to 'canal'@'%'; mysql> flush privtleges;

创建一个用于测试的数据库:canal
mysql> CREATE DATABASE canal; mysql> CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;

然后创建一张数据库表 product
mysql> use canal; mysql> CREATE TABLE `product`( `id` bigint(20) NOT NULL AUTO_INCREMENT, `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `sub_title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `price` decimal(10, 2) NULL DEFAULT NULL, `pic` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

3、配置 canal-server (canal-deploy) canal-server(canal-deploy):可以直接监听MySQL的binlog,把自己伪装成MySQL的从库,只负责接收数据,并不做处理。
解压我们下载好的压缩包 canal.deployer-1.1.5-SNAPSHOT.tar.gz到本地目录,解压完成后目录结构如下:
├── bin │├── restart.sh │├── startup.bat │├── startup.sh │└── stop.sh ├── conf │├── canal_local.properties │├── canal.properties │├── example ││└── instance.properties │├── logback.xml │├── metrics ││└── Canal_instances_tmpl.json │└── spring ├── lib ├── logs └── plugin

进入 conf 路径,修改配置文件 canal.properties,主要修改
#canal的server地址:127.0.0.1 canal.ip =127.0.0.1 #canal端口 canal.port = 11111

进入 conf/example 路径,修改配置文件 instance.properties,主要是修改数据库相关配置
# 需要同步数据的MySQL地址 canal.instance.master.address=127.0.0.1:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # 用于同步数据的数据库账号 canal.instance.dbUsername=canal # 用于同步数据的数据库密码 canal.instance.dbPassword=root # 数据库连接编码 canal.instance.connectionCharset = UTF-8 # 需要订阅binlog的表过滤正则表达式 canal.instance.filter.regex=.*\\..*

打开cmd命令符,进入 bin 目录下, 执行启动命令:
startup.bat

启动后,查看服务日志信息: logs/canal/canal.logelasticsearch|使用canal实时同步MySQL数据到es
文章图片

查看instance日志信息:logs/example/example.log ,如下elasticsearch|使用canal实时同步MySQL数据到es
文章图片
canal-server 已成功启动
4、配置 canal-adapter canal-adapter:相当于canal的客户端,会从canal-server中获取数据,然后对数据进行同步,可以同步到MySQL、Elasticsearch和HBase等存储中去。
解压我们下载好的压缩包 canal.adapter-1.1.5.tar.gz 到本地目录,解压完成后目录结构如下:
├── bin │├── adapter.pid │├── restart.sh │├── startup.bat │├── startup.sh │└── stop.sh ├── conf │├── application.yml │├── es6 │├── es7 ││├── biz_order.yml ││├── customer.yml ││└── mytest_user.yml │├── hbase │├── kudu │├── logback.xml │├── META-INF ││└── spring.factories │└── rdb ├── lib ├── logs │└── adapter │└── adapter.log └── plugin

修改配置文件conf/application.ym
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_nullcanal.conf: mode: tcp # 客户端的模式,可选tcp kafka rocketMQ flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效 zookeeperHosts:# 对应集群模式下的zk地址 syncBatchSize: 1000 # 每次同步的批数量 retries: 0 # 重试次数, -1为无限重试 timeout: # 同步超时时间, 单位毫秒 accessKey: secretKey: consumerProperties: # canal tcp consumer canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址 canal.tcp.zookeeper.hosts: canal.tcp.batch.size: 500 canal.tcp.username: canal.tcp.password: srcDataSources: # 源数据库配置 defaultDS: url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true&useSSL=true username: canal password: root canalAdapters: # 适配器列表 - instance: example # canal实例名或者MQ topic名 groups: # 分组列表 - groupId: g1 # 分组id, 如果是MQ模式将用到该值 outerAdapters: - name: logger # 日志打印适配器 - name: es7 # ES同步适配器 hosts: http://127.0.0.1:9200 # ES连接地址 properties: mode: rest # 模式可选transport(9300) 或者 rest(9200) security.auth: elastic:123456 #连接es的用户和密码,仅rest模式使用 cluster.name: es # ES集群名称, 与es目录下 elasticsearch.yml文件cluster.name对应

修改 canal-adapter/conf/es7/mytest_user.yml 文件,用于配置MySQL中的表与Elasticsearch中索引的映射关系
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值 destination: example# canal的instance或者MQ的topic groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据 esMapping: _index: canal_product # es 的索引名称 _id: _id# es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配 sql: "SELECT p.id AS _id, p.title, p.sub_title, p.price, p.pic FROM product p"# sql映射 etlCondition: "where p.id>={}"#etl的条件参数 commitBatch: 3000# 提交批大小

启动 canal-adapter
#打开cmd命令符,bin路径,执行命令 startup.bat

以下图片表示启动成功:
elasticsearch|使用canal实时同步MySQL数据到es
文章图片

5、配置canal-admin canal-admin:为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。
解压我们下载好的压缩包 canal.admin-1.1.5.tar.gz 到本地目录,解压完成后目录结构如下:
├── bin │├── restart.sh │├── startup.bat │├── startup.sh │└── stop.sh ├── conf │├── application.yml │├── canal_manager.sql │├── canal-template.properties │├── instance-template.properties │├── logback.xml │└── public │├── avatar.gif │├── index.html │├── logo.png │└── static ├── lib └── logs

创建 canal-admin 需要使用的数据库canal_manager,创建SQL脚本为/mydata/canal-admin/conf/canal_manager.sql,会创建如下表;
elasticsearch|使用canal实时同步MySQL数据到es
文章图片
修改配置文件conf/application.yml,按如下配置即可,主要是修改数据源配置和canal-admin的管理账号配置,注意需要用一个有读写权限的数据库账号,比如管理账号root:root
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: root#数据库管理员账号 password: root#数据库管理员账号密码 driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: admin

启动canal-admin
bin/startup.bat

访问canal-admin的Web界面,输入账号密码admin:123456即可登录,访问地址:http://127.0.0.1:8089
elasticsearch|使用canal实时同步MySQL数据到es
文章图片

同步MySQL到es 1、创建es索引 浏览器访问:http://127.0.0.1:5601/,用户:elastic,密码:123456
elasticsearch|使用canal实时同步MySQL数据到es
文章图片
elasticsearch|使用canal实时同步MySQL数据到es
文章图片

# 在kibana工具里创建es索引 PUT canal_product { "mappings": { "properties": { "title": { "type": "text" }, "sub_title": { "type": "text" }, "pic": { "type": "text" }, "price": { "type": "double" } } } }

【elasticsearch|使用canal实时同步MySQL数据到es】elasticsearch|使用canal实时同步MySQL数据到es
文章图片

2、在MySQL数据库中创建数据
# 之后使用如下SQL语句在数据库中创建一条记录 INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 5, '小米8', ' 全面屏游戏智能手机 6GB+64GB', 1999.00, NULL );

创建成功后,在Elasticsearch中搜索,发现数据已经同步了;
GET canal_product/_search

elasticsearch|使用canal实时同步MySQL数据到es
文章图片

3、修改MySQL数据
# 再使用如下SQL对数据进行修改; UPDATE product SET title='小米10' WHERE id=5

elasticsearch|使用canal实时同步MySQL数据到es
文章图片

4、对MySQL数据进行删除操作
# 再使用如下SQL对数据进行删除操作; DELETE FROM product WHERE id=5

elasticsearch|使用canal实时同步MySQL数据到es
文章图片

参考链接:https://blog.csdn.net/zhenghongcs/article/details/109476376
常规错误解决 canal-adapter 启动报错:
2022-02-16 14:27:11.043 [Thread-2] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed java.lang.IllegalStateException: Extension instance(name: es, class: interface com.alibaba.otter.canal.client.adapter.OuterAdapter)could not be instantiated: class could not be found

这是由于 druid 包冲突导致的
问题解决:
  1. 下载源码包
    下载地址为:https://github.com/alibaba/canal/archive/refs/tags/canal-1.1.5.tar.gz
  2. 解压后,使用IDEA打开,定位到 client-adapter.escore 模块,将 pom.xmldruid 更新为:
    com.alibaba druid >provided

  3. 更新后,在项目根目录下执行命令
    mvn clean package

    然后到 canal-canal-1.1.5/client-adapter/es7x/target 下 将打包好的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar 替换掉 canal-adapter/plugin 下原来的 jar包,然后重新启动 canal-adapter
参考链接:https://ask.csdn.net/questions/7501103

    推荐阅读