一文讲清楚FusionInsight MRS CDL如何使用
摘要:CDL是一种简单、高效的数据实时集成服务,能够从各种OLTP数据库中抓取Data Change事件,然后推送至Kafka中,最后由Sink Connector消费Topic中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。本文分享自华为云社区《华为FusionInsight MRS CDL使用指南》,作者:晋红轻。
说明 【一文讲清楚FusionInsight MRS CDL如何使用】CDL是一种简单、高效的数据实时集成服务,能够从各种OLTP数据库中抓取Data Change事件,然后推送至Kafka中,最后由Sink Connector消费Topic中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。
CDL服务包含了两个重要的角色:CDLConnector和CDLService。CDLConnector是具体执行数据抓取任务的实例,CDLService是负责管理和创建任务的实例。
本此实践介绍以mysql作为数据源进行数据抓取
前提条件
- MRS集群已安装CDL服务。
- MySQL数据库需要开启mysql的bin log功能(默认情况下是开启的)。
使用工具或者命令行连接MySQL数据库(本示例使用navicat工具连接),执行show variables like 'log_%'命令查看。
例如在navicat工具选择"File > New Query"新建查询,输入如下SQL命令,单击"Run"在结果中"log_bin"显示为"ON"则表示开启成功。
show variables like 'log_%'
文章图片
工具准备 现在cdl只能使用rest api的方式进行命令提交,所以需要提前安装工具进行调试。本文使用VSCode工具。
文章图片
完成之后安装rest client插件:
文章图片
完成之后创建一个cdl.http的文件进行编辑:
文章图片
创建CDL任务 CDL任务创建的流程图如下所示:
文章图片
说明:需要先创建一个MySQL link, 在创建一个Kafka link, 然后再创建一个CDL同步任务并启动。
MySQL link部分rest请求代码
@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal = "172.16.9.113:21005"
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"### get links
get https://{{host}}/api/v1/cdl/link### mysql link validatepost https://{{host}}/api/v1/cdl/link?validate=true
content-type: application/json{
"name": "MySQL_link", //link名,全局唯一,不能重复
"description":"MySQL connection", //link描述
"link-type":"mysql", //link的类型
"enabled":"true",
"link-config-values":{
"inputs": [
{ "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip
{ "name": "port", "value": {{mysql_port}} },//数据库监听的端口
{ "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名
{ "name": "user", "value": {{mysql_user}} }, //用户
{ "name": "password","value": {{mysql_password}} } ,//密码
{ "name":"schema", "value": {{mysql_database}}}//同数据库名
]
}
}### mysql link createpost https://{{host}}/api/v1/cdl/link
content-type: application/json{
"name": "MySQL_link", //link名,全局唯一,不能重复
"description":"MySQL connection", //link描述
"link-type":"mysql", //link的类型
"enabled":"true",
"link-config-values":{
"inputs": [
{ "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip
{ "name": "port", "value": {{mysql_port}} },//数据库监听的端口
{ "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名
{ "name": "user", "value": {{mysql_user}} }, //用户
{ "name": "password","value": {{mysql_password}} } ,//密码
{ "name":"schema", "value": {{mysql_database}}}//同数据库名
]
}
}### mysql link updateput https://{{host}}/api/v1/cdl/link/MySQL_link
content-type: application/json{
"name": "MySQL_link", //link名,全局唯一,不能重复
"description":"MySQL connection", //link描述
"link-type":"mysql", //link的类型
"enabled":"true",
"link-config-values":{
"inputs": [
{ "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip
{ "name": "port", "value": {{mysql_port}} },//数据库监听的端口
{ "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名
{ "name": "user", "value": {{mysql_user}} }, //用户
{ "name": "password","value": {{mysql_password}} } ,//密码
{ "name":"schema", "value": {{mysql_database}}}//同数据库名
]
}
}
Kafka link部分rest请求代码
### get links
get https://{{host}}/api/v1/cdl/link### kafka link validatepost https://{{host}}/api/v1/cdl/link?validate=true
content-type: application/json{
"name": "kafka_link",
"description":"test kafka link",
"link-type":"kafka",
"enabled":"true",
"link-config-values":{
"inputs": [
{ "name": "bootstrap.servers", "value": "172.16.9.113:21007" },
{ "name": "sasl.kerberos.service.name", "value": "kafka" },
{ "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT
]
}
}### kafka link createpost https://{{host}}/api/v1/cdl/link
content-type: application/json{
"name": "kafka_link",
"description":"test kafka link",
"link-type":"kafka",
"enabled":"true",
"link-config-values":{
"inputs": [
{ "name": "bootstrap.servers", "value": "172.16.9.113:21007" },
{ "name": "sasl.kerberos.service.name", "value": "kafka" },
{ "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT
]
}
}### kafka link updateput https://{{host}}/api/v1/cdl/link/kafka_link
content-type: application/json{
"name": "kafka_link",
"description":"test kafka link",
"link-type":"kafka",
"enabled":"true",
"link-config-values":{
"inputs": [
{ "name": "bootstrap.servers", "value": "172.16.9.113:21007" },
{ "name": "sasl.kerberos.service.name", "value": "kafka" },
{ "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT
]
}
}
CDL任务命令部分rest请求代码
@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal = "172.16.9.113:21005"
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"### create job
post https://{{host}}/api/v1/cdl/job
content-type: application/json{
"job_type": "CDL_JOB", //job类型,目前只支持CDL_JOB这一种
"name": "mysql_to_kafka", //job名称
"description":"mysql_to_kafka", //job描述
"from-link-name": "MySQL_link",//数据源Link
"to-link-name": "kafka_link", //目标源Link
"from-config-values": {
"inputs": [
{"name" : "connector.class", "value" : "com.huawei.cdc.connect.mysql.MysqlSourceConnector"},
{"name" : "schema", "value" : "hudi"},
{"name" : "db.name.alias", "value" : "hudi"},
{"name" : "whitelist", "value" : "hudisource"},
{"name" : "tables", "value" : "hudisource"},
{"name" : "tasks.max", "value" : "10"},
{"name" : "mode", "value" : "insert,update,delete"},
{"name" : "parse.dml.data", "value" : "true"},
{"name" : "schema.auto.creation", "value" : "false"},
{"name" : "errors.tolerance", "value" : "all"},
{"name" : "multiple.topic.partitions.enable", "value" : "false"},
{"name" : "topic.table.mapping", "value" : "[
{\"topicName\":\"huditableout\", \"tableName\":\"hudisource\"}
]"
},
{"name" : "producer.override.security.protocol", "value" : "SASL_PLAINTEXT"},//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT
{"name" : "consumer.override.security.protocol", "value" : "SASL_PLAINTEXT"}//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT
]
},
"to-config-values": {"inputs": []},
"job-config-values": {
"inputs": [
{"name" : "global.topic", "value" : "demo"}
]
}
}### get all job
get https://{{host}}/api/v1/cdl/job
### submit job
put https://{{host}}/api/v1/cdl/job/mysql_to_kafka/start
### get job status
get https://{{host}}/api/v1/cdl/submissions?jobName=mysql_to_kafka
### stop job
put https://{{host}}/api/v1/cdl/job/mysql_to_kafka/submissions/13/stop
### delete job
DELETE https://{{host}}/api/v1/cdl/job/mysql_to_kafka
场景验证 生产库MySQL原始数据如下:
文章图片
提交CDL任务之后
文章图片
增加操作: insert into hudi.hudisource values (11,“蒋语堂”,38,“女”,“图”,“播放器”,28732);
对应kafka消息体:
文章图片
更改操作: UPDATE hudi.hudisource SET uname=‘Anne Marie333’ WHERE uid=11;
对应kafka消息体:
文章图片
删除操作:delete from hudi.hudisource where uid=11;
对应kafka消息体:
文章图片
点击关注,第一时间了解华为云新鲜技术~
推荐阅读
- mysql|一文深入理解mysql
- 数据技术|一文了解Gauss数据库(开发历程、OLTP&OLAP特点、行式&列式存储,及与Oracle和AWS对比)
- 一文弄懂MySQL中redo|一文弄懂MySQL中redo log与binlog的区别
- c语言|一文搞懂栈(stack)、堆(heap)、单片机裸机内存管理malloc
- 网络|一文彻底搞懂前端监控
- 【SpringCloud-Alibaba系列教程】8.一文学会使用sentinel
- 2020买重疾险看这篇就够了,一文明白重疾险怎么买|2020买重疾险看这篇就够了,一文明白重疾险怎么买,更划算
- 《繁凡的深度学习笔记》|一文绝对让你完全弄懂信息熵、相对熵、交叉熵的意义《繁凡的深度学习笔记》第 3 章 分类问题与信息论基础(中)(DL笔记整理
- 开源生态|GPL、MIT、Apache...开发者如何选择开源协议(一文讲清根本区别)
- python|深度盘点(一文详解数据分析中100个常用指标和术语)