blink|blink 改造,http方式提交sql任务

1.现状 1.1目前现有的提交flink任务流程:
1.写代码,打fat jar
2.flink web界面上传jar包或者rest api上传jar包,拿到jarId
3.flink web界面上点击jar,填写args等,提交任务。或者发送rest api。
1.2缺点
1.写代码,打jar,上传jar流程繁琐,一个jar包上百M。慢。
2.代码稍作修改又要重复1。
1.3优化空间
1.写sql文件,逻辑都在sql里,sql文件上传到jobmanager所在机器。通过http方式提交任务,jobmanager接收到http请求后本地执行./sql-client.sh sql文件。逻辑更改只要稍作修改sql就可以。
通过sql-client 提交sql文件可以参考我之前写的文章 https://www.jianshu.com/p/603b5343e4a7
2.用sql做流处理,离线目前也是sql(hive),sql可以稍作修改就可以复用。
局限性:不适用于比较复杂的逻辑,比如要用到很多类。
2 blink代码修改 2.1 sql-client模块

org.apache.flink.table.client.SqlClient和CliOptions

稍作修改,增加一个入参是sql的文件目录。(略)
2.2 flink-runtime-web模块
这个模块主要是flink-web界面的接口。目前要增加一个接口来接收sql文件目录,然后通过sql-client提交到集群运行。
类似类JarRunHandler
/** * Handler to submit jobs uploaded via the Web UI. */ public class JarRunHandler extends AbstractRestHandler

定义一个JarSqlRunHandler
public class JarRunSqlHandler extends AbstractRestHandler {

核心逻辑是拿到JarRunSqlRequestBody拼成一个数组 数组内容大概是
/bin/sql-client.sh embedded --sqlPath *** -e *** -j *** -s *** -l *** -p ***
然后执行本地命令:
pro = Runtime.getRuntime().exec(commands.toArray(new String[commands.size()])); pro.waitFor();

接收结果:
in = pro.getInputStream(); read = new BufferedReader(new InputStreamReader(in)); Stream lines = read.lines(); resultList = lines.collect(Collectors.toList());

parse下这个resultList拿到里面的jobId,返回。
3.演示一下
1.修改完代码 mvn clean install -Dmaven.test.skip -Dcheckstyle.skip -Drat.ignoreErrors=true 然后启动集群
2.编辑一个sql文件: 业务逻辑是读取一个csv文件,过滤一下写到另一个csv文件。

blink|blink 改造,http方式提交sql任务
文章图片
image.png 3.postman提交http请求 blink|blink 改造,http方式提交sql任务
文章图片
image.png 4.结果验证 localhost:8081查看任务

blink|blink 改造,http方式提交sql任务
文章图片
image.png 数据验证

blink|blink 改造,http方式提交sql任务
文章图片
image.png 【blink|blink 改造,http方式提交sql任务】得到了预期结果。
总结
至此我们就可以通过先写完sql,上传到flink jobmanager的可读目录,发送http请求就可以提交一个sql任务(批流都可以)。
通过这种方式可大大减少代码开发量,简化任务提交方式,能用于业务逻辑频繁修改的场景,同时兼顾批和流,使批流sql逻辑可以复用。

    推荐阅读