打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
【Spark三十六】Spark On Yarn之yarnyarn-client形式部署

【Spark三十六】Spark On Yarn之yarn-client形式部署

www.MyException.Cn  网友分享于:2015-02-03  浏览:0次
【Spark三十六】Spark On Yarn之yarn-client方式部署

按照Spark的部署设置,对于Spark运行于Yarn之上,有如下四种选择方式(本质上是两种),

  • yarn-client+client
  • yarn-cluster+cluster
  • yarn-client(部署方式默认为client)
  • yarn-cluster(部署方式默认为cluster)

yarn-client+cluster组合以及yarn-cluster+client是不正确的组合,Spark报错退出。

本文首先探讨Spark On Yarn之yarn-client+client方式部署下的代码执行流程

 

程序提交给Yarn运行时环境

 

  • 对于部署方式是Client的情况,SparkSubmit的main函数中通过反射执行应用程序的main方法
  • 在应用程序的main方法中,创建SparkContext实例
  • 在创建SparkContext的实例过程中,通过如下语句创建Scheduler和Backend实例

 

  private[spark] var (schedulerBackend, taskScheduler) =  SparkContext.createTaskScheduler(this, master)

 

  • 由于当前是yarn-client和client组合部署模式,

        1.代码执行逻辑是: taskScheduler是org.apache.spark.scheduler.cluster.YarnClientClusterScheduler实例,它是TaskSchedulerImpl的子类,它的文档说明为

 

 

/** * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM. */
      2.schedulerBackend是org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend实例,它是CoarseGrainedSchedulerBackend的子类,它是文档说明为无

 

 

  • 继续SparkContext实例的创建过程,调用taskScheduler的start方法,也即YarnClientClusterScheduler的start方法,因为YarnClientClusterScheduler并没有覆盖TaskSchedulerImpl的start方法,所以执行逻辑进入到TaskSchedulerImpl的start方法中
  • 在TaskSchedulerImpl的start方法中,调用backend的start方法,由于此处的backend是YarnClientSchedulerBackend,所以代码逻辑进入到YarnClientSchedulerBackend的start方法中
  • 在YarnClientSchedulerBackend的start方法中,创建YarnClient(将用户编写的应用程序提交给Yarn的ResourceManager)
  • 在YarnClient创建yarn.Client对象,然后调用submitApplication,等待Application执行完,如下代码所示
    client = new Client(args, conf) //yarn.client    appId = client.submitApplication()    waitForApplication() ///阻塞等待Application进入Running状态    asyncMonitorApplication() ///异步监控Application的运行状态,If the application has exited for any reason, stop the SparkContext.

 

  • 程序逻辑进入了yarn.Client调用submitApplication的逻辑,执行代码:

1.submitApplication的代码(Spark)

  /**   * Submit an application running our ApplicationMaster to the ResourceManager.   *   * The stable Yarn API provides a convenience method (YarnClient#createApplication) for   * creating applications and setting up the application submission context. This was not   * available in the alpha API.   */  ////这里借助Hadoop Yarn提供的API提交应用程序,这里的API是Hadoop Yarn的YarnClient  override def submitApplication(): ApplicationId = {    yarnClient.init(yarnConf) ////yarnClient是通过YarnClient.createYarnClient创建,而YarnClient是Hadoop API,所以yarnClient也是Hadoop的API    yarnClient.start() ///启动Yarn    logInfo("Requesting a new application from cluster with %d NodeManagers" ///NodeManager是什么概念?      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))    // Get a new application from our RM    val newApp = yarnClient.createApplication()///代码运行到此处,还没有真正的把程序代码提交给Yarn去运行;这里使用YarnClient创建一个Application,类型为YarnClientApplication    val newAppResponse = newApp.getNewApplicationResponse() ///返回GetNewApplicationResponse类型    val appId = newAppResponse.getApplicationId() ///获取applicationId    // Verify whether the cluster has enough resources for our AM    verifyClusterResources(newAppResponse)    // Set up the appropriate contexts to launch our AM    val containerContext = createContainerLaunchContext(newAppResponse) ///创建启动ApplicationMaster容器的上下文环境    val appContext = createApplicationSubmissionContext(newApp, containerContext)///依据创建的newApp和containerContext,创建应用上下文环境    // Finally, submit and monitor the application    logInfo(s"Submitting application ${appId.getId} to ResourceManager")    yarnClient.submitApplication(appContext) ///根据创建的applicationContext,由Hadoop Yarn的yarnClient提交作业    appId  }

2. YarnClient的createApplication代码(Hadoop Yarn)

 

 

  @Override  public YarnClientApplication createApplication()      throws YarnException, IOException {    ApplicationSubmissionContext context = Records.newRecord        (ApplicationSubmissionContext.class);    GetNewApplicationResponse newApp = getNewApplication();    ApplicationId appId = newApp.getApplicationId();    context.setApplicationId(appId);    return new YarnClientApplication(newApp, context);  }

 

3. YarnClientApplication的getNewApplicationResponsed代码(Hadoop Yarn)

 

 public GetNewApplicationResponse getNewApplicationResponse() {    return newAppResponse;//类型为GetNewApplicationResponse   }

 

4. createContainerLaunchContext代码(Spark)

 /**   * Set up a ContainerLaunchContext to launch our ApplicationMaster container.   * This sets up the launch environment, java options, and the command for launching the AM.   */  ///创建启动ApplicationMaster容器的上下文环境  protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)      : ContainerLaunchContext = {    logInfo("Setting up container launch context for our AM")    val appId = newAppResponse.getApplicationId    val appStagingDir = getAppStagingDir(appId)    val localResources = prepareLocalResources(appStagingDir) ///准备本地资源    val launchEnv = setupLaunchEnv(appStagingDir)    val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) ///创建ApplicationMaster容器,类型为ContainerLaunchContext    amContainer.setLocalResources(localResources)    amContainer.setEnvironment(launchEnv)    val javaOpts = ListBuffer[String]()    // Set the environment variable through a command prefix    // to append to the existing value of the variable    var prefixEnv: Option[String] = None    // Add Xmx for AM memory    javaOpts += "-Xmx" + args.amMemory + "m"    val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)    javaOpts += "-Djava.io.tmpdir=" + tmpDir    // TODO: Remove once cpuset version is pushed out.    // The context is, default gc for server class machines ends up using all cores to do gc -    // hence if there are multiple containers in same node, Spark GC affects all other containers'    // performance (which can be that of other Spark containers)    // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in    // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset    // of cores on a node.    val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)    if (useConcurrentAndIncrementalGC) {      // In our expts, using (default) throughput collector has severe perf ramifications in      // multi-tenant machines      javaOpts += "-XX:+UseConcMarkSweepGC"      javaOpts += "-XX:+CMSIncrementalMode"      javaOpts += "-XX:+CMSIncrementalPacing"      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"      javaOpts += "-XX:CMSIncrementalDutyCycle=10"    }    // Forward the Spark configuration to the application master / executors.    // TODO: it might be nicer to pass these as an internal environment variable rather than    // as Java options, due to complications with string parsing of nested quotes.    for ((k, v) <- sparkConf.getAll) {      javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")    }    // Include driver-specific java options if we are launching a driver    if (isLaunchingDriver) {  //什么情况是启动Driver??      sparkConf.getOption("spark.driver.extraJavaOptions")        .orElse(sys.env.get("SPARK_JAVA_OPTS"))        .foreach(opts => javaOpts += opts)      val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),        sys.props.get("spark.driver.libraryPath")).flatten      if (libraryPaths.nonEmpty) { ///此处对prefixEnv进行唯一的赋值,        prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))      }    }    // For log4j configuration to reference    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)    val userClass = ///只有启动Driver的情况下才会设置--class      if (isLaunchingDriver) {        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))      } else {        Nil      }    val userJar = ////应用程序的jar文件      if (args.userJar != null) {        Seq("--jar", args.userJar)      } else {        Nil      }    val amClass = ///如果是Driver,则是ApplicationMaster,否则是ExecutorLauncher      if (isLaunchingDriver) {        Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName      } else {        Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName      }    val userArgs = args.userArgs.flatMap { arg =>      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))    }    val amArgs = ///ApplicationMaster --class --jar      Seq(amClass) ++ userClass ++ userJar ++ userArgs ++      Seq(        "--executor-memory", args.executorMemory.toString + "m",        "--executor-cores", args.executorCores.toString,        "--num-executors ", args.numExecutors.toString)    // Command for the ApplicationMaster    ///封装要执行的命令,使用java -server javaOpts armArgs    val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++      javaOpts ++ amArgs ++      Seq(        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")    // TODO: it would be nicer to just make sure there are no null commands here    val printableCommands = commands.map(s => if (s == null) "null" else s).toList    //amContainer包含的命令    amContainer.setCommands(printableCommands)    logDebug("===============================================================================")    logDebug("Yarn AM launch context:")    logDebug(s"    user class: ${Option(args.userClass).getOrElse("N/A")}")    logDebug("    env:")    launchEnv.foreach { case (k, v) => logDebug(s"        $k -> $v") }    logDebug("    resources:")    localResources.foreach { case (k, v) => logDebug(s"        $k -> $v")}    logDebug("    command:")    logDebug(s"        ${printableCommands.mkString(" ")}")    logDebug("===============================================================================")    // send the acl settings into YARN to control who has access via YARN interfaces    val securityManager = new SecurityManager(sparkConf)    amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))    setupSecurityToken(amContainer)    UserGroupInformation.getCurrentUser().addCredentials(credentials)    amContainer  }

 

 

5. createApplicationSubmissionContext的代码(Spark)

  /**   * Set up the context for submitting our ApplicationMaster.   * This uses the YarnClientApplication not available in the Yarn alpha API.   */  ///提交ApplicationMaster的上下文环境  def createApplicationSubmissionContext(      newApp: YarnClientApplication,      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {    val appContext = newApp.getApplicationSubmissionContext    appContext.setApplicationName(args.appName) ///应用程序的名字    appContext.setQueue(args.amQueue)    appContext.setAMContainerSpec(containerContext) ///将创建的ContainerLaunchContext包装到appContext中    appContext.setApplicationType("SPARK") ////UI上当应用程序执行完成后,显示的应用程序的类型    val capability = Records.newRecord(classOf[Resource])    capability.setMemory(args.amMemory + amMemoryOverhead)    appContext.setResource(capability)    appContext  }

 

  • 上面分析了yarn.Client调用submitApplication详细逻辑,程序回到的start方法中,继续下面的逻辑
    waitForApplication() ///阻塞等待程序进入Running状态    asyncMonitorApplication() ///异步监控程序的运行状态

 

  • 当作业提交到Yarn中之后,Yarn创建一个进程。如果是Driver则运行ApplicationMaster,否则运行ExecutorLauncher。是否是Driver通过yarn.client的isLaunchingDriver变量决定。isLaunchingDriver取值依赖于args.userClass是否存在,用户的指令中提供了--class参数,则args.userClass中的值就是用户提供的--class参数。此逻辑在.yarn.ClientArguments类中的parseArgs中
  • 我们的spark-submit提供了--class参数,所以,Yarn将启动ApplicationMaster进程。也就是说,在Yarn的运行时环境中,启动了Spark的ApplicationMaster进程

在Yarn运行时环境中运行Spark的ApplicationMaster进程的执行逻辑

  • 代码逻辑进入到ApplicationMaster的main方法中

 

  def main(args: Array[String]) = {    SignalLogger.register(log)    val amArgs = new ApplicationMasterArguments(args) ///args是什么内容?    SparkHadoopUtil.get.runAsSparkUser { () =>      master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs)) ///因为ApplicationMaster需要跟Yarn的ResourceManager交互,所以这里需要访问RM的YarnRMClientImpl实例      System.exit(master.run()) ///执行ApplicationMaster的run方法    }  }

 

  •  在ApplicationMaster的run方法中调用ApplicationMaster的runDriver或者runExecutorLauncher方法,究竟运行哪一个??
  • 此处的逻辑先暂时不表,因为还没有清楚ApplicationMaster的runDriver或者runExecutorLauncher方法,究竟运行哪一个,不过最终的逻辑都会走进ApplicationMaster的registerAM中,先继续吧!!!
  • 在ApplicationMaster的registerAM方法中,调用YarnRMClient的register方法,如下是代码:

 

  override def register(      conf: YarnConfiguration,      sparkConf: SparkConf,      preferredNodeLocations: Map[String, Set[SplitInfo]],      uiAddress: String,      uiHistoryAddress: String,      securityMgr: SecurityManager) = {    amClient = AMRMClient.createAMRMClient()    amClient.init(conf)    amClient.start()    this.uiHistoryAddress = uiHistoryAddress    logInfo("Registering the ApplicationMaster") ///注册APPlicationMaster到Yarn    synchronized {      amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)///调用AMRMClient的registerApplicationMaster方法      registered = true    }    new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,      preferredNodeLocations, securityMgr) ///创建了一个YarnAllocationHandler,register方法不需要返回值  }

 

  • 在ApplicationMaster的registerRM方法中,继续调用YarnAllocator的allocateResources方法。这个方法将近300行,主要的功能如其方法说明文档所说,主要是分配container,按照本机优先,本机架次之,其它机架最后的次序进行container分配
  /**   * Allocate missing containers based on the number of executors currently pending and running.   *   * This method prioritizes the allocated container responses from the RM based on node and   * rack locality. Additionally, it releases any extra containers allocated for this application   * but are not needed. This must be synchronized because variables read in this block are   * mutated by other methods.   */

 

  • 在allocateResources的过程,会执行提交ExecutorRunnable实例到线程池的操作,(这里是对每个分配到的container交给线程池来处理使用)
          val executorRunnable = new ExecutorRunnable( ///实现了Runnable接口的任务            container,            conf,            sparkConf,            driverUrl,            executorId,            executorHostname,            executorMemory,            executorCores,            appAttemptId.getApplicationId.toString,            securityMgr)          launcherPool.execute(executorRunnable) ///提交给线程池        }

 

  • ExecutorRunnable在container中执行run方法中,封装启动org.apache.spark.executor.CoarseGrainedExecutorBackend进程的指令,这个逻辑是在prepareCommand中完成的,代码片段如下:
 val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", ///java命令所在的位置      "-server",      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.      // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in      // an inconsistent state.      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do      // 'something' to fail job ... akin to blacklisting trackers in mapred ?      "-XX:OnOutOfMemoryError='kill %p'") ++      javaOpts ++      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", ///启动CoarseGrainedExecutorBackend进程      masterAddress.toString,      slaveId.toString,      hostname.toString,      executorCores.toString,      appId,      "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

 

  • 调用NMClient.startContainer启动Container
    // Send the start request to the ContainerManager    nmClient.startContainer(container, ctx) //nmClient是NodeManager的实例,输入Hadoop Yarn的API

 

  • 当CoarseGrainedExecutorBackend启动时,代码逻辑回到我们熟悉的轨道上来。调用preStart方法启动Executor
  override def preStart() {    logInfo("Connecting to driver: " + driverUrl)    driver = context.actorSelection(driverUrl)    driver ! RegisterExecutor(executorId, hostPort, cores) ///给Driver发消息RegisterExecutor    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])  }

 

  • DriverActor收到RegisterExecutor的消息后,调用makeOffer方法,在makeOffer方法中,调用launchTasks方法给ExecutorActor发消息LaunchTask
  • ExecutorBackEnd收到LaunchTask的消息后,调用Executor的launchTask方法,然后通过Executor里面的线程池提交任务到线程池执行

 

总结

 囫囵吞枣似的的将yarn-client模式的执行流程走了一遍,毕竟是第一次接触到这里,同时对Yarn也没有很好的理解,所以这中间还有很多不明白的东西,还得不断的思考,细化。

以两幅图片作为结束,

 

 


 

 

 


 

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
理解Spark运行模式(一)(Yarn Client)
Apache Spark源码走读之8
spark on Yarn测试
Spark内核-部署模式
理解Spark运行模式(二)(Yarn Cluster)
Spark on Yarn遇到的几个问题
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服