flink | Analysis of the Flink on YARN startup process

This article walks through what happens when a Flink on YARN job is submitted, using job (per-job) mode as the example.

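  • CliFrontend is the entry point for command-line submission
  • The overall call chain of a run command is:
    • run -> runProgram -> executeProgram -> ClusterClient.run
The YARN-specific part of the flow lives in the runProgram method.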
    // Get the active CustomCommandLine
    final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
    try {
        runProgram(customCommandLine, commandLine, runOptions, program);
    } finally {
        program.deleteExtractedLibraries();
    }

    private void runProgram(
            CustomCommandLine customCommandLine,
            CommandLine commandLine,
            RunOptions runOptions,
            PackagedProgram program) throws ProgramInvocationException, FlinkException {
        // Obtain the ClusterDescriptor from the CustomCommandLine
        final ClusterDescriptor clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
        ...
        // Deploy the cluster
        client = clusterDescriptor.deployJobCluster(
            clusterSpecification,
            jobGraph,
            runOptions.getDetachedMode());
        // Run the job submission logic
        executeProgram(program, client, userParallelism);
        ...
    }

As shown above, cluster deployment is carried out by the ClusterDescriptor returned from customCommandLine.createClusterDescriptor. So let's first look at how the CustomCommandLine is chosen:
    public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
        for (CustomCommandLine cli : customCommandLines) {
            if (cli.isActive(commandLine)) {
                return cli;
            }
        }
        throw new IllegalStateException("No command-line ran.");
    }

It simply iterates over all CustomCommandLines and returns the first active one. The customCommandLines list itself is initialized in CliFrontend.loadCustomCommandLines:
    public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
        List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);

        // Command line interface of the YARN session, with a special initialization here
        // to prefix all options with y/yarn.
        // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
        // active CustomCommandLine in order and DefaultCLI isActive always return true.
        final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
        try {
            customCommandLines.add(
                loadCustomCommandLine(flinkYarnSessionCLI,
                    configuration,
                    configurationDirectory,
                    "y",
                    "yarn"));
        } catch (NoClassDefFoundError | Exception e) {
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }

        customCommandLines.add(new DefaultCLI(configuration));

        return customCommandLines;
    }
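To see why the registration order matters, here is a minimal sketch in plain Java. CliCandidate and CliSelection are hypothetical stand-ins for CustomCommandLine and CliFrontend, and the YARN check is a crude placeholder; only the first-match loop mirrors getActiveCustomCommandLine.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for Flink's CustomCommandLine: a name plus an isActive check.
final class CliCandidate {
    final String name;
    final Predicate<String[]> isActive;

    CliCandidate(String name, Predicate<String[]> isActive) {
        this.name = name;
        this.isActive = isActive;
    }
}

final class CliSelection {
    // Same shape as getActiveCustomCommandLine: return the first CLI whose check passes.
    static CliCandidate selectActive(List<CliCandidate> clis, String[] args) {
        for (CliCandidate cli : clis) {
            if (cli.isActive.test(args)) {
                return cli;
            }
        }
        throw new IllegalStateException("No command-line ran.");
    }

    // Placeholder YARN check: active when the arguments contain "yarn-cluster".
    static final CliCandidate YARN =
        new CliCandidate("yarn", args -> Arrays.asList(args).contains("yarn-cluster"));

    // Like DefaultCLI: always active, so it only works as the last entry.
    static final CliCandidate DEFAULT = new CliCandidate("default", args -> true);
}
```

If the always-active entry were registered first, it would shadow the YARN CLI for every command line, which is exactly what the comment in loadCustomCommandLines warns about.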

There are in fact only two CustomCommandLines: DefaultCLI and FlinkYarnSessionCli. Let's go straight to FlinkYarnSessionCli's isActive method:
    @Override
    public boolean isActive(CommandLine commandLine) {
        String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
        boolean yarnJobManager = ID.equals(jobManagerOption);
        boolean yarnAppId = commandLine.hasOption(applicationId.getOpt());
        return yarnJobManager || yarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
    }

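One of the conditions is that the -m option equals ID, which is defined as private static final String ID = "yarn-cluster"; in other words, passing -m yarn-cluster is enough to activate this CLI.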
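The three activation conditions can be modeled as follows. This is a simplified sketch, assuming the options have already been parsed into a map keyed by option name ("m" for -m, "yid" for the y-prefixed application-id option); the properties-file state is passed in as plain parameters instead of being read from disk.

```java
import java.util.Map;

// Simplified model of FlinkYarnSessionCli.isActive; the map-based option parsing is an assumption.
final class YarnCliActivation {
    static final String ID = "yarn-cluster";

    static boolean isActive(Map<String, String> options,
                            boolean propertiesFileMode,
                            String appIdFromPropertiesFile) {
        boolean yarnJobManager = ID.equals(options.get("m")); // -m yarn-cluster
        boolean yarnAppId = options.containsKey("yid");       // -yid <applicationId>
        // An application id left behind by a previously started YARN session.
        boolean fromPropertiesFile = propertiesFileMode && appIdFromPropertiesFile != null;
        return yarnJobManager || yarnAppId || fromPropertiesFile;
    }
}
```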
FlinkYarnSessionCli.createClusterDescriptor ultimately returns a YarnClusterDescriptor, so we move on to its deployJobCluster method, which ends up calling AbstractYarnClusterDescriptor.startAppMaster:
    public ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification) throws Exception {
        ...
        // Build the launch context (start command) for the cluster
        final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
            yarnClusterEntrypoint,
            hasLogback,
            hasLog4j,
            hasKrb5,
            clusterSpecification.getMasterMemoryMB());
        ...
        // set classpath from YARN configuration
        Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);

        amContainer.setEnvironment(appMasterEnv);

        // Set up resource type requirements for ApplicationMaster
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(clusterSpecification.getMasterMemoryMB());
        capability.setVirtualCores(flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));

        final String customApplicationName = customName != null ? customName : applicationName;

        appContext.setApplicationName(customApplicationName);
        appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
        // Set how the ApplicationMaster is launched
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(capability);

        if (yarnQueue != null) {
            appContext.setQueue(yarnQueue);
        }

        setApplicationNodeLabel(appContext);

        setApplicationTags(appContext);

        // add a hook to clean up in case deployment fails
        Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, yarnFilesDir);
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        // Submit the application
        yarnClient.submitApplication(appContext);
        ...
    }
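The part of startAppMaster that fills in the submission context boils down to a few fallbacks. The following sketch collects them into a hypothetical value object and does not touch the Hadoop API: the custom name wins over the default application name, the type falls back to "Apache Flink", and the queue is only set when one is configured.

```java
// Hypothetical value object mirroring the fields startAppMaster sets on the
// YARN ApplicationSubmissionContext; not part of Flink or Hadoop.
final class AmSubmissionSketch {
    final String name;
    final String type;
    final String queue;   // null means "leave YARN's default queue"
    final int memoryMb;   // from ClusterSpecification.getMasterMemoryMB()
    final int vcores;     // from YarnConfigOptions.APP_MASTER_VCORES

    AmSubmissionSketch(String applicationName, String customName, String applicationType,
                       String yarnQueue, int masterMemoryMb, int appMasterVcores) {
        this.name = customName != null ? customName : applicationName;
        this.type = applicationType != null ? applicationType : "Apache Flink";
        this.queue = yarnQueue;
        this.memoryMb = masterMemoryMb;
        this.vcores = appMasterVcores;
    }
}
```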

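startAppMaster contains a lot of logic. Focusing on the essentials, it builds a ContainerLaunchContext and then submits a YARN application with that ContainerLaunchContext as one of the submission parameters. Here is how the ContainerLaunchContext is constructed: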
    protected ContainerLaunchContext setupApplicationMasterContainer(
            String yarnClusterEntrypoint,
            boolean hasLogback,
            boolean hasLog4j,
            boolean hasKrb5,
            int jobManagerMemoryMb) {
        // ------------------ Prepare Application Master Container ------------------------------

        // respect custom JVM options in the YAML file
        String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
        if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
            javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
        }
        // applicable only for YarnMiniCluster secure test run
        // krb5.conf file will be available as local resource in JM/TM container
        if (hasKrb5) {
            javaOpts += " -Djava.security.krb5.conf=krb5.conf";
        }

        // Set up the container launch context for the application master
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

        final Map<String, String> startCommandValues = new HashMap<>();
        startCommandValues.put("java", "$JAVA_HOME/bin/java");

        int heapSize = Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration);
        String jvmHeapMem = String.format("-Xms%sm -Xmx%sm", heapSize, heapSize);
        startCommandValues.put("jvmmem", jvmHeapMem);

        startCommandValues.put("jvmopts", javaOpts);
        String logging = "";

        if (hasLogback || hasLog4j) {
            logging = "-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";

            if (hasLogback) {
                logging += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
            }

            if (hasLog4j) {
                logging += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME;
            }
        }

        startCommandValues.put("logging", logging);
        // Use yarnClusterEntrypoint as the main class of the container start command
        startCommandValues.put("class", yarnClusterEntrypoint);
        startCommandValues.put("redirects",
            "1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
            "2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");
        startCommandValues.put("args", "");

        final String commandTemplate = flinkConfiguration
            .getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
                ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
        // Build the start command from the collected values
        final String amCommand =
            BootstrapTools.getStartCommand(commandTemplate, startCommandValues);

        // Set the command
        amContainer.setCommands(Collections.singletonList(amCommand));

        LOG.debug("Application Master start command: " + amCommand);

        return amContainer;
    }
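To make the command assembly concrete, here is a standalone sketch that fills a %key% template from a values map in the same way the code above uses startCommandValues. The template string and the heap cutoff rule (reserve max(600 MB, 25% of the container) for non-heap memory) are assumptions modeled on Flink 1.x defaults (containerized.heap-cutoff-min / containerized.heap-cutoff-ratio), not copied from Utils.calculateHeapSize. <LOG_DIR> is the literal value of ApplicationConstants.LOG_DIR_EXPANSION_VAR, which YARN expands when it launches the container.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative re-implementation; not Flink's BootstrapTools or Utils.
final class AmStartCommand {
    // Assumed default start command template.
    static final String TEMPLATE =
        "%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%";

    // Assumed cutoff rule: max(minCutoffMb, ratio * containerMb) stays outside the heap.
    static int heapSizeMb(int containerMb, double cutoffRatio, int minCutoffMb) {
        int cutoff = Math.max(minCutoffMb, (int) (containerMb * cutoffRatio));
        return containerMb - cutoff;
    }

    // Replace every %key% placeholder. String.replace is literal, so "$JAVA_HOME" survives.
    static String render(String template, Map<String, String> values) {
        String command = template;
        for (Map.Entry<String, String> e : values.entrySet()) {
            command = command.replace("%" + e.getKey() + "%", e.getValue());
        }
        return command;
    }

    static String example(int containerMb) {
        int heap = heapSizeMb(containerMb, 0.25, 600);
        Map<String, String> values = new LinkedHashMap<>();
        values.put("java", "$JAVA_HOME/bin/java");
        values.put("jvmmem", String.format("-Xms%sm -Xmx%sm", heap, heap));
        values.put("jvmopts", "");
        values.put("logging", "-Dlog.file=\"<LOG_DIR>/jobmanager.log\"");
        values.put("class", "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint");
        values.put("args", "");
        values.put("redirects", "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err");
        return render(TEMPLATE, values);
    }
}
```

Under these assumptions a 1024 MB container loses 600 MB to the cutoff, so the JVM starts with -Xms424m -Xmx424m. Using String.replace rather than replaceAll avoids having the $ in $JAVA_HOME interpreted as a regex group reference.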

Finally, where does yarnClusterEntrypoint come from? It is simply the class name of either YarnSessionClusterEntrypoint (session mode) or YarnJobClusterEntrypoint (job mode). So the ApplicationMaster submitted to YARN just runs the main method of YarnJobClusterEntrypoint or YarnSessionClusterEntrypoint. Let's look at YarnJobClusterEntrypoint's main method:
    public static void main(String[] args) {
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        Map<String, String> env = System.getenv();

        final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
        Preconditions.checkArgument(
            workingDirectory != null,
            "Working directory variable (%s) not set",
            ApplicationConstants.Environment.PWD.key());

        try {
            YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
        } catch (IOException e) {
            LOG.warn("Could not log YARN environment information.", e);
        }

        Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);

        YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(
            configuration,
            workingDirectory);

        // Start the cluster entrypoint
        ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
    }
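The two ends of this hand-off can be sketched as follows: on the client side, the mode decides which entrypoint class name goes into the start command; on the ApplicationMaster side, main refuses to start without PWD, the directory it then passes to loadConfiguration. The package name org.apache.flink.yarn.entrypoint is an assumption, and both helpers are hypothetical.

```java
import java.util.Map;

// Illustrative helpers; not Flink code.
final class EntrypointSketch {
    // Client side: the value that ends up as yarnClusterEntrypoint.
    static String entrypointClassName(boolean perJob) {
        return perJob
            ? "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint"
            : "org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint";
    }

    // AM side: same check as Preconditions.checkArgument on PWD in main.
    static String workingDirectory(Map<String, String> env) {
        String pwd = env.get("PWD");
        if (pwd == null) {
            throw new IllegalArgumentException(
                String.format("Working directory variable (%s) not set", "PWD"));
        }
        return pwd;
    }
}
```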

This eventually leads to ClusterEntrypoint.runCluster:
    private void runCluster(Configuration configuration) throws Exception {
        synchronized (lock) {
            initializeServices(configuration);

            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            // Build the factory that creates the cluster components
            final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

            // Call the factory's create method
            clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                this);

            clusterComponent.getShutDownFuture().whenComplete(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                            ApplicationStatus.UNKNOWN,
                            ExceptionUtils.stringifyException(throwable),
                            false);
                    } else {
                        // This is the general shutdown path. If a separate more specific shutdown was
                        // already triggered, this will do nothing
                        shutDownAsync(
                            applicationStatus,
                            null,
                            true);
                    }
                });
        }
    }
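The shutdown callback at the end of runCluster has two branches. A minimal sketch with a plain CompletableFuture, where Status is a hypothetical stand-in for ApplicationStatus and the BiConsumer plays the role of shutDownAsync (status, diagnostics):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class ShutdownSketch {
    // Hypothetical stand-in for Flink's ApplicationStatus.
    enum Status { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    // Same branching as the whenComplete callback in runCluster.
    static void onShutdown(CompletableFuture<Status> shutDownFuture,
                           BiConsumer<Status, String> shutDown) {
        shutDownFuture.whenComplete((status, throwable) -> {
            if (throwable != null) {
                // The component failed: report UNKNOWN and pass the error as diagnostics.
                shutDown.accept(Status.UNKNOWN, throwable.toString());
            } else {
                // Regular path: report the status the component finished with.
                shutDown.accept(status, null);
            }
        });
    }
}
```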

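DispatcherResourceManagerComponentFactory has two concrete subclasses, JobDispatcherResourceManagerComponentFactory and SessionDispatcherResourceManagerComponentFactory, for job mode and session mode respectively. createDispatcherResourceManagerComponentFactory is implemented by each ClusterEntrypoint subclass, and the real difference between the implementations is which ResourceManagerFactory they build, i.e. how resources are requested.
Next, let's look at how each component is built: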
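The relationship between ClusterEntrypoint and its subclasses is a template method: the base class owns the startup skeleton and each subclass only supplies the factory. Below is a hypothetical, string-based sketch; the factory combinations shown (a session dispatcher paired with a standalone or a YARN ResourceManagerFactory) are assumed for illustration rather than copied from Flink.

```java
// All types here are hypothetical; names only echo the Flink classes discussed above.
final class ComponentFactory {
    final String dispatcherMode;          // "session" or "job"
    final String resourceManagerFactory;  // decides how TaskManager resources are obtained

    ComponentFactory(String dispatcherMode, String resourceManagerFactory) {
        this.dispatcherMode = dispatcherMode;
        this.resourceManagerFactory = resourceManagerFactory;
    }
}

abstract class EntrypointTemplate {
    // Shared skeleton, standing in for runCluster.
    final String runCluster() {
        ComponentFactory f = createComponentFactory();
        return f.dispatcherMode + " dispatcher + " + f.resourceManagerFactory;
    }

    // Hook overridden per deployment target, like createDispatcherResourceManagerComponentFactory.
    protected abstract ComponentFactory createComponentFactory();
}

final class StandaloneSessionSketch extends EntrypointTemplate {
    @Override
    protected ComponentFactory createComponentFactory() {
        return new ComponentFactory("session", "StandaloneResourceManagerFactory");
    }
}

final class YarnSessionSketch extends EntrypointTemplate {
    @Override
    protected ComponentFactory createComponentFactory() {
        return new ComponentFactory("session", "YarnResourceManagerFactory");
    }
}
```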
    public DispatcherResourceManagerComponent create(
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            ArchivedExecutionGraphStore archivedExecutionGraphStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler) throws Exception {

        // Leader retrieval services, used for HA
        LeaderRetrievalService dispatcherLeaderRetrievalService = null;
        LeaderRetrievalService resourceManagerRetrievalService = null;
        // REST backend of the web UI
        WebMonitorEndpoint webMonitorEndpoint = null;
        // Resource management
        ResourceManager resourceManager = null;
        // Metrics
        JobManagerMetricGroup jobManagerMetricGroup = null;
        // The dispatcher that handles external requests
        T dispatcher = null;

        try {
            dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

            resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

            final LeaderGatewayRetriever dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                10,
                Time.milliseconds(50L));

            final LeaderGatewayRetriever resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                10,
                Time.milliseconds(50L));

            final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
                configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                "DispatcherRestEndpoint");

            final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
            final MetricFetcher metricFetcher = updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                    configuration,
                    metricQueryServiceRetriever,
                    dispatcherGatewayRetriever,
                    executor);

            webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getWebMonitorLeaderElectionService(),
                fatalErrorHandler);

            log.debug("Starting Dispatcher REST endpoint.");
            webMonitorEndpoint.start();

            final String hostname = getHostname(rpcService);

            jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
                metricRegistry,
                hostname,
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

            // Build the ResourceManager
            resourceManager = resourceManagerFactory.createResourceManager(
                configuration,
                ResourceID.generate(),
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                metricRegistry,
                fatalErrorHandler,
                new ClusterInformation(hostname, blobServer.getPort()),
                webMonitorEndpoint.getRestBaseUrl(),
                jobManagerMetricGroup);

            final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

            // Build the Dispatcher
            dispatcher = dispatcherFactory.createDispatcher(
                configuration,
                rpcService,
                highAvailabilityServices,
                resourceManagerGatewayRetriever,
                blobServer,
                heartbeatServices,
                jobManagerMetricGroup,
                metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
                archivedExecutionGraphStore,
                fatalErrorHandler,
                historyServerArchivist);

            log.debug("Starting ResourceManager.");
            resourceManager.start();
            resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

            log.debug("Starting Dispatcher.");
            dispatcher.start();
            dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

            return createDispatcherResourceManagerComponent(
                dispatcher,
                resourceManager,
                dispatcherLeaderRetrievalService,
                resourceManagerRetrievalService,
                webMonitorEndpoint,
                jobManagerMetricGroup);

        } catch (Exception exception) {
            // clean up all started components
            if (dispatcherLeaderRetrievalService != null) {
                try {
                    dispatcherLeaderRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            if (resourceManagerRetrievalService != null) {
                try {
                    resourceManagerRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

            if (webMonitorEndpoint != null) {
                terminationFutures.add(webMonitorEndpoint.closeAsync());
            }

            if (resourceManager != null) {
                terminationFutures.add(resourceManager.closeAsync());
            }

            if (dispatcher != null) {
                terminationFutures.add(dispatcher.closeAsync());
            }

            final FutureUtils.ConjunctFuture terminationFuture = FutureUtils.completeAll(terminationFutures);

            try {
                terminationFuture.get();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            if (jobManagerMetricGroup != null) {
                jobManagerMetricGroup.close();
            }

            throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
        }
    }
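The catch block above follows a common pattern: tear down whatever was already created, keep the original failure as the primary exception, and attach cleanup failures to it as suppressed exceptions. Here is a synchronous sketch of that pattern. firstOrSuppressed re-implements what ExceptionUtils.firstOrSuppressed is documented to do; Component and ThrowingAction are hypothetical. The real code closes the REST endpoint, ResourceManager and Dispatcher asynchronously and waits on all the futures.

```java
import java.util.ArrayList;
import java.util.List;

final class CleanupSketch {
    interface ThrowingAction {
        void run() throws Exception;
    }

    // Hypothetical component with a start and a close action.
    static final class Component {
        final ThrowingAction start;
        final ThrowingAction close;

        Component(ThrowingAction start, ThrowingAction close) {
            this.start = start;
            this.close = close;
        }
    }

    static Component of(ThrowingAction start, ThrowingAction close) {
        return new Component(start, close);
    }

    // Keep the first exception; attach later ones as suppressed.
    static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        if (previous == null) {
            return newException;
        }
        if (newException != null && newException != previous) {
            previous.addSuppressed(newException);
        }
        return previous;
    }

    // Start components in order; on failure, close every component created so far.
    static void startAll(List<Component> components) throws Exception {
        List<Component> created = new ArrayList<>();
        try {
            for (Component c : components) {
                created.add(c);
                c.start.run();
            }
        } catch (Exception exception) {
            for (Component c : created) {
                try {
                    c.close.run();
                } catch (Exception e) {
                    exception = firstOrSuppressed(e, exception);
                }
            }
            throw new Exception("Could not start all components.", exception);
        }
    }
}
```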

At this point the cluster is up and running. Now let's go back to deployJobCluster and see how the ClusterClient is built:
    protected ClusterClient deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached) throws Exception {
        ...
        // the Flink cluster is deployed in YARN. Represent cluster
        return createYarnClusterClient(
            this,
            validClusterSpecification.getNumberTaskManagers(),
            validClusterSpecification.getSlotsPerTaskManager(),
            report,
            flinkConfiguration,
            true);
    }

    @Override
    protected ClusterClient createYarnClusterClient(
            AbstractYarnClusterDescriptor descriptor,
            int numberTaskManagers,
            int slotsPerTaskManager,
            ApplicationReport report,
            Configuration flinkConfiguration,
            boolean perJobCluster) throws Exception {
        return new RestClusterClient<>(
            flinkConfiguration,
            report.getApplicationId());
    }

    RestClusterClient(
            Configuration configuration,
            @Nullable RestClient restClient,
            T clusterId,
            WaitStrategy waitStrategy,
            @Nullable LeaderRetrievalService webMonitorRetrievalService) throws Exception {
        super(configuration);
        this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);

        if (restClient != null) {
            this.restClient = restClient;
        } else {
            // Build the RestClient
            this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService);
        }

        this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
        this.clusterId = Preconditions.checkNotNull(clusterId);

        if (webMonitorRetrievalService == null) {
            this.webMonitorRetrievalService = highAvailabilityServices.getWebMonitorLeaderRetriever();
        } else {
            this.webMonitorRetrievalService = webMonitorRetrievalService;
        }
        this.dispatcherRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
        this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
        startLeaderRetrievers();
    }

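RestClient is essentially an HTTP client built on Netty. At this point both the cluster and a client that can send requests to it have been set up; what remains is submitting the job.
The submission ends up in RestClusterClient.submitJob: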
    @Override
    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        log.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached());

        final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);

        if (isDetached()) {
            try {
                return jobSubmissionFuture.get();
            } catch (Exception e) {
                throw new ProgramInvocationException("Could not submit job",
                    jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
            }
        } else {
            final CompletableFuture<JobResult> jobResultFuture = jobSubmissionFuture.thenCompose(
                ignored -> requestJobResult(jobGraph.getJobID()));

            final JobResult jobResult;
            try {
                jobResult = jobResultFuture.get();
            } catch (Exception e) {
                throw new ProgramInvocationException("Could not retrieve the execution result.",
                    jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
            }

            try {
                this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
                return lastJobExecutionResult;
            } catch (JobExecutionException e) {
                throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
            } catch (IOException | ClassNotFoundException e) {
                throw new ProgramInvocationException("Job failed.", jobGraph.getJobID(), e);
            }
        }
    }
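The detached/attached split comes down to whether a second request is chained after the submission. Here is a sketch with the REST calls replaced by hypothetical async functions; it uses join() instead of get() so that failures surface as unchecked CompletionExceptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Control-flow sketch of RestClusterClient.submitJob; not Flink's API.
final class SubmitFlowSketch {
    static String submit(String jobId,
                         boolean detached,
                         Function<String, CompletableFuture<String>> submitJob,
                         Function<String, CompletableFuture<String>> requestJobResult) {
        CompletableFuture<String> submission = submitJob.apply(jobId);
        if (detached) {
            // Detached: return as soon as the cluster acknowledges the submission.
            return submission.join();
        }
        // Attached: only after the submission succeeds, ask for the final job result.
        return submission.thenCompose(ignored -> requestJobResult.apply(jobId)).join();
    }
}
```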

The request is handled by JobSubmitHandler, which then calls DispatcherGateway.submitJob; from there on, the logic is similar to what was described earlier.
    @Override
    protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
        final Collection<File> uploadedFiles = request.getUploadedFiles();
        final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
            File::getName,
            Path::fromLocalFile
        ));

        if (uploadedFiles.size() != nameToFile.size()) {
            throw new RestHandlerException(
                String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
                    uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
                    nameToFile.size(),
                    uploadedFiles.size()),
                HttpResponseStatus.BAD_REQUEST
            );
        }

        final JobSubmitRequestBody requestBody = request.getRequestBody();

        if (requestBody.jobGraphFileName == null) {
            throw new RestHandlerException(
                String.format("The %s field must not be omitted or be null.",
                    JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
                HttpResponseStatus.BAD_REQUEST);
        }

        // Deserialize the JobGraph from the request body
        CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

        Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

        Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

        // Upload the job's resource files to the BlobServer
        CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

        // Call the dispatcher's submitJob method
        CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));

        return jobSubmissionFuture.thenCombine(jobGraphFuture,
            (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    }
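The handler is a chain of futures: deserialize, upload to the BlobServer, submit to the dispatcher, then answer with the job's location. A rough sketch with the real steps replaced by hypothetical functions and a type parameter G standing in for JobGraph (the real uploadJobGraphFiles takes the future itself rather than being chained with thenCompose):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Future-pipeline sketch of JobSubmitHandler.handleRequest; all functions are stand-ins.
final class SubmitHandlerSketch {
    static <G> CompletableFuture<String> handle(
            CompletableFuture<G> jobGraphFuture,                     // loadJobGraph(...)
            Function<G, CompletableFuture<G>> uploadToBlobServer,    // uploadJobGraphFiles(...)
            Function<G, CompletableFuture<Void>> submitToDispatcher, // gateway.submitJob(...)
            Function<G, String> jobIdOf) {
        // The dispatcher only sees the graph after its files are in the BlobServer.
        CompletableFuture<G> finalizedJobGraphFuture = jobGraphFuture.thenCompose(uploadToBlobServer);
        CompletableFuture<Void> jobSubmissionFuture =
            finalizedJobGraphFuture.thenCompose(submitToDispatcher);
        // Once the dispatcher acknowledges, answer with the job's resource path.
        return jobSubmissionFuture.thenCombine(jobGraphFuture,
            (ack, jobGraph) -> "/jobs/" + jobIdOf.apply(jobGraph));
    }
}
```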

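On k8s: the YARN flow above carries over to Kubernetes fairly directly:
  1. Implement a k8s CustomCommandLine whose createClusterDescriptor returns a k8s-specific ClusterDescriptor
  2. Implement the k8s ClusterDescriptor, which deploys the cluster
  3. Implement a k8s-based ResourceManager
On k8s, the main work is roughly to implement operators for the JobManager/TaskManager.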
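To make the three steps more concrete, here is a hypothetical outline in plain Java. None of these types exist in Flink, and the "-m kubernetes-cluster" trigger is an assumption chosen to mirror "-m yarn-cluster"; the ResourceManager (step 3) would run inside the deployed JobManager and appears only in a comment.

```java
import java.util.List;

// Hypothetical k8s counterparts of the YARN pieces; none of these types exist in Flink.
interface ClusterDescriptorSketch {
    // Step 2: deploy a per-job cluster and return something a client can connect to.
    String deployJobCluster(String jobName);
}

interface CustomCommandLineSketch {
    boolean isActive(List<String> args);
    ClusterDescriptorSketch createClusterDescriptor();
}

// Step 1: a CLI that activates on an assumed "-m kubernetes-cluster" switch.
final class KubernetesCliSketch implements CustomCommandLineSketch {
    @Override
    public boolean isActive(List<String> args) {
        int i = args.indexOf("-m");
        return i >= 0 && i + 1 < args.size() && "kubernetes-cluster".equals(args.get(i + 1));
    }

    @Override
    public ClusterDescriptorSketch createClusterDescriptor() {
        // A real descriptor would create the JobManager pod and service; its ResourceManager
        // (step 3) would then request TaskManager pods. Here it only returns a placeholder.
        return jobName -> "<rest endpoint of " + jobName + ">";
    }
}
```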
