Flink|Flink 实践教程-进阶(2)(复杂格式数据抽取)
作者:腾讯云流计算 Oceanus 团队
流计算 Oceanus 简介
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。本文将为您详细介绍如何实时获取 CKafka 中的 JSON 格式数据,经过数据抽取、平铺转换后存入 MySQL 中。
操作视频
前置准备
创建流计算 Oceanus 集群
进入流计算 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考流计算 Oceanus 官方文档 创建独享集群 [2]。
创建消息队列 CKafka
进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。创建 Topic:进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。数据准备:进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。
// 数据格式
{
"id": 1,
"message": "流计算 Oceanus 1元限量秒杀活动",
"userInfo": {
"name": "张三",
"phone": ["12345678910", "8547942"]
},
"companyInfo": {
"name": "Tencent",
"address": "深圳市腾讯大厦"
}
}
创建 MySQL 实例
进入 MySQL 控制台 [7],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [8]。
-- 建表语句
CREATE TABLE `oceanus_advanced2` (
`id`int (100) NOT NULL,
`message`varchar (100) NULL DEFAULT '',
`name`varchar (50)NULL DEFAULT '',
`phone`varchar (11)NULL DEFAULT '',
`company_name`varchar (100) NULL DEFAULT '',
`company_address` varchar (100) NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE = innodb
流计算 Oceanus 作业 1. 创建 Source
CREATE TABLE `kafka_json_source_table` (
`id`INT,
`message`STRING,
`userInfo`ROW<`name` STRING,`phone` ARRAY
>, -- 采用 ROW 嵌套 ARRAY 格式接收 JSON 字段 `companyInfo`MAP
-- 采用 MAP 格式接收 JSON 字段 ) WITH (
'connector' = 'kafka',
'topic' = 'oceanus_advanced2',-- 替换为您要消费的 Topic
'scan.startup.mode' = 'earliest-offset',-- 可以是 latest-offset/earliest-offset/specific-offsets/group-offsets/timestamp 的任何一种
'properties.bootstrap.servers' = '10.0.0.29:9092',-- 替换为您的 Kafka 连接地址
'properties.group.id' = 'testGroup',-- 必选参数, 一定要指定 Group ID
'format' = 'json',-- 定义 JSON 格式,部分其他格式可能不支持抽取平铺
'json.fail-on-missing-field' = 'false',-- 如果设置为 false, 则遇到缺失字段不会报错。
'json.ignore-parse-errors' = 'true'-- 如果设置为 true,则忽略任何解析报错。
);
2. 创建 Sink
CREATE TABLE `jdbc_upsert_sink_table` (
`id`INT,
`message`STRING,
`name`STRING,
`phone`STRING,
`company_name`STRING,
`company_address`STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',-- 请替换为您的实际 MySQL 连接参数
'table-name' = 'oceanus_advanced2',-- 需要写入的数据表
'username' = 'root',-- 数据库访问的用户名(需要提供 INSERT 权限)
'password' = 'Tencent123$',-- 数据库访问的密码
'sink.buffer-flush.max-rows' = '200',-- 批量输出的条数
'sink.buffer-flush.interval' = '2s'-- 批量输出的间隔
);
3. 编写业务 SQL
INSERT INTO `jdbc_upsert_sink_table`
SELECT
idASid,
messageASmessage,
userInfo.nameASname,-- 获取 Row 中成员采用.成员的方式
userInfo.phone[1]ASphone,-- 获取 Array 中成员采用 [数组下标] 的方式
companyInfo['name']AScompany_name,-- 获取 Map 中成员采用 ['属性名'] 的方式
companyInfo['address']AScompany_address
FROM `kafka_json_source_table`;
新版 Flink 1.13 集群无需用户选择内置 Connector,平台自动匹配获取总结 本文详细介绍了如何通过 SQL 作业定义和获取 MAP、ARRAY、ROW 类型数据。更多内置运算符和函数请参考流计算 Oceanus 官方文档 [9]。
参考链接 [1]流计算 Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview
[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298
[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1
[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839
[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854
[6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840
[7] MySQL 控制台:https://console.cloud.tencent.com/cdb
[8] 创建 MySQL 实例:https://cloud.tencent.com/document/product/236/46433
[9] 内置运算符和函数:https://cloud.tencent.com/document/product/849/18083
流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓
文章图片
文章图片
【Flink|Flink 实践教程-进阶(2)(复杂格式数据抽取)】关注“腾讯云大数据”公众号,技术交流、最新活动、服务专享一站Get~
推荐阅读
- 2.6|2.6 Photoshop操作步骤的撤消和重做 [Ps教程]
- 漫画初学者如何学习漫画背景的透视画法(这篇教程请收藏好了!)
- 不废话,代码实践带你掌握|不废话,代码实践带你掌握 强缓存、协商缓存!
- 六项精进20180530
- 数据库|SQL行转列方式优化查询性能实践
- 【Day31课后实践】
- 用npm发布一个包的教程并编写一个vue的插件发布
- 20180322【w4复盘日志】
- 狗狗定点大小便视频教程下载地址
- SwiftUI|SwiftUI iOS 瀑布流组件之仿CollectionView不规则图文混合(教程含源码)