数字化时代,业务的实时处理需求越来越迫切,实时预警、实时风控、实时推荐等,Flink作为新一代流批统一的计算引擎,具有独特的天然流式计算特性和更为先进的架构设计的特点,它可以从不同的第三方存储引擎中读取数据,进行处理,然后再写出到另外的存储引擎中。
GES拥抱变化,开发了与Flink的对接工具GES-Flink-Connector。GES-Flink-Connector是一款自定义的离线/实时数据同步Flink连接器(connector),用于外部数据源与GES图数据库的数据同步。Connector的作用就相当于一个连接器,连接 Flink 计算引擎跟外界存储系统。GES-Flink-Connector具备流批统一的能力,对于离线计算与流计算的数据都可以写入GES图数据库中。利用Flink连接器机制,只要实现了数据源的Source Connector读取数据,就可以通过GES-Flink-Connector将数据进行自定义转换并导入到GES图数据库中。
GES-Flink-Connector的架构图如下所示:
文章图片
功能介绍
GES-Flink-Connector具备如下能力:
- 流批统一,支持流数据与批数据
- 数据导入支持三种提交模式,批量提交、间隔提交、混合提交
- 利用Flink提供的Checkpoint机制,具备一定的容错能力
- 具备导入失败处理能力,批导入失败转单条导入,单条导入失败转存储
- 具备脏数据发现能力,验证属性数量是否符合要求,验证label是否存在
- 具备脏数据和错误数据存储能力,可将数据存储到LOCAL、OBS、HDFS
- 具备错误数据限制能力,当错误率达到一定上限时,停止任务
以向GES中导入JDBC离线数据为例,操作步骤如下:
- 将GES-Flink-Connector jar包打入本地maven仓库
mvn install:install-file -DgroupId=com.huawei.ges -DartifactId=ges-flink-connector -Dversion=1.0.0 -Dpackaging=jar -Dfile=../jars/ges-flink-connector-1.0.0.jar
- 添加相关maven依赖(flink版本需高于1.7.2)
com.huawei.ges ges-flink-connector1.0.0
- 配置相关参数
- 编写数据转换方法
// T is your data type public class GraphStringDataConverter implements GraphDataConverter
{ /** * Your convert method. * Separate your data fields with commas * e.g. * vertex * id, label, property 1, property 2,… * edge * id 1, id 2, label, property 1, property 2, … * * @param t your data * @return format string */ @Override public String convert(T t) { // Implement your transformation method String s = ... return s; } }
- 创建flink任务
// ------------------------flink环境创建---------------------------------- // 创建flink流数据环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度 environment.setParallelism(CONCURRENT_COUNT); // 开启checkpoint 设置checkpoint时间间隔与checkpoint模式 environment.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE); // -------------------------数据源获取------------------------------------- // table schema TypeInformation[] fieldTypes = new TypeInformation[]{ // id BasicTypeInfo.INT_TYPE_INFO, // label BasicTypeInfo.STRING_TYPE_INFO, // property 1 BasicTypeInfo.STRING_TYPE_INFO // ... }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes); // query sql String querySql = "select * from {$your_table_name}"; // 数据源获取,JDBCInputFormat 读出来数据为flink Row类型 DataStream
dataSource = environment.createInput( JDBCInputFormat.buildJDBCInputFormat() .setDrivername("com.mysql.jdbc.Driver") .setDBUrl("your_mysql_jdbc_url") .setUsername("you_mysql_username") .setPassword("you_mysql_password") .setQuery(querySql) .setRowTypeInfo(rowTypeInfo) .finish()); // -------------------------输出源配置--------------------------------------- // 读取配置文件 Properties gesProp = new Properties(); InputStream in = GraphFlinkConnectorJdbcVertexExample.class.getClassLoader().getResourceAsStream("config.properties"); gesProp.load(in); // 创建flink Row数据转为要求的逗号分隔字符串的策略 GraphDataConverter
graphRowDataConvert = new GraphRowDataConvert(); GraphDataConvertStrategy
> rowConvertStrategy = new GraphDataConvertStrategy<>(graphRowDataConvert); // 创建batch输出方法,并添加转化策略与配置文件 GraphBatchOutputFormat
outputFormat = new GraphBatchOutputFormat<>(rowConvertStrategy, gesProp); // 创建sink输出方法 GraphSinkFunction
sinkFunction = new GraphSinkFunction<>(outputFormat); // 为数据源添加输出方法 dataSource.addSink(sinkFunction).setParallelism(CONCURRENT_COUNT); // 启动flink environment.execute();
文章图片
GES-Flink-Connector-DLI版本用于云上DLI Flink队列,采用Flink SQL的方式完成数据到GES的导入,操作步骤如下:
- 修改jar包内config.properties参数配置
- 将jar包导入OBS
文章图片
- DLI创建程序包(数据管理-程序包管理-创建程序包)
文章图片
- DLI购买队列并创建Flink作业
文章图片
- 创建DLI Flink队列与GES图服务的对等连接(跨源连接-创建连接)
文章图片
- 编辑Flink SQL
# SOURCE表示数据源,可以是DLI支持的任意数据源 CREATE SOURCE STREAM v_labels ( id STRING, label STRING, uuid STRING, d1 STRING, d2 STRING ) WITH ( type = "obs", bucket = "your bucket", region = "your region", object_name = "your file", row_delimiter = "\n", field_delimiter = "," ); # SINK表示输出源 为GES图数据库 CREATE SINK STREAM ges_sink ( id STRING, label STRING, uuid STRING, d1 STRING, d2 STRING ) WITH ( type = "user_defined", type_class_name = "com.huawei.ges.flink.connector.sink.GraphSinkFunction", -- 指定sinkFunction type_class_parameter = "" ); # Some data processing ...# 执行数据由输入源导入输出源 INSERT INTO ges_sink SELECT * -- 选择想要输出的字段 FROM v_labels;
【图数据库的易用性—GES与Flink的对接】
文章图片
本文由华为云发布
推荐阅读
- linux|答应我不要再用Xshell了 这个新开源的终端工具酷炫又好用
- 程序员|怎么用Redis分布式锁才能确保万无一失(进阶加薪全靠它!)
- 程序员|SpringCloud微服务详解(最新“美团+字节+腾讯”三面问题)
- 程序员|SpringBoot整合!Java程序员月薪20k的涨薪秘籍,真香!
- 程序员|腾讯Redis压轴笔记,成功入职阿里
- 华为云“DDoS高防+CDN”联动
- 华为云会议构筑全场景智能云会议,使能行业创新协作
- 华为云CDN加速WAF防护资源实践
- 华为云会议SmartRooms,打造全场景云端协同智能会议室