What is StarRocks?
StarRocks is a new-generation, extremely fast, unified OLAP MPP analytical database. With a fully vectorized engine and a brand-new CBO optimizer, its performance is formidable: single-table query performance rivals ClickHouse, while it also supports multi-table joins and second-level data updates.
It also supports high concurrency, has a very simple architecture that is easy to operate and scale, and is fully domestically developed (in China) and controllable; it has been widely adopted across many industries at home and abroad.
StarRocks provides a rich set of data ingestion methods: Stream Load, Routine Load, Broker Load, Spark Load, and so on, covering sources such as local files, object storage, HDFS, databases, and Kafka. You can also sync data into StarRocks with Flink CDC, use open-source tools such as DataX and SeaTunnel, or use the dedicated Flink connector source/sink for StarRocks. See the official documentation: https://docs.starrocks.com/zh-cn/main/loading/Loading_intro
This article shows how to use the Routine Load tool to conveniently and quickly sync incremental TP (transactional) data into StarRocks through Kafka (besides the method used below, you can also use Flink CDC together with Flink SQL).
How Routine Load works:
[Figure: Routine Load import workflow]
The import flow is shown in the figure above:
- A user submits a Kafka import job to the FE through a client that speaks the MySQL protocol.
- The FE splits the import job into several tasks; each task is responsible for importing a specified portion of the data.
- Each task is dispatched to a designated BE. On the BE, a task is treated as an ordinary import job and is executed through the Stream Load import mechanism.
- When the BE finishes importing, it reports back to the FE.
- Based on the reports, the FE either generates new follow-up tasks or retries failed ones.
- The FE keeps generating new tasks, so data is imported continuously.
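The scheduling loop above can be sketched as a minimal Python simulation. To be clear, all names here are invented for illustration; the real FE/BE scheduling is far more involved:

```python
# Minimal sketch of the Routine Load scheduling loop described above.
# split_into_tasks / run_on_be / schedule_round are hypothetical helpers,
# not StarRocks code.

def split_into_tasks(partitions, concurrency):
    """FE splits a job's Kafka partitions into at most `concurrency` tasks."""
    tasks = [[] for _ in range(min(concurrency, len(partitions)))]
    for i, p in enumerate(partitions):
        tasks[i % len(tasks)].append(p)
    return tasks

def run_on_be(task):
    """On a BE, each task runs as an ordinary Stream Load; here we simply
    pretend every task whose partitions are valid ints succeeds."""
    return all(isinstance(p, int) for p in task)

def schedule_round(partitions, concurrency):
    """One FE round: split -> dispatch -> collect reports -> retry failures."""
    results = {}
    for task in split_into_tasks(partitions, concurrency):
        ok = run_on_be(task)
        if not ok:
            ok = run_on_be(task)  # the FE retries a failed task
        results[tuple(task)] = ok
    return results

print(schedule_round(partitions=[0, 1, 2, 3], concurrency=2))
```

In the real system this round repeats indefinitely, which is what makes the import continuous.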
Test steps:
1. Enable the MySQL binlog
Confirm that the MySQL binlog is enabled:
[mysqld]
log-bin=mysql-bin    # enable binlog
binlog-format=ROW    # use ROW format
server_id=2          # required for MySQL replication; must not clash with canal's slaveId
You can also confirm these settings from a MySQL client with statements such as show variables like 'log_bin'; and show variables like 'binlog_format';
2. Set up the canal environment so that data is sinked to Kafka
Configure the two files conf/canal.properties and conf/example/instance.properties, then start canal.
3. Prepare a StarRocks cluster
A single node is enough for testing. For production, a distributed deployment of at least 3 servers with multiple replicas is recommended, to guarantee no data loss and high availability of the service.
4. Create the Kafka topic
Data format in Kafka:
{"data":[{"id":"401","k1":"st","v1":"401"}],"database":"gong","es":1648790506000,"id":19,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790506948,"type":"INSERT"}
{"data":[{"id":"401","k1":"st","v1":"401"}],"database":"gong","es":1648790577000,"id":20,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790577916,"type":"DELETE"}
{"data":[{"id":"402","k1":"st","v1":"402"}],"database":"gong","es":1648790789000,"id":21,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790797431,"type":"INSERT"}
{"data":[{"id":"402","k1":"st","v1":"402"}],"database":"gong","es":1648790832000,"id":22,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648790832760,"type":"DELETE"}
{"data":[{"id":"403","k1":"st","v1":"403"}],"database":"gong","es":1648791354000,"id":23,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648791354904,"type":"INSERT"}
{"data":[{"id":"403","k1":"st","v1":"403"}],"database":"gong","es":1648791385000,"id":24,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648791395247,"type":"DELETE"}
As you can see, every JSON record that the MySQL binlog produces into Kafka carries a type field whose value is INSERT, UPDATE or DELETE. StarRocks parses exactly this field to decide whether to insert, update or delete the corresponding row internally.
5. Create the Routine Load job in StarRocks
create routine load gong.cdc0401 on cdc_0401
columns(id,k1,v1,temp,__op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END))
PROPERTIES (
"format"="json",
"jsonpaths"="[\"$.data[0].id\",\"$.data[0].k1\",\"$.data[0].v1\",\"$.type\"]",
"desired_concurrent_number"="1",
"max_error_number"="1000",
"max_batch_interval"="5",
"strict_mode" = "false"
)
FROM KAFKA (
"kafka_broker_list"= "cs01:9092,cs02:9092,cs03:9092",
"kafka_topic" = "gong_test",
"kafka_partitions"="0",
"kafka_offsets"="OFFSET_BEGINNING"
);
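The columns/jsonpaths/__op mapping in this statement can be illustrated with a small Python sketch: extract id, k1, v1 and the canal type field from each message, then turn type into the __op flag (0 = upsert, 1 = delete) that StarRocks primary-key tables understand. The helper functions here are invented for illustration; StarRocks performs this parsing internally.

```python
import json

# One canal message as it appears in the Kafka topic
# (shortened from the examples shown above).
message = ('{"data":[{"id":"401","k1":"st","v1":"401"}],'
           '"database":"gong","table":"gong_cdc","type":"DELETE"}')

def extract(record: dict):
    """Mimic jsonpaths ["$.data[0].id", "$.data[0].k1", "$.data[0].v1", "$.type"]."""
    row = record["data"][0]
    return row["id"], row["k1"], row["v1"], record["type"]

def to_op(event_type: str) -> int:
    """Mimic __op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END):
    1 tells StarRocks to delete the row, 0 to insert/update it."""
    return 1 if event_type == "DELETE" else 0

id_, k1, v1, temp = extract(json.loads(message))
print(id_, k1, v1, "__op =", to_op(temp))  # 401 st 401 __op = 1
```

Note that __op is computed from a temporary column (temp) that receives $.type; the temp column itself does not exist in the target table.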
Note that the columns and jsonpaths parts are easy to get wrong; see this StarRocks forum post: https://forum.starrocks.com/t/topic/851
6. Verification: insert, update and delete data in MySQL, and check that the changes sync to StarRocks
Insert a row in MySQL, then delete it:
MariaDB [gong]> insert into gong_cdc values(98777777,"987",9888888);
Query OK, 1 row affected (0.00 sec)

MariaDB [gong]> delete from gong_cdc where id = 98777777;
Query OK, 1 row affected (0.00 sec)

mysql> select * from cdc_0401;
+----------+-----------+---------+
| id       | k1        | v1      |
+----------+-----------+---------+
|      321 | 3321      |     321 |
|      444 | main      |       1 |
|      666 | starrocks |     666 |
|      777 | 777       |     777 |
|      888 | 777       |     888 |
|      987 | 987       |     987 |
|    10086 | cheng     |       1 |
|    11111 | sr        |       1 |
|    30003 | sr        |      30 |
|    88888 | 88888     |    8888 |
|   100002 | march     |       1 |
|   100003 | gong      |       1 |
|   200000 | cheng     |       1 |
| 98777777 | 987       | 9888888 |
+----------+-----------+---------+
14 rows in set (0.01 sec)
On the StarRocks SQL client, check that the routine load job is running normally and look for any errors:
mysql> show routine load\G;
*************************** 1. row ***************************
Id: 10252
Name: cdc0401
CreateTime: 2022-04-01 17:01:15
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:gong
TableName: cdc_0401
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"id,k1,v1,temp,__op=(CASE `temp` WHEN 'DELETE' THEN 1 ELSE 0 END)","maxBatchIntervalS":"5","whereExpr":"*","dataFormat":"json","timezone":"Asia/Shanghai","format":"json","json_root":"","strict_mode":"false","jsonpaths":"[\"$.data[0].id\",\"$.data[0].k1\",\"$.data[0].v1\",\"$.type\"]","desireTaskConcurrentNum":"1","maxErrorNum":"100000","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"gong_test","currentKafkaPartitions":"0","brokerList":"cs01:9092,cs02:9092,cs03:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":1787751,"errorRows":3001,"committedTaskNum":3,"loadedRows":41,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":3042,"unselectedRows":0,"receivedBytesRate":198000,"taskExecuteTimeMs":9028}
Progress: {"0":"3041"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.194.184:29122/api/_load_error_log?file=__shard_5/error_log_insert_stmt_5752f798-7efa-47d8-b7ba-fcbc08dcfad5_5752f7987efa47d8_b7bafcbc08dcfad5
OtherMsg:
1 row in set (0.00 sec)

ERROR:
No query specified
Check whether the data has synced to StarRocks:
mysql> select * from cdc_0401;
+--------+-----------+------+
| id     | k1        | v1   |
+--------+-----------+------+
|    321 | 3321      |  321 |
|    444 | main      |    1 |
|    666 | starrocks |  666 |
|    777 | 777       |  777 |
|    888 | 777       |  888 |
|    987 | 987       |  987 |
|  10086 | cheng     |    1 |
|  11111 | sr        |    1 |
|  30003 | sr        |   30 |
|  88888 | 88888     | 8888 |
| 100002 | march     |    1 |
| 100003 | gong      |    1 |
| 200000 | cheng     |    1 |
+--------+-----------+------+
13 rows in set (0.00 sec)
Looking at the data in StarRocks, the row did come in first and was then deleted. You can also check the routine load job's status at any time to make sure it has no problems; only then will data keep syncing in.
7. Problems encountered during testing
Problem 1: the routine load job was created as follows
create routine load gong.cdc0401 on cdc_0401
columns(id,k1,v1)
PROPERTIES (
"format"="json",
"desired_concurrent_number"="1",
"max_error_number"="100",
"max_batch_interval"="5",
"strict_mode" = "false"
)
FROM KAFKA (
"kafka_broker_list"= "cs01:9092,cs02:9092,cs03:9092",
"kafka_topic" = "gong_test",
"kafka_partitions"="0",
"kafka_offsets"="OFFSET_BEGINNING"
);
I found that no matter how I tested, inserts and updates in MySQL synced to StarRocks, but deletes never did; for a while I assumed that the "incremental sync" described by StarRocks only covered newly added or changed data and simply could not delete.
The StarRocks documentation at https://docs.starrocks.com/zh-cn/main/loading/Json_loading
emphasizes the __op field.
Problem 2: configuring the __op field
1. columns(id,k1,v1,temp,__op=(CASE temp WHEN "DELETE" THEN 1 ELSE 0 END))
2. "jsonpaths"="[\"$.data[0].id\",\"$.data[0].k1\",\"$.data[0].v1\",\"$.type\"]",
These two settings need extra care; see the StarRocks forum post: https://forum.starrocks.com/t/topic/851
If the fields are configured incorrectly, you get an error like:
Reason: column count mismatch, expect=4 real=1. src line: [{"data":[{"id":"88888","k1":"88888","v1":"8888"}],"database":"gong","es":1648798201000,"id":30,"isDdl":false,"mysqlType":{"id":"int(11)","k1":"varchar(25)","v1":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"k1":12,"v1":4},"table":"gong_cdc","ts":1648798212928,"type":"INSERT"}];
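A rough way to see why this error occurs: the columns clause declares four columns, but without a matching jsonpaths the loader sees the whole JSON document as a single value, so the column counts do not line up. This is a hedged illustration of the mismatch, not StarRocks' actual parser:

```python
import json

# One canal record, shortened from the error message above.
record = '{"data":[{"id":"88888","k1":"88888","v1":"8888"}],"type":"INSERT"}'

expected_columns = ["id", "k1", "v1", "temp"]   # from the columns(...) clause

# With correct jsonpaths: four values, one per declared column.
doc = json.loads(record)
with_jsonpaths = [doc["data"][0]["id"], doc["data"][0]["k1"],
                  doc["data"][0]["v1"], doc["type"]]

# Misconfigured: the whole document is treated as one value.
without_jsonpaths = [record]

print(len(with_jsonpaths), len(without_jsonpaths))  # 4 1
assert len(with_jsonpaths) == len(expected_columns)
assert len(without_jsonpaths) != len(expected_columns)  # -> "expect=4 real=1"
```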
3. For null rows caused by data-quality issues, you need to set "max_error_number"="100", or a larger value; otherwise the routine load job will be PAUSED.
4. When creating the routine load job, wrapping the column names in backticks (``) made id come back empty, and every other field's value was empty as well. It took a long time to track down; it appears to be a small bug in the current routine load job, version 2.1.x.
The fix is to remove the backticks (``).