Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark内核设计的艺术: 第9章 部署模式 #20

Open
shilinlee opened this issue Aug 27, 2019 · 0 comments
Open

Spark内核设计的艺术: 第9章 部署模式 #20

shilinlee opened this issue Aug 27, 2019 · 0 comments
Labels
Spark Apache Spark 大数据 整个大数据体系

Comments

@shilinlee
Copy link
Owner

shilinlee commented Aug 27, 2019

9.1 心跳接收器 HeartbeatReceiver

HeartbeatReceiver 运行在Driver上,用以接收各个Executor的心跳消息,对各个Executor的"生死”进行监控。

  • 注册Executor
    HeartbeatReceiver继承了SparkListener,并实现了onExecutorAdded方法(见代码清单9.1)。根据3.3节的内容,我们知道事件总线在接收到SparkListenerExecutorAdded消息后,将调用HeartbeatReceiver的onExecutorAdded方法,这样HeartbeatReceiver将监听到Executor的添加。

      /**
       * If the heartbeat receiver is not stopped, notify it of executor registrations.
       */
      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
        addExecutor(executorAdded.executorId)
      }

    addExecutor实现如下:

      /**
       * Send ExecutorRegistered to the event loop to add a new executor. Only for test.
       *
       * @return if HeartbeatReceiver is stopped, return None. Otherwise, return a Some(Future) that
       *         indicate if this operation is successful.
       */
      def addExecutor(executorId: String): Option[Future[Boolean]] = {
        Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId)))
      }
  • 移除 Executor

  • TaskSchedulerIsSet 消息

  • 检查超时的Executor

  • Executor的心跳

    image

9.2 Executor 的实现分析

9.2.1 Executor的心跳报告

在初始化Executor的过程中,Executor会调用自己的startDriverHeartbeater方法启动心跳报告的定时任务。

9.2.2 运行Task

Executor 的launchTask方法用于运行Task。

9.3 local 部署模式

本书在讲解各章内容时,主要以local模式为例。local部署模式只有Driver,没有Master和Worker,执行任务的Executor与Driver在同一个JVM进程内。local模式中使用的ExecutorBackend和SchedulerBackend的实现类都是LocalSchedulerBackend。在第7章介绍调度系统时已经分析过local部署模式下的LocalSchedulerBackend、LocalEndpoint等组件的实现,本节将以图形的方式展现local部署模式的启动、local部署模式下的任务提交与执行等流程。

local部署模式的启动过程如图9.2所示。

image

9.4 持久化引擎 PersistentEngine

PersistenceEngine用于当Master发生故障后,通过领导选举选择其他Master接替整个集群的管理工作时,能够使得新激活的Master有能力从故障中恢复整个集群的状态信息,进而恢复对集群资源的管理和分配。抽象类PersistenceEngine定义了对Master必需的任何状态信息进行持久化的接口规范。实现PersistenceEngine必须满足以下机制。

  • 在完成新的Application的注册之前,addApplication方法必须被调用。
  • 在完成新的Worker的注册之前,addWorker方法必须被调用。
  • removeApplication方法和removeWorker方法可以在任何时候调用。

在以上机制的保证下,整个集群的Worker、Driver和Application的信息都被持久化,集群因此可以在领导选举和主Master的切换后,对集群状态进行恢复。

image

BlackHolePersistentEngine都是空实现,CustomPersistentEngine是用于单元测试的实现。其他两个是用于生产环境的实现类。

9.4.1 基于文件系统的持久化引擎

FileSystemPersistenceEngine是基于文件系统的持久化引擎。对于ApplicationInfo、WorkerInfo及DriverInfo,FiIeSystemPersistenceEngine会将它们的数据存储到磁盘上的单个文件夹中,当要移除它们时,这些磁盘文件将被删除。由于不同的Master往往不在同一个机器节点上,因此在使用FileSystemPersistenceEngine时,底层的文件系统应该是分布式的。

9.4.2 基于ZooKeeperPersistenceEngine的久化引擎

ZooKeeperPersistenceEngine是基于ZooKeeper的持久化引擎。对于ApplicationInfo、WorkerInfo及DriverInfo,ZooKeeperPersistenceEngine会将它们的数据存储到ZooKeeper的不同节点(也称为Znode)中,当要移除它们时,这些节点将被删除。

ZooKeeperPersistenceEngine有以下属性。

  • conf: 即SparkConf。
  • serializer:持久化时使用的序列化器。
  • WORKING_DIR:ZooKeeperPersistenceEngine在ZooKeeper上的工作目录,是spark基于ZooKeeper进行热备的根节点(可通过spark.deploy.ZooKeeper.dir属性配置,默认为spark)的子节点master_status。
  • zk:连接ZooKeeper的客户端类型为CuratorFramework。

小贴士Curator是Netflix公司开源的一个ZooKeeper客户端,与ZooKeeper提供的原生客户端相比,Curator的抽象层次更高,其核心目标是帮助工程师管理zooKeeper的相关操作,简化ZooKeeper客户端的开发量。Curator现已提升为Apache的顶级项目。Curator-Framework就是Curator提供的APIO

9.5 领导选举代理

领导选举机制(Leader Election)可以保证集群虽然存在多个Master,但是只有一个Master处于激活(Active)状态,其他的Master处于支持(Standby)状态。当Active状态的Master出现故障时,会选举出一个standby状态的Master作为新的Active状态的Master。由于整个集群的worker,Driver和Application的信息都已经通过持久化引擎持久化,因此切换Master时只会影响新任务的提交,对于正在运行中的任务没有任何影响。

特质LeaderElectionAgent定义了对当前的Master进行跟踪和领导选举代理的通用接口,其定义如下。

/**
 * :: DeveloperApi ::
 *
 * A LeaderElectionAgent tracks current master and is a common interface for all election Agents.
 */
@DeveloperApi
trait LeaderElectionAgent {
  val masterInstance: LeaderElectable
  def stop() {} // to avoid noops in implementations.
}

masterInstance 属性类型是LeaderElectable,特质LeaderElectable的定义如下:

@DeveloperApi
trait LeaderElectable {
  def electedLeader(): Unit   // 被选举为领导
  def revokedLeadership(): Unit  // 撤销领导关系
}
  1. MonarchyLeaderAgent 详解
  2. ZookeeperLeadeElectionrAgent 详解

9.6 Master 详解

Master是local-cluster部署模式和Standalone部署模式中,整个Spark集群最为重要的组件之一,它的设计将直接决定整个集群的可扩展性、可用性和容错性。Master的职责包括worker的管理、Application的管理、Driver的管理等。Master负责对整个集群中所有资源的统一管理和分配,它接收各个worker的注册、更新状态、心跳等消息,也接收Driver和Application的注册。

worker向Master注册时会携带自身的身份和资源信息(如ID、host、P酾、内核数、内存大小等),这些资源将按照一定的资源调度策略分配给或Application。Master给Driver分配了资源后,将向Worker发送启动Driver的命令,后者在接收到启动Driver的命令后启动Driver。Master给Application分配了资源后,将向Worker发送启动Executor的命令,后者在接收到启动Executor的命令后启动Executor。

Master接收Worker的状态更新消息,用于“杀死”不匹配的Driver或Application。worker向Master发送的心跳消息有两个目的:一是告知Master自己还“活着”,另外则是某个出现故障后,通过领导选举选择了其他Master负责对整个集群的管理,此时被激活的Master可能并没有缓存worker的相关信息,因此需要告知Worker重新向新的
Master注册。

本节主要对Master进行详细分析,理解local-cluster部署模式和StandaIone部署模式下Master如何对整个集群的资源进行管理和分配,但在此之前先需要按部就班地了解Master包含的属性。

****** 识别结果 1******

9.6.1 启动Master

启动Master有作为JVM进程内的对象启动和作为单独的进程启动的两种方式。以对象启动的方式主要用于local-cluster模式,而作为进程启动则用于standalone模式。

  1. 对象方式启动
    Master的伴生对象的startRpcEnvAndEndpoint方法用于创建Master对象,并将Master对象注册到RpcEnv中完成对Master对象的启动。

      /**
       * Start the Master and return a three tuple of:
       *   (1) The Master RpcEnv
       *   (2) The web UI bound port
       *   (3) The REST server bound port, if any
       */
      def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
        val securityMgr = new SecurityManager(conf)
        val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
        val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
          new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
        val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
        (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
      }
    }
    1. 进程方式启动

      Master的伴生对象中实现了main方法,这样就可以作为单独的JVM进程启动了。

        def main(argStrings: Array[String]) {
          Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
            exitOnUncaughtException = false))
          Utils.initDaemon(log)
          val conf = new SparkConf
          val args = new MasterArguments(argStrings, conf)
          val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
          rpcEnv.awaitTermination()
        }

9.6.2 检查worker超时

经过上一小节对启动Master的分析,我们知道定时任务checkForWorkerTimeOutTask是以WORKER_TIMEOUT_MS为时间间隔,通过不断向Master自身发送CheckForWorkerTime0ut消息来实现对worker的超时检查的。Master也继承自RpcEndpoint,Master实现的receive方法中处理CheckForWorkerTimeOut消息的代码如下。

 case CheckForWorkerTimeOut =>
   timeOutDeadWorkers()
  /** Check for, and remove, any timed-out workers */
  private def timeOutDeadWorkers() {
    // Copy the workers into an array so we don't modify the hashset while iterating through it
    val currentTime = System.currentTimeMillis()
    val toRemove = workers.filter(_.lastHeartbeat < currentTime - workerTimeoutMs).toArray
    for (worker <- toRemove) {
      if (worker.state != WorkerState.DEAD) {
        val workerTimeoutSecs = TimeUnit.MILLISECONDS.toSeconds(workerTimeoutMs)
        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
          worker.id, workerTimeoutSecs))
        removeWorker(worker, s"Not receiving heartbeat for $workerTimeoutSecs seconds")
      } else {
        if (worker.lastHeartbeat < currentTime - ((reaperIterations + 1) * workerTimeoutMs)) {
          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
        }
      }
    }
  }

9.6.3 被选举为领导时的处理

Master基于高可用性的考虑,可以同时启动多个Master。这些中只有一个是激活(Active)状态的,其余的都是支持(Standby)状态。根据9.5节的介绍,Master为了具备故障迁移的能力,它实现了LeaderElectable接囗,因此当Master被选举为领导时,领导选举代理(LeaderElectionAgent)将会调用Master的electedLeader方法。electedLeader方法的实现如下所示。

  override def electedLeader() {
    self.send(ElectedLeader)
  }

9.6.4 一级资源调度

9.6.5 注册Worker

在Spark集群中,Master接收到提交的应用程序后,需要根据应用的资源需求,将应用分配到worker上去运行。一个集群刚开始的时候只有Master,为了让后续启动的worker加人到Master的集群中,每个Worker都需要在启动的时候向Master注册,Master接收到worker的注册信息后,将把的各种重要信息(如ID、host、port、内核数、内存大小等信息)缓存起来,以便进行资源的分配与调度。Master为了容灾,还将worker的信息通过持久化引擎进行持久化,以便经过领导选举出的新Mar能够将集群的状态从错误或灾难中恢复。

Master的receiveAndReply方法中实现了对Worker发送的RegisterWorker消息进行处理的实现。

9.6.6 更新Worker的最新状态

Worker在向Master注册成功后,会向Master发送WorkerLatestState消息。WorkerLatestState消息将携带Worker的身份标识、worker节点的所有Executor的描述信息、调度到当前Worker的所有Driver的身份标识。Master接收到WorkerLatestState消息的处理。

9.6.7 处理Worker的心跳

向Master注册Worker,可以让Master知道worker的资源配置,进而通过资源调度使得Driver及Executor可以在Worker上执行。如果Worker的JVM进程发生了崩溃或者Worker所在的机器宕机或网络不通,那么Master所维护的关于worker的注册信息将变得不可用。

为了让Master及时得知Worker的最新状态,需要向Master发送心跳,Master将根据worker的心跳更新Worker的最后心跳时间,以便为整个集群的健康工作提供参考。Master的receive方法中实现了对Worker的心跳的处理。

    case Heartbeat(workerId, worker) =>
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          if (workers.map(_.id).contains(workerId)) {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " Asking it to re-register.")
            worker.send(ReconnectWorker(masterUrl))
          } else {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " This worker was never registered, so ignoring the heartbeat.")
          }
      }

根据代码清单,Master接收到Heartbeat消息后,将从idToWorker中找出缓存的WorkerInfo,并将workerlnfo的最后心跳时间(lastHeartbeat)更新为系统当前时间的时间戳。如果idToWorker中没有缓存的workerlnfo,且workers中有对应的Workerinfo(这说明定时任务checkForWorkerTime0utTask检查到worker超时,但是workerlnfo的状态不是DEAD,那么在调用removeworker方法时将workerlnfo从idToWorker中清除,此时的workers中仍然持有WorkerInfo),那么向Worker发送ReconnectWorker消息。如果idToWorker中没有缓存的WorkerInfo,且workers中也没有对应的WorkerInfo,那么说明checkForWorkerTimeOutTask已经发现Worker很长时间没有心跳,并且WorkerInfo的状态为DEAD后,将WorkerInfo从workers中也移除了。

9.6.8 注册Application

9.6.9 处理Executor的申请

9.6.10 处理Executor的状态变化

9.6.11 Master的常用方法

9.7 Worker详解

worker是Spark在local-cluster部署模式和Standa10ne部署模式中对工作节点的资源和Executor进行管理的服务。Worker一方面向Master汇报自身所管理的资源信息,一方面接收Master的命令运行Drrver或者为Apphcatlon运行Executor。同一个机器上可以同时部署多个Worker服务,一个worker也可以启动多个Executor。当Executor完成后,Worker将回收使用的资源。

9.7.1 启动Worker

启动Worker有作为JVM进程内的对象启动和作为单独的进程启动的两种方式。以对象启动的方式主要用于local-cluster模式,而作为进程启动则用于standalone模式。

9.7.2 向Master注册Worker

worker在启动后,需要加人到Master管理的整个集群中,以参与Dnver、Executor的资源调度。Worker要加入Mar管理的集群,就必须将注册到Mastero在启动Worker的过程中需要调用registerWithMaster方法向Master注册Worker。

9.7.3 向Master发送心跳

为了让Mar得知Worker依然健康运行着,就需要不断地告诉Master:“我活着”,这个过程是通过发送心跳实现的。

根据之前的内容我们知道,当Worker向Mar注册成功后会接收到Master回复的RegisteredWorker消息,Worker使用handleRegisterResponse方法处理RegisteredWorker消息时,将会向forwordMessageScheduler提交以HEARTBEATMILLIS作为间隔向Worker自身发送SendHeaflbeat消息的定时任务。Worker的receive方法实现了对SendHeartbeac消息的处理。

9.7.4 Worker与领导选举

9.7.5 运行 Driver

在介绍 Master对 Driver的资源调度和运行时,我们知道 Master将向 Worker发送LaunchDriver消息以运行 Driver。下面一起来看 Worker接收到 LaunchDriver消息后,是如何运行 Driver的。

9.7.6 运行 Executor

9.7.7 处理Executor的状态变化

9.8 StandloneAppClient 实现

StandloneAppClient 是在standalone模式下,Application与集群管理器进行对话的客户端。

9.8.1 ClientEndpoint 的实现分析

Spark各个组件之间的通信离不开RpcEnv以及RpcEndpoint。ClientEndpoint 继承自ThreadSafeRpcEndpoint,也是StandloneAppClient 的内部类,StandloneAppClient 依赖于ClientEndpoint 与集群管理器进行通信。

9.8.2 StandloneAppClient 的实现分析

StandloneAppClient 最为核心的功能是向集群管理器请求或“杀死”Executor。

9.9 StandloneSchedulerBackend 的实现分析

在7.8.2节我们曾经介绍了loca部署模式下, SchedulerBackend的实现类 LocalSchedulerBackend,本节将介绍在local- cluster模式和 Standalone模式下, SchedulerBackend的另个实现类 StandaloneScheduler Backend。由于 Standalonescheduler Backend继承自 CoarseGrainedSchedulerBackend,本节还需要介绍 CoarseGrainedSchedulerBackend及其与其他组件通信的内部类 DriverEndpoint。

@shilinlee shilinlee added Spark Apache Spark 大数据 整个大数据体系 labels Aug 27, 2019
@shilinlee shilinlee changed the title Spark内核设计的艺术: 第9章 计部署模式 Spark内核设计的艺术: 第9章 部署模式 Aug 28, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Spark Apache Spark 大数据 整个大数据体系
Projects
None yet
Development

No branches or pull requests

1 participant