#|canal同步MySQL数据到Elasticsearch


canal同步数据详解

  • 前言
  • canal
    • 下载
    • 工作原理
    • 开启mysql数据同步功能
    • 安装
    • 配置详情
      • 配置 `canal-deployer`
      • 配置 `canal-adapter`
    • 测试
      • 数据库准备
      • 创建es索引
      • 增删改
    • admin

前言 上篇文章讲了es同步数据的方案和使用logstash同步的讲解(es数据同步方案),本文详细讲解canal方式。
canal可实现对es数据进行增删改的增量同步,删除是真正的物理删除,这点是logstash不具备的。
es和kibana的下载安装可以参照:centos/docker下载安装elasticsearch、kibana及ik分词器
canal 说明:
canal官网的介绍也是很详细的,可以对照此文和官网配置来搭建canal,官网用的是1.1.1来作为案例使用,本文是1.1.5,可能会多一些参数或者有些参数配置不一样,都会有提到,但也毕竟是少数不同,大部分配置都是相同的。
官方文档案例
同时也可以参考官方文档同步到其他软件的教程,比如MQ、hbase等等。
下载 下载地址
canal三大组件:canal-server、canal-adapter、canal-admin。
canal-server:也就是deployer,服务端,进行数据获取;
canal-adapter:客户端,负责从服务端获取数据,同步数据到es;
canal-admin:管理端,可视化页面,方便运维查看。
#|canal同步MySQL数据到Elasticsearch
文章图片

工作原理 流程图:
核心就是把自己伪装成数据库的一个从库,通过binlog获取日志信息,然后进行转发同步。
#|canal同步MySQL数据到Elasticsearch
文章图片

#|canal同步MySQL数据到Elasticsearch
文章图片

开启mysql数据同步功能 上边说过,canal实际就是伪装成mysql的从库来实现数据同步,所以,在安装配置canal之前,先把数据库的准备工作做好,后边配置canal的时候直接用就好。
开启binlog
# 设置server_id,一般设置为IP,保证唯一就好 server_id=1 # binlog日志格式,(mixed,statement,row,默认格式是statement) binlog_format=row # 开启binlog,名字可以随意取 log-bin=mysql-canal-binlog

重启mysql,查看是否开启成功!
show variables like '%log_bin%';

#|canal同步MySQL数据到Elasticsearch
文章图片

show master status;

#|canal同步MySQL数据到Elasticsearch
文章图片

创建授权用户
grant select, replication slave, replication client on *.* to 'canal'@'%' identified by '123456'; flush PRIVILEGES;

至此,mysql主库配置准备完毕,下边开始canal的配置。
安装 将下载的三个压缩包分别放在对应目录下,全部解压即可。
#|canal同步MySQL数据到Elasticsearch
文章图片

当然如果你的机器内存够大,就放在一起,否则,就分散到多台机器上运行,不然加上es的各个软件,会内存不足。
配置详情 配置一定要小心,sql语句不能写错,es地址不能写错,这都是后话,先留意一下。
说明:由于内存总是不够,所以我这里使用了两台centos虚拟机部署
192.168.150.129:部署了canal-deployer和mysql;
192.168.150.132:部署了canal-adapter、es、kibana、canal-admin;
配置 canal-deployer
由于deployer和adapter不在一台机器上,所以我要先配置canal.properties,开放访问权限。
1.配置canal.properties
进入 conf 路径,修改配置文件 canal.properties,只修改下边的一个配置即可。
#|canal同步MySQL数据到Elasticsearch
文章图片

2. 配置 instance.properties
进入 conf/example 路径,修改配置文件 instance.properties
#|canal同步MySQL数据到Elasticsearch
文章图片

进入 bin 路径,启动
sh startup.sh

查看日志是否正常 ·logs/example/example.log·
cat example.log

#|canal同步MySQL数据到Elasticsearch
文章图片

因为要被另一台机器的adapter连接,所以要开放本台linux的11111端口。
# 开启防火墙 systemctl start firewalld # 开放指定端口 不加--permanent也是临时开放 firewall-cmd --zone=public --add-port=11111/tcp --permanent # 重启防火墙 firewall-cmd --reload # 查看 firewall-cmd --zone=public --query-port=11111/tcp

有问题,对照日志报错,仔细修改,勿急勿躁。
配置 canal-adapter
  1. 配置 application.ym
修改配置文件 conf/application.ym,主要是配置服务端deployer、mysql、es的连接地址和密码等信息,以便知道,数据从哪获取,要送到那里去。
#|canal同步MySQL数据到Elasticsearch
文章图片

#|canal同步MySQL数据到Elasticsearch
文章图片

数据源配置:
url: jdbc:mysql://192.168.150.129:3306/test_data?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&serverTimezone=Asia/Shanghai

注意,如果是两台机器,mysql和es的ip地址一定要写对了,不能混淆了。
  1. 配置 mytest_user.yml
修改 conf/es7/mytest_user.yml 文件,主要用于配置MySQL中的表与Elasticsearch中索引的映射关系。
dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: user _id: id upsert: true #是否更新 #pk: id sql: "select a.id, a.user_code AS userCode, a.role_id AS roleId,a.user_name AS userName,a.user_phone AS userPhone,a.icon,a.source_type AS sourceType,a.openid,a.status,a.create_time AS createTime,b.role_name AS roleName from cl_user a left join role b on b.id=a.role_id" #objFields: ##_labels: array:; #可以存对象或数组,具体参考官网案例 etlCondition: "where a.createTime>={}" # ETL参数,可以手动请求传参数到这里,全量同步时可以使用 commitBatch: 3000

etlCondition 使用就可以参考官网的这里:Client Adapter说明
这里千万注意,表明不能写错,索引不能写错,字段不能写错,否则会出现各种报错。
_index:代表es中的索引名称,这个索引以及映射关系一定先在kibana中定义好。
如果sql有问题,比如多一个逗号等,启动之后就会出现下边的错误提信息。
#|canal同步MySQL数据到Elasticsearch
文章图片

到这里就配置完了,下边进入bin目录开始启动,查看日志,启动失败可能会出现下边的问题。
  1. 常见问题解决:
java.lang.ClassCastException::class com.alibaba.druid.pool.DruidDataSource cannot be cast to class com.alibaba.druid.pool.DruidDataSource
启动会出现下边的错误:这是由于druid包冲入导致的
#|canal同步MySQL数据到Elasticsearch
文章图片

解决方法:
去官网将源码下载下来,解压之后用IDE开发工具导入项目。
#|canal同步MySQL数据到Elasticsearch
文章图片

修改pom文件
#|canal同步MySQL数据到Elasticsearch
文章图片

com.alibaba druid>provided

项目加载完成直接打包:install或者package,记得把上边的测试关闭一下在执行打包。
#|canal同步MySQL数据到Elasticsearch
文章图片

打完包之后就得到下方的jar包
#|canal同步MySQL数据到Elasticsearch
文章图片

直接上传至adapter的plugin目录中,替换掉原来的jar包。
#|canal同步MySQL数据到Elasticsearch
文章图片

再重启就没问题了。
提示:由于上面的操作太过耗费时间,这边准备了打好jar包,可以直接下载来替换使用
client-adapter.es7x-1.1.5-jar-with-dependencies.jar下载地址,密码:b3uu
  1. 全量同步
启动之后,我们来做数据库全量同步,因为canal默认是不支持全量同步的,所以我们通过ETL手动来实现全量同步(请求adapter客户端,web端口默认为8081)。
curl -X POST http://127.0.0.1:8081/etl/es7/mytest_user.yml

#|canal同步MySQL数据到Elasticsearch
文章图片

另外大数据量同步可能会出现问题,解决办法参考这里`:全量同步Elasticsearch方案之Canal
测试 数据库准备
#|canal同步MySQL数据到Elasticsearch
文章图片

创建es索引
PUT /user { "mappings": { "properties": { "id": { "type": "keyword" }, "userCode": { "type": "keyword", "index" : false }, "roleId": { "type": "integer", "index" : false }, "userName": { "type": "text" }, "userPhone": { "type": "keyword" }, "icon": { "type": "text", "index" : false }, "sourceType": { "type": "integer", "index" : false }, "openid": { "type": "text", "index" : false }, "status": { "type": "keyword" }, "createTime": { "type": "date", "index" : false }, "roleName": { "type": "text", "index" : false } } } }

增删改
添加一条数据
#|canal同步MySQL数据到Elasticsearch
文章图片

直接对这条数据进行删改即可在kibana上看到效果。
admin 具体就不多讲了,参考下边两个连接即可:
官网admin文档
#|canal同步MySQL数据到Elasticsearch
文章图片

canal-admin可视化管理
【#|canal同步MySQL数据到Elasticsearch】注意一定要区分开,集群和单机的配置。

    推荐阅读