Elasticsearch | Syncing MySQL Data to Elasticsearch in Real Time with Canal

This is just an idea for now: Canal can tail MySQL binlog files, so I took a quick look at getting started with it.
Hopefully I won't end up gutting Canal's real-time sync feature later on... Of course, if some expert has already implemented real-time MySQL-to-ES sync, or knows another way to do it, please do @ me — eternally grateful!!!
Canal is an open-source framework from Alibaba for syncing data from MySQL to Elasticsearch (among other targets) in real time. Its main purpose is to parse MySQL's incremental logs (binlog) and provide incremental data subscription and consumption.
Reference: QuickStart · alibaba/canal Wiki (github.com)
1. MySQL preparation

[mysqld]
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # use ROW mode
server_id=1         # required for MySQL replication; must not clash with canal's slaveId

Grant the canal account the privileges required to act as a MySQL slave. If the account already exists, you can run GRANT directly:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
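Before moving on, it may be worth confirming that the binlog settings actually took effect. A minimal JDBC sketch, assuming MySQL Connector/J on the classpath and the host/credentials used above (adjust to your environment):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckBinlog {
    public static void main(String[] args) throws Exception {
        // connect with the canal account created above (URL and credentials are assumptions)
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true", "canal", "canal");
             Statement stmt = conn.createStatement();
             // log_bin should report ON and binlog_format should report ROW
             ResultSet rs = stmt.executeQuery(
                 "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','server_id')")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}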


2. Download Canal: Release v1.1.5 · alibaba/canal · GitHub
Download whichever version you need.

3. Deployer
(1) Unpack
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal

(2) Edit the configuration
vi conf/example/instance.properties

## mysql serverId, can be auto-generated
#canal.instance.mysql.slaveId = 1234
# position info: change to your own database address
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password: change to your own database credentials
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*

canal.instance.connectionCharset is the database's character encoding expressed as a Java charset name, e.g. UTF-8, GBK, or ISO-8859-1.
If the machine has only one CPU, set canal.instance.parser.parallel to false.
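The filter regex is matched against names of the form schema.table. As a quick illustration of the regex part only (plain java.util.regex; Canal itself accepts a comma-separated list of such patterns, so treat this as intuition rather than Canal's exact matching code):

import java.util.regex.Pattern;

public class FilterRegexDemo {
    public static void main(String[] args) {
        // the default filter .*\\..* accepts any schema and any table
        System.out.println(Pattern.matches(".*\\..*", "test.xdual"));      // true
        // restrict to one schema with test\\..*
        System.out.println(Pattern.matches("test\\..*", "other.orders"));  // false
        // a single fully qualified table
        System.out.println(Pattern.matches("test.xdual", "test.xdual"));   // true
    }
}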
(3) Start
sh bin/startup.sh

(4) Check the server log
vi logs/canal/canal.log


2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

(5) Check the instance log
vi logs/example/example.log

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

(6) Stop
sh bin/stop.sh

(7) Test the server with a client
Create a Maven project for the test and add the Canal client dependency:
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

(1.1.0 as in the upstream QuickStart; matching the client version to your server version, e.g. 1.1.5, is generally safer.)

package com.alibaba.otter.canal.sample;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // create the connection
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // commit/acknowledge
                // connector.rollback(batchId); // on processing failure, roll back
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

Run the client
Start the Canal server first (see the QuickStart above).
After the Canal client starts, the console should show messages like:
empty count : 1
empty count : 2
empty count : 3
empty count : 4

This means the database currently has no new changes.
Now trigger a database change:
mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
    ->   `ID` int(11) NOT NULL AUTO_INCREMENT,
    ->   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    ->   PRIMARY KEY (`ID`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());
Query OK, 1 row affected (0.06 sec)

The console then shows:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
ID : 4    update=true
X : 2013-02-05 23:29:46    update=true
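The sample client only prints changes, but printEntry is exactly where an Elasticsearch write would go for this article's purpose. A minimal sketch using only the JDK 11+ built-in HTTP client, assuming ES listens on 127.0.0.1:9200 and an index named xdual already exists (index name, id, and JSON body here are illustrative assumptions):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class EsWriter {

    private static final HttpClient HTTP = HttpClient.newHttpClient();

    // index or overwrite one document: id = the row's primary key, json = the row as JSON
    static void upsert(String index, String id, String json) throws Exception {
        HttpRequest req = HttpRequest.newBuilder()
            .uri(URI.create("http://127.0.0.1:9200/" + index + "/_doc/" + id))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(json))
            .build();
        HttpResponse<String> resp = HTTP.send(req, HttpResponse.BodyHandlers.ofString());
        System.out.println(resp.statusCode() + " " + resp.body());
    }

    // remove the document when the binlog event is a DELETE
    static void delete(String index, String id) throws Exception {
        HttpRequest req = HttpRequest.newBuilder()
            .uri(URI.create("http://127.0.0.1:9200/" + index + "/_doc/" + id))
            .DELETE()
            .build();
        HTTP.send(req, HttpResponse.BodyHandlers.ofString());
    }

    public static void main(String[] args) throws Exception {
        // e.g. for the INSERT captured above
        upsert("xdual", "4", "{\"ID\":4,\"X\":\"2013-02-05 23:29:46\"}");
    }
}

Hooked into printEntry, you would call upsert with the after-columns for INSERT/UPDATE events and delete for DELETE events. This is essentially what the canal-adapter in the next section does for you, so the hand-written client is mainly useful for understanding the flow.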

4. Adapter
(1) Unpack
tar -zxvf canal.adapter-1.1.5.tar.gz -C canal-adapter

(2) Edit the launcher configuration
vi application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  # Mode the canal-server runs in: tcp, kafka, rocketMQ, or rabbitMQ.
  # tcp connects the client directly with no middleware; kafka/mq go through a message queue.
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
  #zookeeperHosts: slave1:2181
  #mqServers: 127.0.0.1:9092 #or rocketmq
  #flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  #srcDataSources:
  #  defaultDS:
  #    url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
  #    username: root
  #    password: 121212
  srcDataSources:
    defaultDS: # change this to your own database information
      url: jdbc:mysql://192.168.188.128:3306/test?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal instance name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      # without the logger adapter, errors occur and no log output is produced
      - name: logger
      #- name: rdb
      #  key: mysql1
      #  properties:
      #    jdbc.driverClassName: com.mysql.jdbc.Driver
      #    jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
      #    jdbc.username: root
      #    jdbc.password: 121212
      #- name: rdb
      #  key: oracle1
      #  properties:
      #    jdbc.driverClassName: oracle.jdbc.OracleDriver
      #    jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
      #    jdbc.username: mytest
      #    jdbc.password: m121212
      #- name: rdb
      #  key: postgres1
      #  properties:
      #    jdbc.driverClassName: org.postgresql.Driver
      #    jdbc.url: jdbc:postgresql://localhost:5432/postgres
      #    jdbc.username: postgres
      #    jdbc.password: 121212
      #  threads: 1
      #  commitSize: 3000
      #- name: hbase
      #  properties:
      #    hbase.zookeeper.quorum: 127.0.0.1
      #    hbase.zookeeper.property.clientPort: 2181
      #    zookeeper.znode.parent: /hbase
      # 9300 is the transport port, 9200 the REST port; "es" corresponds to the es folder under conf.
      - name: es
        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # transport or rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: elasticsearch

(3) Table mapping file
# Data source key; must match one of the srcDataSources keys in the adapter's application.yml.
dataSourceKey: defaultDS
# Name of the canal-server instance; different instances serve different business tables.
destination: example
# Group ID; in tcp mode leave this empty, otherwise you may receive no data.
groupId:
# ES mapping
esMapping:
  # ES index name
  _index: testsync2
  # Unique document identifier in ES, usually the table's primary key.
  _id: _id
  #upsert: true
  #pk: id
  # Maps each table field to a concrete field name; names must not repeat.
  sql: "select a.id as _id, a.name,a.age,a.age_2,a.message,a.insert_time from testsync as a"
  #objFields:
  #  _labels: array:;
  #etlCondition: "where a.c_time>={}"
  commitBatch: 10

A second example, mapping a two-table join across hleft and hright:

dataSourceKey: defaultDS # matches the datasourceConfigs in application.yml
destination: example     # the canal instance in tcp mode, or the topic in MQ mode
groupId: g1              # note: leave empty when syncing HBase data; in MQ mode only this groupId's data is synced
esMapping:
  _index: ceshi  # index name
  _type: _doc    # index type
  _id: _id       # index primary key
  upsert: true
  #pk: id
  sql: "select a.id as _id,a.id,a.name as lname,a.age as lage,b.name as rname,b.age as rage from hleft a left join hright b on a.id=b.id"
  #objFields:
  #  _labels: array:;
  etlCondition: "where a.is_delete={0} and b.is_delete={0}"
  commitBatch: 3000

(4) Start
Start:   bin/startup.sh
Stop:    bin/stop.sh
Restart: bin/restart.sh
Log:     logs/adapter/adapter.log

A prerequisite for starting is that the index and its fields already exist in Elasticsearch.
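Since the index has to exist first, here is a sketch of creating testsync2 up front via the ES REST API (ES 7.x style; the field names follow the mapping sql above, but the field types are my assumptions and should be adjusted to your real data):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateIndex {
    public static void main(String[] args) throws Exception {
        // field names come from the mapping sql; the types are guesses for illustration
        String mapping = "{"
            + "\"mappings\":{\"properties\":{"
            + "\"name\":{\"type\":\"keyword\"},"
            + "\"age\":{\"type\":\"integer\"},"
            + "\"age_2\":{\"type\":\"integer\"},"
            + "\"message\":{\"type\":\"text\"},"
            + "\"insert_time\":{\"type\":\"date\",\"format\":\"yyyy-MM-dd HH:mm:ss||epoch_millis\"}"
            + "}}}";
        HttpRequest req = HttpRequest.newBuilder()
            .uri(URI.create("http://127.0.0.1:9200/testsync2"))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(mapping))
            .build();
        HttpResponse<String> resp = HttpClient.newHttpClient()
            .send(req, HttpResponse.BodyHandlers.ofString());
        System.out.println(resp.statusCode() + " " + resp.body()); // expect 200 with "acknowledged":true
    }
}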
It's worth mentioning that Logstash, by contrast, can create index fields on the fly while syncing, although the generated field types may need adjusting afterwards. So my take is: run a one-off full sync with Logstash first, then hand the ongoing real-time changes over to Canal.
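As an alternative to the Logstash full sync: if I recall the ClientAdapter docs correctly, the adapter itself exposes an ETL endpoint that performs a one-off full import for a mapping file. The URL pattern below is an assumption from memory and should be verified against your adapter version before relying on it:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerEtl {
    public static void main(String[] args) throws Exception {
        // POST /etl/{adapter-type}/{mapping-file} on the adapter port (8081 in application.yml);
        // "es/testsync2.yml" is a hypothetical path for the first mapping file shown earlier
        HttpRequest req = HttpRequest.newBuilder()
            .uri(URI.create("http://127.0.0.1:8081/etl/es/testsync2.yml"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
        HttpResponse<String> resp = HttpClient.newHttpClient()
            .send(req, HttpResponse.BodyHandlers.ofString());
        System.out.println(resp.statusCode() + " " + resp.body());
    }
}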

Criticism and corrections from the experts are welcome. If you repost this, please credit the source...
