canal是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能,是Github中开源的ETL(Extract Transform Load)软件。当您需要将MySQL中的增量数据同步至Elasticsearch时,可通过Canal来实现。
当前canal支持的MySQL版本包括5.1.x 、5.5.x 、5.6.x 、5.7.x 、8.0.x。
本文以阿里云Elasticsearch和RDS MySQL为例,为您介绍数据同步的方法。阿里云Elasticsearch兼容开源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商业功能,致力于数据分析、数据搜索等场景服务。支持5.5.3、6.3.2、6.7.0、6.8.0和7.4.0等版本,并提供了商业插件X-Pack服务。在开源Elasticsearch的基础上提供企业级权限管控、安全监控告警、自动报表生成等功能。阿里云Elasticsearch为您提供1个月的免费试用活动,单击此处即可免费试用。
与开源Elasticsearch相比,阿里云Elasticsearch提供了高可用性、高可靠性、高安全性等功能特性。并且提供Elasticsearch和Kibana的全托管服务,您可以按需付费,即买即用。在此基础上,还对内核性能进行了优化,提供独立的index build服务、存储计算分离、智能运维、达摩院分词器、商业插件等功能。
操作流程
- 准备工作
创建RDS MySQL实例、了解Canal、创建阿里云ES和ECS实例。
本文使用的RDS MySQL、阿里云ES和ECS在相同的专有网络VPC(Virtual Private Cloud)下。
- RDS MySQL:用来存放源数据和增量数据。
- canal:进行数据库日志解析,获取增量变更进行同步,是Github中开源的ETL(Extract Transform Load)软件,详情请参见canal。
- 阿里云ES:用来接收增量数据。
- 阿里云ECS:用来部署Canal-server和Canal-adapter。
- 步骤一:准备MySQL数据源
在RDS MySQL中,准备待同步的数据。
- 步骤二:创建索引和mapping
在阿里云ES实例中,创建索引和Mapping。要求Mapping中定义的字段名称和类型与待同步数据保持一致。
- 步骤三:安装JDK
【elasticsearch|通过canal将MySQL数据同步到Elasticsearch】在使用Canal前,必须先安装JDK,要求版本大于等于1.8.0。
- 步骤四:安装并启动Canal-server
安装Canal-server,然后修改配置文件关联RDS MySQL。Canal-server模拟MySQL集群的一个slave,获取MySQL集群Master节点的二进制日志(binary log),并将日志推送给Canal-adapter。
- 步骤五:安装并启动Canal-adapter
安装Canal-adapter,然后修改配置文件关联RDS MySQL和ES,以及定义MySQL数据到ES数据的映射字段,用来将数据同步到ES。
- 步骤六:验证增量数据同步
在RDS MySQL中新增、修改或删除数据,查看数据同步结果。
- 创建RDS MySQL实例。 具体操作步骤请参见创建RDS MySQL实例。本文使用的配置如下。
文章图片
- 创建阿里云ES实例。 具体操作步骤请参见创建阿里云Elasticsearch实例。本文创建的实例的 版本 为 通用商业版6.7 。
注意 Canal不支持ES 7.0及以上版本。
- 创建阿里云ECS实例。 具体操作步骤请参见步骤一:创建ECS实例。本文创建的实例的 镜像 为 CentOS 7.6 64位 。
本文创建的表名称为 es_test ,包含的字段如下所示。
文章图片
步骤二:创建索引和mapping
- 登录目标阿里云ES实例的Kibana控制台。
登录控制台的具体步骤请参见登录Kibana控制台。
- 在左侧导航栏,单击 Dev Tools (开发工具)。
- 在 Console 中执行以下命令创建索引和mapping。
注意 mapping中的字段需要与步骤一:准备MySQL数据源中创建的字段(名称和类型)保持一致。
PUT es_test?include_type_name=true {"settings" : { "index" : { "number_of_shards" : "5", "number_of_replicas" : "1" } }, "mappings" : { "_doc" : { "properties" : { "count": { "type": "text" }, "id": { "type": "integer" }, "name" : { "type" : "text", "analyzer": "ik_smart" }, "color" : { "type" : "text" } } } } }
创建成功后,返回如下结果。
{ "acknowledged" : true, "shards_acknowledged" : true, "index" : "es_test" }
- 在阿里云ECS实例中,查看可用的JDK软件包列表。
yum search java | grep -i --color JDK
- 选择合适的版本,安装JDK。
本文选择 java-1.8.0-openjdk-devel.x86_64 。
yum install java-1.8.0-openjdk-devel.x86_64
- 配置环境变量。
- 打开etc文件夹下的profile文件。
vi /etc/profile
- 在文件内添加如下的环境变量。
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.71-2.b15.el7_2.x86_64 export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin
- 使用
:wq
保存文件并退出vi模式,执行以下命令使配置生效。
source /etc/profile
- 打开etc文件夹下的profile文件。
- 分别执行以下命令,验证JDK是否安装成功。{#cmd-8w2-325-ohx}
java
javac
java -version
文章图片
步骤四:安装并启动Canal-server
- 下载Canal-server。
本文使用1.1.4版本。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
- 解压。
tar -zxvf canal.deployer-1.1.4.tar.gz
- 修改 conf/example/instance.properties 文件。
vi conf/example/instance.properties
文章图片
配置项 说明 canal.instance.master.address
{#codeph-bsl-0mx-w1l},相关信息可在RDS MySQL实例的 基本信息 页面获取。例如:<内网端口> rm-bp1u1xxxxxxxxx6ph.mysql.rds.aliyuncs.com:3306
{#codeph-jo9-5lk-n4z}。canal.instance.dbUsername
RDS MySQL数据库的账号名称,可在实例的 账号管理 页面获取。 canal.instance.dbPassword
RDS MySQL数据库的密码。 - 使用:wq命令保存文件并退出vi模式。
- 启动Canal-server,并查看日志。
./bin/startup.sh cat logs/canal/canal.log
文章图片
- 下载Canal-adapter。
本文使用1.1.4版本。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
- 解压。
tar -zxvf canal.adapter-1.1.4.tar.gz
- 修改 conf/application.yml 文件。
vi conf/application.yml
文章图片
配置项 说明 canal.conf.canalServerHost
canalDeployer访问地址。保持默认( 127.0.0.1:11111
)即可。canal.conf.srcDataSources.defaultDS.url
jdbc:mysql://
,相关信息可在RDS MySQL实例的 基本信息 页面获取。例如:<内网端口>/<数据库名称>?useUnicode=true jdbc:mysql://rm-bp1xxxxxxxxxnd6ph.mysql.rds.aliyuncs.com:3306/elasticsearch?useUnicode=true
。canal.conf.srcDataSources.defaultDS.username
RDS MySQL数据库的账号名称,可在RDS MySQL实例的 账号管理 页面获取。 canal.conf.srcDataSources.defaultDS.password
RDS MySQL数据库的密码。 canal.conf.canalAdapters.groups.outerAdapters.hosts
定位到 name:es
的位置,将hosts
替换为<阿里云ES实例的内网地址>:<内网端口>
,相关信息可在ES实例的基本信息页面获取。例如es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200
。canal.conf.canalAdapters.groups.outerAdapters.mode
必须设置为 rest
。canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth
<阿里云ES实例的账号>:<密码>
。例如elastic:es_password
。canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name
阿里云ES实例的ID,可在实例的基本信息页面获取。例如 es-cn-v64xxxxxxxxx3medp
。 - 使用:wq命令保存文件并退出vi模式。
- 同样的方式,修改 conf/es/*.yml 文件,定义MySQL数据到ES数据的映射字段。
文章图片
配置项 说明 esMapping._index
步骤二:创建索引和mapping章节中,在ES实例中所创建的索引的名称。本文使用 es_test 。 esMapping._type
步骤二:创建索引和mapping章节中,在ES实例中所创建的索引的类型。本文使用 _doc 。 esMapping._id
需要同步到ES实例的文档的id,可自定义。本文使用 _id 。 esMapping.sql
SQL语句,用来查询需要同步到ES中的字段。本文使用 select t.id as _id,t.id,t.count,t.name,t.color from es_test t
。 - 启动Canal-adapter服务,并查看日志。
./bin/startup.sh cat logs/adapter/adapter.log
服务启动正常时,结果如下所示。
文章图片
- 在RDS MySQL数据库中,新增、修改或删除数据库中 es_test 表的数据。
insert `elasticsearch`.`es_test`(`count`,`id`,`name`,`color`) values('11',2,'canal_test2','red');
- 登录目标阿里云ES实例的Kibana控制台。
具体步骤请参见登录Kibana控制台。
- 在 Dev Tools (开发工具)页面的 Console 中,执行以下命令查询同步成功的数据。
GET /es_test/_search
数据同步成功后,返回如下结果。
文章图片
推荐阅读
- elasticSearch|canal1.15 增量同步 mysql5.7 数据到ElasticSearch7.14.0
- elasticsearch|使用canal同步MySQL数据到Elasticsearch(ES)
- 历史上的今天|【历史上的今天】3 月 11 日(谷歌推出 Google Voice;互联网先驱诞生日;Foursquare 上线)
- python|来聊聊SourceMap
- 大数据|edge浏览器下载位置设置
- 大数据|UCLA李婧翌(女性最不需要做的就是「怀疑自己」| 妇女节特辑)
- java|safari chrome_Mac用户应放弃Safari的Google Chrome浏览器
- 算法|机器学习必学十大算法
- AIRX|数据科学家需要了解的15个Python库