使用canal同步mysql数据到es:日期格式问题
原文链接: https://blog.csdn.net/weixin_41546244/article/details/108381219
问题:
使用canal同步mysql数据到es,时间格式转换问题。
常规安装部署以及使用暂不提供,百度搜一堆。。。
环境:
canal1.1.5,mysql8,Elasticsearch6.7
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
错误信息:
java.lang.RuntimeException: ES sync commit error ElasticsearchException[Elasticsearch exception
[type=mapper_parsing_exception, reason=failed to parse field [createTime] of type [date] in document with id 'XXX']];
nested: ElasticsearchException[Elasticsearch exception [type=illegal_argument_exception, reason=Invalid format: "2020-08-25T15:27:01+08:00" is malformed at "-08-25T15:27:01+08:00"]];
问题描述,分析:
*前提:*安装配置完成
/usr/local/canal-adapter/conf/application.yml
/usr/local/canal-adapter/conf/es6 等配置完成
1. 创建索引:
"createTime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd ||epoch_millis"
},
根据日志:
DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML:中data的信息找到:
"create_time":1599031621000
确认数据库查询是java.sql.Timestamp
可以知道canal同步数据时候时间转换存在问题。
2. 下载canal源码:
https://github.com/alibaba/canal/releases
https://github.com/alibaba/canal/releases/tag/canal-1.1.5-alpha-1
备忘下载:https://toolwa.com/github/
结构:
文章图片
找到ES6xTemplate类找到读取mapping的地方getValFromData方法:
// 如果是对象类型
if (mapping.getObjFields().containsKey(fieldName)) {
return ESSyncUtil.convertToEsObj(value, mapping.getObjFields().get(fieldName));
} else {
return ESSyncUtil.typeConvert(value, esType);
}
【es+canal|使用canal同步mysql数据到ES:日期格式问题.】最终定位到:typeConvert方法,继续跟踪到ESSyncUtil中
找到type为date的地方:
解释:本来我的日期就是正常的北京时间所以我所需要的就是直接改掉+8:00以及T并不需要
2020-06-23T06:52:56.000+08:00----》2020-06-23 06:52:56
注意,根据每个人的不同进行更改,复杂的存储需要自己进行存储时候更改,目前本人只存在根据binglog同步各个单表,根据业务需求自己定。
3. 修改源码
修改源码:(logger.info请忽略)
else if ("date".equals(esType)) {
if (val instanceof java.sql.Time) {
DateTime dateTime = new DateTime(((java.sql.Time) val).getTime());
if (dateTime.getMillisOfSecond() != 0) {
res = dateTime.toString("HH:mm:ss.SSS");
} else {
res = dateTime.toString("HH:mm:ss");
}
} else if (val instanceof java.sql.Timestamp) {
DateTime dateTime = new DateTime(((java.sql.Timestamp) val).getTime());
if (dateTime.getMillisOfSecond() != 0) {
logger.info("Timestamp1"+dateTime.toString());
res = dateTime.toString("yyyy-MM-dd HH:mm:ss.SSS");
logger.info("Timestamp2"+res.toString());
} else {
logger.info("Timestamp3"+dateTime.toString());
res = dateTime.toString("yyyy-MM-dd HH:mm:ss");
logger.info("Timestamp4"+res);
}
} else if (val instanceof java.sql.Date || val instanceof Date) {
logger.info("Date1"+val.toString());
DateTime dateTime;
if (val instanceof java.sql.Date) {
dateTime = new DateTime(((java.sql.Date) val).getTime());
} else {
dateTime = new DateTime(((Date) val).getTime());
}
if (dateTime.getHourOfDay() == 0 && dateTime.getMinuteOfHour() == 0 && dateTime.getSecondOfMinute() == 0
&& dateTime.getMillisOfSecond() == 0) {
res = dateTime.toString("yyyy-MM-dd");
} else {
logger.info("Date2"+dateTime.toString());
if (dateTime.getMillisOfSecond() != 0) {
res = dateTime.toString("yyyy-MM-dd HH:mm:ss.SSS");
} else {
res = dateTime.toString("yyyy-MM-dd HH:mm:ss");
}
}
} else if (val instanceof Long) {
if(((Long)val) < 10000000000L){
res = val;
}else {
logger.info("Long1"+val.toString());
DateTime dateTime = new DateTime(((Long) val).longValue());
if (dateTime.getHourOfDay() == 0 && dateTime.getMinuteOfHour() == 0 && dateTime.getSecondOfMinute() == 0
&& dateTime.getMillisOfSecond() == 0) {
res = dateTime.toString("yyyy-MM-dd");
} else if (dateTime.getMillisOfSecond() != 0) {
res = dateTime.toString("yyyy-MM-dd HH:mm:ss.SSS");
/*res = dateTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS" + Util.timeZone);
*/
} else {
res = dateTime.toString("yyyy-MM-dd HH:mm:ss");
}
}
**
4. 打包替换:
maven打包后找到自己的目录如:
F:\es\canal-canal-1.1.5-alpha-2 (1)\canal-canal-1.1.5-alpha-2\client-adapter\launcher\target\canal-adapter\plugin
文章图片
导入到
/usr/local/canal-adapter/plugin目录下替换。(注意红框的包,如果不确定哪一个,直接全拉进去替换就是了)
drwxr-xr-x. 2 root root95 9月3 14:16 bin
drwxrwxrwx. 7 root root131 9月3 14:16 conf
drwxr-xr-x. 2 root root 8192 9月3 10:16 lib
drwxrwxrwx. 3 root root21 9月3 11:14 logs
drwxrwxrwx. 2 root root 4096 9月3 11:40 plugin
[root@localhost canal-adapter]# pwd
/usr/local/canal-adapter
[root@localhost canal-adapter]#
与
文章图片
注意:如果创建索引时候只是"format": “yyyy-MM-dd HH:mm:ss”:
"createTime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
会报错(原因自行百度):
ElasticsearchException[Elasticsearch exception [type=illegal_argument_exception, reason=Invalid
format: "2020-09-02 15:27:01" is malformed at "-09-02 15:27:01"]];
推荐阅读
- elasticsearch|使用canal实时同步MySQL数据到es
- 技术|基于canal实现mysql和es增量数据同步
- elasticSearch|canal1.15 增量同步 mysql5.7 数据到ElasticSearch7.14.0
- elasticsearch|通过canal将MySQL数据同步到Elasticsearch
- elasticsearch|使用canal同步MySQL数据到Elasticsearch(ES)
- Linux|彻底搞懂linux中的权限【详解】
- Linux|Linux常用指令大全【详解】
- Linux|Linux环境搭建,创建用户和删除用户
- KafKa|Kafka快速入门(1)