阅读 204

解析spark源码yarn-cluster模式任务提交

目录一,运行命令二,任务提交流程图三,启动脚本四,程序入口类org.apache.spark.deploy.SparkSubmit五,org.apache.spark.deploy.yarn.Yarn...

目录
  • 一,运行命令

  • 二,任务提交流程图

  • 三,启动脚本

  • 四,程序入口类org.apache.spark.deploy.SparkSubmit

  • 五,org.apache.spark.deploy.yarn.YarnClusterApplication类

  • 六, org.apache.spark.deploy.yarn.ApplicationMaster 类。


一,运行命令

bin/spark-submit \--master yarn \--deploy-mode cluster \--class org.apache.spark.examples.SparkPi \examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar


二,任务提交流程图

在这里插入图片描述


三,启动脚本

查看spark-submit 脚本文件,程序入口为

在这里插入图片描述

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@“

查看${SPARK_HOME}"/bin/spark-class可知该脚本执行了java -cp main-class 命令启动了一个java进程,进程名为SparkSubmit,main函数在主类org.apache.spark.deploy.SparkSubmit中。

实际执行的具体命令为:

/etc/alternatives/jre/bin/java -Dhdp.version=3.0.1.0-187 -cp /usr/hdp/3.0.1.0-187/spark2/conf/:/usr/hdp/3.0.1.0-187/spark2/jars/*:/usr/hdp/3.0.1.0-187/hadoop/conf/ -Xmx1g org.apache.spark.deploy.SparkSubmit --master yarn --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar


四,程序入口类org.apache.spark.deploy.SparkSubmit

该类有个伴生对象,其中有main函数,创建了SparkSubmit对象并执行doSubmit();

override def main(args: Array[String]): Unit = {val submit = new SparkSubmit() {...}submit.doSubmit(args)}

doSubmit 解析args参数,封装到appArgs:SparkSubmitArguments对象中,然后执行submit(appArgs, uninitLog)。

def doSubmit(args: Array[String]): Unit = {// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to// be reset before the application starts.val uninitLog = initializeLogIfNecessary(true, silent = true)val appArgs = parseArguments(args)if (appArgs.verbose) {logInfo(appArgs.toString)}appArgs.action match {case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)case SparkSubmitAction.KILL => kill(appArgs)case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)case SparkSubmitAction.PRINT_VERSION => printVersion()}}

submit(appArgs, uninitLog) 调用 runMain(args: SparkSubmitArguments, uninitLog: Boolean)

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)...try {mainClass = Utils.classForName(childMainClass)} catch {...}val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]} else {new JavaMainApplication(mainClass)}...try {app.start(childArgs.toArray, sparkConf)} catch {case t: Throwable =>throw findCause(t)}}

这里mainClass十分重要,先判读mainClass是否是SparkApplication的子类,如果是则通过反射调用其构造器创建对象;

如果不是则创建一个JavaMainApplication(是SparkApplication的子类)对象并在其override def start(args: Array[String], conf: SparkConf)函数中利用反射执行mainClass中main函数。

SparkApplication创建完毕后执行其start(childArgs.toArray, sparkConf) 方法。

/*** Entry point for a Spark application. Implementations must provide a no-argument constructor.*/private[spark] trait SparkApplication {def start(args: Array[String], conf: SparkConf): Unit}/*** Implementation of SparkApplication that wraps a standard Java class with a "main" method.** Configuration is propagated to the application via system properties, so running multiple* of these in the same JVM may lead to undefined behavior due to configuration leaks.*/private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {override def start(args: Array[String], conf: SparkConf): Unit = {val mainMethod = klass.getMethod("main", new Array[String](0).getClass)if (!Modifier.isStatic(mainMethod.getModifiers)) {throw new IllegalStateException("The main method in the given main class must be static")}val sysProps = conf.getAll.toMapsysProps.foreach { case (k, v) =>sys.props(k) = v}mainMethod.invoke(null, args)}}

如果**–deploy-mode** 是client mainClass的值由命令行参数 –class 决定,也就是org.apache.spark.examples.SparkPi。

这种情况下会在当前虚拟机中执行客户端代码,如果是其它条件情况会比较复杂。

以上文指定的运行命令为例,这里mainClass是org.apache.spark.deploy.yarn.YarnClusterApplication类class对象。

private[deploy] val YARN_CLUSTER_SUBMIT_CLASS ="org.apache.spark.deploy.yarn.YarnClusterApplication"...if (isYarnCluster) {childMainClass = YARN_CLUSTER_SUBMIT_CLASSif (args.ispython) {childArgs += ("--primary-py-file", args.primaryResource)childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")} else if (args.isR) {val mainFile = new Path(args.primaryResource).getNamechildArgs += ("--primary-r-file", mainFile)childArgs += ("--class", "org.apache.spark.deploy.RRunner")} else {if (args.primaryResource != SparkLauncher.NO_RESOURCE) {childArgs += ("--jar", args.primaryResource)}childArgs += ("--class", args.mainClass)}if (args.childArgs != null) {args.childArgs.foreach { arg => childArgs += ("--arg", arg) }}}


五,org.apache.spark.deploy.yarn.YarnClusterApplication类

该类在spark-yarn包中。

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-yarn_${scala.version}</artifactId><version>${spark.version}</version></dependency>

开始执行其override def start(args: Array[String], conf: SparkConf) 方法。

private[spark] class YarnClusterApplication extends SparkApplication { override def start(args: Array[String], conf: SparkConf): Unit = {// SparkSubmit would use yarn cache to distribute files & jars in yarn mode,// so remove them from sparkConf here for yarn mode.conf.remove(JARS)conf.remove(FILES)new Client(new ClientArguments(args), conf, null).run()}}

SparkSubmi进程中创建一个客户端Client,该类是一个代理类其中包括YarnClient,执行run() 方法。

提交Application给yarn集群ResourceManager,提交成功后返回appid,

如果spark.submit.deployMode=cluster&&spark.yarn.submit.waitAppCompletion=true,

SparkSubmit进程会定期输出appId日志直到任务结束(monitorApplication(appId)),否则会输出一次日志然后退出。

def run(): Unit = {this.appId = submitApplication()if (!launcherBackend.isConnected() && fireAndForget) {val report = getApplicationReport(appId)val state = report.getYarnApplicationStatelogInfo(s"Application report for $appId (state: $state)")logInfo(formatReportDetails(report))if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {throw new SparkException(s"Application $appId finished with status: $state")}} else {val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {diags.foreach { err =>logError(s"Application diagnostics message: $err")}throw new SparkException(s"Application $appId finished with failed status")}if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {throw new SparkException(s"Application $appId is killed")}if (finalState == FinalApplicationStatus.UNDEFINED) {throw new SparkException(s"The final status of application $appId is undefined")}}}

继续跟踪submitApplication()

def submitApplication(): ApplicationId = {ResourceRequestHelper.validateResources(sparkConf)var appId: ApplicationId = nulltry {launcherBackend.connect()yarnClient.init(hadoopConf)yarnClient.start()logInfo("Requesting a new application from cluster with %d NodeManagers".format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))// Get a new application from our RMval newApp = yarnClient.createApplication()val newAppResponse = newApp.getNewApplicationResponse()appId = newAppResponse.getApplicationId()// The app staging dir based on the STAGING_DIR configuration if configured// otherwise based on the users home directory.val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }.getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),Option(appId.toString)).setCurrentContext()// Verify whether the cluster has enough resources for our AMverifyClusterResources(newAppResponse)// Set up the appropriate contexts to launch our AMval containerContext = createContainerLaunchContext(newAppResponse)val appContext = createApplicationSubmissionContext(newApp, containerContext)// Finally, submit and monitor the applicationlogInfo(s"Submitting application $appId to ResourceManager")yarnClient.submitApplication(appContext)launcherBackend.setAppId(appId.toString)reportLauncherState(SparkAppHandle.State.SUBMITTED)appId} catch {case e: Throwable =>if (stagingDirPath != null) {cleanupStagingDir()}throw e}

该方法做了如下工作(对应于任务提交流程图中的1,2,3):
1,向ResourceManager发送请求创建Application,获取全局唯一的
appId。
2,根据配置的缓存目录信息+appId信息,创建运行Application运行的缓存目录stagingDirPath。
3,verifyClusterResources 验证集群中是否有足够资源可用,没有的话抛出异常。
4,createContainerLaunchContext 创建Container,其中封装了Container进程的启动命令。
5,提交appContext。

查看createContainerLaunchContext(newAppResponse) 代码。

val amClass =if (isClusterMode) {Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName} else {Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName}...// Command for the ApplicationMasterval commands = prefixEnv ++Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++javaOpts ++ amArgs ++Seq("1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout","2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")// TODO: it would be nicer to just make sure there are no null commands hereval printableCommands = commands.map(s => if (s == null) "null" else s).toListamContainer.setCommands(printableCommands.asJava)

Container的启动代码大概为
bin/java -server org.apache.spark.deploy.yarn.ApplicationMaster --class …


六, org.apache.spark.deploy.yarn.ApplicationMaster 类。

yarn集群某一个NodeManager收到ResourceManager的命令,启动ApplicationMaster进程,对应任务提交流程图中的步骤4.
查看ApplicationMaster 伴生对象中的main方法。

def main(args: Array[String]): Unit = {SignalUtils.registerLogger(log)val amArgs = new ApplicationMasterArguments(args)val sparkConf = new SparkConf()if (amArgs.propertiesFile != null) {Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>sparkConf.set(k, v)}}// Set system properties for each config entry. This covers two use cases:// - The default configuration stored by the SparkHadoopUtil class// - The user application creating a new SparkConf in cluster mode//// Both cases create a new SparkConf object which reads these configs from system properties.sparkConf.getAll.foreach { case (k, v) =>sys.props(k) = v}val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))master = new ApplicationMaster(amArgs, sparkConf, yarnConf)val ugi = sparkConf.get(PRINCIPAL) match {// We only need to log in with the keytab in cluster mode. In client mode, the driver// handles the user keytab.case Some(principal) if master.isClusterMode =>val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)val newUGI = UserGroupInformation.getCurrentUser()if (master.appAttemptId == null || master.appAttemptId.getAttemptId > 1) {// Re-obtain delegation tokens if this is not a first attempt, as they might be outdated// as of now. Add the fresh tokens on top of the original user's credentials (overwrite).// Set the context class loader so that the token manager has access to jars// distributed by the user.Utils.withContextClassLoader(master.userClassLoader) {val credentialManager = new HadoopDelegationTokenManager(sparkConf, yarnConf, null)credentialManager.obtainDelegationTokens(originalCreds)}}// Transfer the original user's tokens to the new user, since it may contain needed tokens// (such as those user to connect to YARN).newUGI.addCredentials(originalCreds)newUGIcase _ =>SparkHadoopUtil.get.createSparkUser()}ugi.doAs(new PrivilegedExceptionAction[Unit]() {override def run(): Unit = System.exit(master.run())})}

创建了ApplicationMaster对象并执行其run() 方法。

final def run(): Int = {try {val attemptID = if (isClusterMode) {// Set the web ui port to be ephemeral for yarn so we don't conflict with// other spark processes running on the same boxSystem.setProperty(UI_PORT.key, "0")// Set the master and deploy mode property to match the requested mode.System.setProperty("spark.master", "yarn")System.setProperty(SUBMIT_DEPLOY_MODE.key, "cluster")// Set this internal configuration if it is running on cluster mode, this// configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())Option(appAttemptId.getAttemptId.toString)} else {None}new CallerContext("APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()logInfo("ApplicationAttemptId: " + appAttemptId)// This shutdown hook should run *after* the SparkContext is shut down.val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1ShutdownHookManager.addShutdownHook(priority) { () =>val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)val isLastAttempt = appAttemptId.getAttemptId() >= maxAppAttemptsif (!finished) {// The default state of ApplicationMaster is failed if it is invoked by shut down hook.// This behavior is different compared to 1.x version.// If user application is exited ahead of time by calling System.exit(N), here mark// this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call// System.exit(0) to terminate the application.finish(finalStatus,ApplicationMaster.EXIT_EARLY,"Shutdown hook called before final status was reported.")}if (!unregistered) {// we only want to unregister if we don't want the RM to retryif (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {unregister(finalStatus, finalMsg)cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))}}}if (isClusterMode) {runDriver()} else {runExecutorLauncher()}} catch {case e: Exception =>// catch everything else if not specifically handledlogError("Uncaught exception: ", e)finish(FinalApplicationStatus.FAILED,ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,"Uncaught exception: " + StringUtils.stringifyException(e))} finally {try {metricssystem.foreach { ms =>ms.report()ms.stop()}} catch {case e: Exception =>logWarning("Exception during stopping of the metric system: ", e)}} exitCode}

执行runDriver()方法。
userClassThread = startUserApplication() 启动了一个名为Driver的线程,该线程中通过反射执行命令行中**–class指定的类(org.apache.spark.examples.SparkPi)中的main**函数,初始化SparkContext。主线程唤醒后,向ResourceManager注册ApplicationMaster,步骤5;

private def runDriver(): Unit = {addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))userClassThread = startUserApplication()// This a bit hacky, but we need to wait until the spark.driver.port property has// been set by the Thread executing the user class.logInfo("Waiting for spark context initialization...")val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)try {val sc = ThreadUtils.awaitResult(sparkContextPromise.future,Duration(totalWaitTime, TimeUnit.MILLISECONDS))if (sc != null) {val rpcEnv = sc.env.rpcEnvval userConf = sc.getConfval host = userConf.get(DRIVER_HOST_ADDRESS)val port = userConf.get(DRIVER_PORT)registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)val driverRef = rpcEnv.setupEndpointRef(RpcAddress(host, port),YarnSchedulerBackend.ENDPOINT_NAME)createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)} else {// Sanity check; should never happen in normal operation, since sc should only be null// if the user app did not create a SparkContext.throw new IllegalStateException("User did not initialize spark context!")}resumeDriver()userClassThread.join()} catch {case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>logError(s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +"Please check earlier log output for errors. Failing the application.")finish(FinalApplicationStatus.FAILED,ApplicationMaster.EXIT_SC_NOT_INITED,"Timed out waiting for SparkContext.")} finally {resumeDriver()}}
private def startUserApplication(): Thread = {logInfo("Starting the user application in a separate Thread")var userArgs = args.userArgsif (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {// When running pyspark, the app is run using PythonRunner. The second argument is the list// of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.userArgs = Seq(args.primaryPyFile, "") ++ userArgs}if (args.primaryRFile != null &&(args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) {// TODO(davies): add R dependencies here}val mainMethod = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])val userThread = new Thread {override def run(): Unit = {try {if (!Modifier.isStatic(mainMethod.getModifiers)) {logError(s"Could not find static main method in object ${args.userClass}")finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)} else {mainMethod.invoke(null, userArgs.toArray)finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)logDebug("Done running user class")}} catch {case e: InvocationTargetException =>e.getCause match {case _: InterruptedException =>// Reporter thread can interrupt to stop user classcase SparkUserAppException(exitCode) =>val msg = s"User application exited with status $exitCode"logError(msg)finish(FinalApplicationStatus.FAILED, exitCode, msg)case cause: Throwable =>logError("User class threw exception: " + cause, cause)finish(FinalApplicationStatus.FAILED,ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,"User class threw exception: " + StringUtils.stringifyException(cause))}sparkContextPromise.tryFailure(e.getCause())} finally {// Notify the thread waiting for the SparkContext, in case the application did not// instantiate one. This will do nothing when the user code instantiates a SparkContext// (with the correct master), or when the user code throws an exception (due to the// tryFailure above).sparkContextPromise.trySuccess(null)}}}userThread.setContextClassLoader(userClassLoader)userThread.setName("Driver")userThread.start()userThread}

注册完成后,主线程处理yarn返回的资源createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)。

private def createAllocator(driverRef: RpcEndpointRef,_sparkConf: SparkConf,rpcEnv: RpcEnv,appAttemptId: ApplicationAttemptId,distCacheConf: SparkConf): Unit = {// In client mode, the AM may be restarting after delegation tokens have reached their TTL. So// always contact the driver to get the current set of valid tokens, so that local resources can// be initialized below.if (!isClusterMode) {val tokens = driverRef.askSync[Array[Byte]](RetrieveDelegationTokens)if (tokens != null) {SparkHadoopUtil.get.addDelegationTokens(tokens, _sparkConf)}}val appId = appAttemptId.getApplicationId().toString()val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toStringval localResources = prepareLocalResources(distCacheConf)// Before we initialize the allocator, let's log the information about how executors will// be run up front, to avoid printing this out for every single executor being launched.// Use placeholders for information that changes such as executor IDs.logInfo {val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toIntval executorCores = _sparkConf.get(EXECUTOR_CORES)val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>","<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)dummyRunner.launchContextDebugInfo()}allocator = client.createAllocator(yarnConf,_sparkConf,appAttemptId,driverUrl,driverRef,securityMgr,localResources)// Initialize the AM endpoint *after* the allocator has been initialized. This ensures// that when the driver sends an initial executor request (e.g. after an AM restart),// the allocator is ready to service requests.rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))allocator.allocateResources()val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,sparkConf, securityMgr)val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)ms.registerSource(new ApplicationMasterSource(prefix, allocator))// do not register static sources in this case as per SPARK-25277ms.start(false)metricsSystem = Some(ms)reporterThread = launchReporterThread()}

只看关键代码allocator.allocateResources(),处理分配的资源。

def allocateResources(): Unit = synchronized {updateResourceRequests()val progressIndicator = 0.1f// Poll the ResourceManager. This doubles as a heartbeat if there are no pending container// requests.val allocateResponse = amClient.allocate(progressIndicator)val allocatedContainers = allocateResponse.getAllocatedContainers()allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)if (allocatedContainers.size > 0) {logDebug(("Allocated containers: %d. Current executor count: %d. " +"Launching executor count: %d. Cluster resources: %s.").format(allocatedContainers.size,runningExecutors.size,numExecutorsStarting.get,allocateResponse.getAvailableResources))handleAllocatedContainers(allocatedContainers.asScala)}val completedContainers = allocateResponse.getCompletedContainersStatuses()if (completedContainers.size > 0) {logDebug("Completed %d containers".format(completedContainers.size))processCompletedContainers(completedContainers.asScala)logDebug("Finished processing %d completed containers. Current running executor count: %d.".format(completedContainers.size, runningExecutors.size))}}

如果分配的Container数量大于0,调用** handleAllocatedContainers(allocatedContainers.asScala)**

def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)// Match incoming requests by hostval remainingAfterHostMatches = new ArrayBuffer[Container]for (allocatedContainer <- allocatedContainers) {matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,containersToUse, remainingAfterHostMatches)}// Match remaining by rack. Because YARN's RackResolver swallows thread interrupts// (see SPARK-27094), which can cause this code to miss interrupts from the AM, use// a separate thread to perform the operation.val remainingAfterRackMatches = new ArrayBuffer[Container]if (remainingAfterHostMatches.nonEmpty) {var exception: Option[Throwable] = Noneval thread = new Thread("spark-rack-resolver") {override def run(): Unit = {try {for (allocatedContainer <- remainingAfterHostMatches) {val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)matchContainerToRequest(allocatedContainer, rack, containersToUse,remainingAfterRackMatches)}} catch {case e: Throwable =>exception = Some(e)}}}thread.setDaemon(true)thread.start()try {thread.join()} catch {case e: InterruptedException =>thread.interrupt()throw e}if (exception.isDefined) {throw exception.get}}// Assign remaining that are neither node-local nor rack-localval remainingAfterOffRackMatches = new ArrayBuffer[Container]for (allocatedContainer <- remainingAfterRackMatches) {matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,remainingAfterOffRackMatches)}if (remainingAfterOffRackMatches.nonEmpty) {logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +s"allocated to us")for (container <- remainingAfterOffRackMatches) {internalReleaseContainer(container)}}runAllocatedContainers(containersToUse)logInfo("Received %d containers from YARN, launching executors on %d of them.".format(allocatedContainers.size, containersToUse.size))}

这里会根据主机host,机架rack等信息队container进行分配。完成后启动Container,runAllocatedContainers(containersToUse)。

private val launcherPool = ThreadUtils.newDaemonCachedThreadPool("ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))

创建线程池launcherPool。

/*** Launches executors in the allocated containers.*/private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {for (container <- containersToUse) {executorIdCounter += 1val executorHostname = container.getNodeId.getHostval containerId = container.getIdval executorId = executorIdCounter.toStringassert(container.getResource.getMemory >= resource.getMemory)logInfo(s"Launching container $containerId on host $executorHostname " +s"for executor with ID $executorId")def updateInternalState(): Unit = synchronized {runningExecutors.add(executorId)numExecutorsStarting.decrementAndGet()executorIdToContainer(executorId) = containercontainerIdToExecutorId(container.getId) = executorIdval containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,new HashSet[ContainerId])containerSet += containerIdallocatedContainerToHostMap.put(containerId, executorHostname)}if (runningExecutors.size() < targetNumExecutors) {numExecutorsStarting.incrementAndGet()if (launchContainers) {launcherPool.execute(() => {try {new ExecutorRunnable(Some(container),conf,sparkConf,driverUrl,executorId,executorHostname,executorMemory,executorCores,appAttemptId.getApplicationId.toString,securityMgr,localResources,ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported).run()updateInternalState()} catch {case e: Throwable =>numExecutorsStarting.decrementAndGet()if (NonFatal(e)) {logError(s"Failed to launch executor $executorId on container $containerId", e)// Assigned container should be released immediately// to avoid unnecessary resource occupation.amClient.releaseAssignedContainer(containerId)} else {throw e}}})} else {// For test onlyupdateInternalState()}} else {logInfo(("Skip launching executorRunnable as running executors count: %d " +"reached target executors count: %d.").format(runningExecutors.size, targetNumExecutors))}}}

查看ExecutorRunnable 类,其中nmClient = NMClient.createNMClient(), NodeManager客户端,负责于NodeManager交互;其prepareCommand() 方法拼接了一个进程启动命令,大体格式为:

bin/java -server org.apache.spark.executor.YarnCoarseGrainedExecutorBackend ...

在这里插入图片描述

ApplicationMaster进程中的launcherPool线程池,会根据Container的个数挨个启动线程ExecutorRunnable,ExecutorRunnable中的NMClient会将拼接好的jvm启动命令发送给相关的NodeManager,启动Container进程,进程名为YarnCoarseGrainedExecutorBackend。
ExecutorRunnable完整代码:

private[yarn] class ExecutorRunnable(container: Option[Container],conf: YarnConfiguration,sparkConf: SparkConf,masterAddress: String,executorId: String,hostname: String,executorMemory: Int,executorCores: Int,appId: String,securityMgr: SecurityManager,localResources: Map[String, LocalResource],resourceProfileId: Int) extends Logging {var rpc: YarnRPC = YarnRPC.create(conf)var nmClient: NMClient = _def run(): Unit = {logDebug("Starting Executor Container")nmClient = NMClient.createNMClient()nmClient.init(conf)nmClient.start()startContainer()}def launchContextDebugInfo(): String = {val commands = prepareCommand()val env = prepareEnvironment()s"""|===============================================================================|Default YARN executor launch context:| env:|${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s" $k -> $v\n" }.mkString}| command:| ${Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n ")}|| resources:|${localResources.map { case (k, v) => s" $k -> $v\n" }.mkString}|===============================================================================""".stripMargin}def startContainer(): java.util.Map[String, ByteBuffer] = {val ctx = Records.newRecord(classOf[ContainerLaunchContext]).asInstanceOf[ContainerLaunchContext]val env = prepareEnvironment().asJavactx.setLocalResources(localResources.asJava)ctx.setEnvironment(env)val credentials = UserGroupInformation.getCurrentUser().getCredentials()val dob = new DataOutputBuffer()credentials.writeTokenStorageToStream(dob)ctx.setTokens(ByteBuffer.wrap(dob.getData()))val commands = prepareCommand()ctx.setCommands(commands.asJava)ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)// If external shuffle service is enabled, register with the Yarn shuffle service already// started on the NodeManager and, if authentication is enabled, provide it with our secret// key for fetching shuffle files laterif (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {val secretString = securityMgr.getSecretKey()val secretBytes =if (secretString != null) {// This conversion must match how the YarnShuffleService decodes our secretJavaUtils.stringToBytes(secretString)} else {// Authentication is not enabled, so just provide dummy metadataByteBuffer.allocate(0)}ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))}// Send the start request to the ContainerManagertry {nmClient.startContainer(container.get, ctx)} catch {case ex: Exception =>throw new SparkException(s"Exception while starting container ${container.get.getId}" +s" on host $hostname", ex)}}private def prepareCommand(): List[String] = {// Extra options for the JVMval javaOpts = ListBuffer[String]()// Set the JVM memoryval executorMemoryString = executorMemory + "m"javaOpts += "-Xmx" + executorMemoryString// Set extra Java options for the executor, if definedsparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)}// Set the library path through a command prefix to append to the existing value of the// env variable.val prefixEnv = sparkConf.get(EXECUTOR_LIBRARY_PATH).map { libPath =>Client.createLibraryPathPrefix(libPath, sparkConf)}javaOpts += "-Djava.io.tmpdir=" +new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)// Certain configs need to be passed here because they are needed before the Executor// registers with the Scheduler and transfers the spark configs. Since the Executor backend// uses RPC to connect to the scheduler, the RPC settings are needed as well as the// authentication settings.sparkConf.getAll.filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }.foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }// Commenting it out for now - so that people can refer to the properties if required. Remove// it once cpuset version is pushed out.// The context is, default gc for server class machines end up using all cores to do gc - hence// if there are multiple containers in same node, spark gc effects all other containers// performance (which can also be other spark containers)// Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in// multi-tenant environments. Not sure how default java gc behaves if it is limited to subset// of cores on a node./*else {// If no java_opts specified, default to using -XX:+cmsIncrementalMode// It might be possible that other modes/config is being done in// spark.executor.extraJavaOptions, so we don't want to mess with it.// In our expts, using (default) throughput collector has severe perf ramifications in// multi-tenant machines// The options are based on// http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use// %20the%20Concurrent%20Low%20Pause%20Collector|outlinejavaOpts += "-XX:+UseConcMarkSweepGC"javaOpts += "-XX:+CMSIncrementalMode"javaOpts += "-XX:+CMSIncrementalPacing"javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"javaOpts += "-XX:CMSIncrementalDutyCycle=10"}*/// For log4j configuration to referencejavaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>val absPath =if (new File(uri.getPath()).isAbsolute()) {Client.getClusterPath(sparkConf, uri.getPath())} else {Client.buildPath(Environment.PWD.$(), uri.getPath())}Seq("--user-class-path", "file:" + absPath)}.toSeqYarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)val commands = prefixEnv ++Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++javaOpts ++Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend","--driver-url", masterAddress,"--executor-id", executorId,"--hostname", hostname,"--cores", executorCores.toString,"--app-id", appId,"--resourceProfileId", resourceProfileId.toString) ++userClassPath ++Seq(s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")// TODO: it would be nicer to just make sure there are no null commands herecommands.map(s => if (s == null) "null" else s).toList}private def prepareEnvironment(): HashMap[String, String] = {val env = new HashMap[String, String]()Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))System.getenv().asScala.filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }sparkConf.getExecutorEnv.foreach { case (key, value) =>if (key == Environment.CLASSPATH.name()) {// If the key of env variable is CLASSPATH, we assume it is a path and append it.// This is kept for backward compatibility and consistency with hadoopYarnSparkHadoopUtil.addPathToEnvironment(env, key, value)} else {// For other env variables, simply overwrite the value.env(key) = value}}env}}

以上就是解析spark源码yarn-cluster模式任务提交的详细内容,更多关于spark源码解析的资料请关注我们其它相关文章!


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐