java中使用SparkLauncher提交spark应用

将开发好的spark application(对于java/scala来说是jar)提交到spark集群执行的方式通常包括两种,一种是通常使用的spark submit脚本(spark 2.x版本是spark2-shell,可以参考https://www.jianshu.com/p/1d41174441b6),另一种是spark官方提供的java API SparkLauncher类。下面来介绍使用SparkLauncher来完成spark application提交的方法。
既然要将application jar提交给spark集群处理,那么首先就涉及到两个问题:

    1. 待提交的application jar,该jar可以将application打包得到;
    1. 提交1中application jar(即调用SparkLauncher)的代码(既可以在本地执行,也可以打包成jar和1中的application jar一起放在spark集群某台机器上,本文就是打包成jar)。
Step 1. 开发好待提交的application并打包成jar(以下称appJar) ......
Step 2. 开发好完成提交过程的代码并打包成jar(以下称submitJar) 【java中使用SparkLauncher提交spark应用】完成提交过程的代码如下:
// 待提交给spark集群处理的spark application jar(即appJar)所在路径 String appJarName = "/usr/data/sparkProject/jars/testSpark.jar" SparkLauncher launcher = new SparkLauncher(); launcher.setAppResource(appJarName); // 设置spark driver主类,即appJar的主类 launcher.setMainClass("com.companyA.test.SparkTestMain"); // 添加传递给spark driver mian方法的参数 launcher.addAppArgs(arg1, arg2, arg3 ); // 设置该spark application的master launcher.setMaster("yarn"); // 在yarn-cluster上启动,也可以再local[*]上 // 关闭sparksubmit的详细报告 launcher.setVerbose(false); // 设置用于执行appJar的spark集群分配的driver、executor内存等参数 launcher.setConf(SparkLauncher.DRIVER_MEMORY, "2g"); launcher.setConf(SparkLauncher.EXECUTOR_MEMORY, "1g"); launcher.setConf(SparkLauncher.EXECUTOR_CORES, 16); launcher.setConf("spark.default.parallelism", 128); launcher.setConf("spark.executor.instances", 16); // 启动执行该application SparkAppHandle handle = launcher.startApplication(); // application执行失败重试机制 // 最大重试次数 booleanfailedflag = false; int maxRetrytimes = 3; int currentRetrytimes = 0; while (handle.getState() != SparkAppHandle.State.FINISHED) { currentRetrytimes ++; // 每6s查看application的状态(UNKNOWN、SUBMITTED、RUNNING、FINISHED、FAILED、KILLED、 LOST) Thread.sleep(6000L); System.out.println("applicationId is: " + handle.getAppId()); System.out.println("current state: " + handle.getState()); if ((handle.getAppId() == null && handle.getState() == SparkAppHandle.State.FAILED ) && currentRetrytimes > maxRetrytimes){ System.out.println(String.format("tried launching application for %s times but failed, exit.", maxRetrytimes)); failedflag = true; break; } }

Step 3. 将jar包放到spark集群上 将上述appJar和submitJar放置在spark集群某台机器的同一路径下,并启动submitJar,appJar便能够提交给spark集群处理了。

    推荐阅读