Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark内核设计的艺术: 第5章 Spark执行环境 #16

Open
shilinlee opened this issue Aug 9, 2019 · 0 comments
Open

Spark内核设计的艺术: 第5章 Spark执行环境 #16

shilinlee opened this issue Aug 9, 2019 · 0 comments
Labels
Spark Apache Spark 大数据 整个大数据体系

Comments

@shilinlee
Copy link
Owner

shilinlee commented Aug 9, 2019

5.1 SparkEnv 概述

SparkEnv的私有方法create用于创建SparkEnv。其内部组件如图:

image

名称 说明
SecurityManager 主要对账户、权限及身份认证进行设置与管理。
RpcEnv 各个组件之间通信的执行环境。
SerializerManager Spark 中很多对象在通用网络传输或者写入存储体系时,都需要序列化。
BroadcastManager 用于将配置信息和序列化后的RDD、Job以及ShuffleDependency等信息在本地存储。
MapOutputTracker 用于跟踪Map阶段任务的输出状态,此状态便于Reduce阶段任务获取地址及中间结果。
ShuffleManager 负责管理本地及远程的Block数据的shuffle操作。
MemoryManager 一个抽象的内存管理器,用于执行内存如何在执行和存储之间共享。
NettyBlockTransferService 使用Netty提供的异步事件驱动的网络应用框架,提供Web服务及客户端,获取远程节点上Block的集合。
BlockManagerMaster 负责对BlockManager的管理和协调。
BlockManager 负责对Block的管理,管理整个Spark运行时的数据读写的,当然也包含数据存储本身,在这个基础之上进行读写操作。
MetricsSystem 一般是为了衡量系统的各种指标的度量系统。
OutputCommitCoordinator 确定任务是否可以把输出提到到HFDS的管理者,使用先提交者胜的策略。

5.2 安全管理器 SecurityManager

SecurityManager主要对帐号、权限以及身份认证进行设置和管理。如果 Spark 的部署模式为 YARN,则需要生成 secret key (密钥)并存储 Hadoop UGI。而在其他模式下,则需要设置环境变量 _SPARK_AUTH_SECRET(优先级更高)或者 spark.authenticate.secret 属性指定 secret key (密钥)。最后SecurityManager 中设置了默认的口令认证实例 Authenticator,此实例采用匿名内部类实现,用于每次使用 HTTP client 从 HTTP 服务器获取用户的用户和密码。这是由于 Spark 的节点间通信往往需要动态协商用户名、密码,这种方式灵活地支持了这种需求。

val securityManager = new SecurityManager(conf, ioEncryptionKey, authSecretFileConf)

SecurityManager内部有很多属性。

  • authOn:是否开启认证。
  • aclsOn:是否对账号进行授权检查。
  • adminAcls:管理员账号集合。

5.3 RPC 环境

RpcEnv组件肩负着替代Spark 2.x.x以前版本中采用的Akka。SparkEnv创建RpcEnv代码如下:

val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), 				conf, securityManager, numUsableCores, !isDriver)

在RpcEnv的create方法中也只有如下代码:

val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port,                            securityManager, numUsableCores, clientMode)
new NettyRpcEnvFactory().create(config)

5.3.1 RPC 端点 RpcEndpoint

RPC端点是对Spark的PRC通信实体的统一抽象, 所有运行与RPC框架至上的实体都应该继承RpcEndpointRpcEndpoint是替代AkkaActorRpcEndpoint是能够处理RPC请求,给一特定服务提供本地调用及跨节点调用的RPC组件的抽象。

5.3.1.1 RpcEndpoint 的定义

查看源码,自带注释。

RpcEndpoint的一些接口非常类似于Akka的Actor

5.3.1.2 特质 RpcEndpoint 的继承体系

RpcEndpoint 只是一个特质,除了对接口的定义,并没有任何实现的逻辑。下图展示了那些子类实现了RpcEndpoint `。

image

其中灰色的子类型DummyMaster(Mummy意为虚拟的、假的),它只是用来测试的。ThreadSafeRpcEndpoint适用于必须是线程安全的场景,被很多继承者实现,比如HeartbeatReceiverMaster,遇到了再做具体分析。

5.3.2 RPC 端点引用 RpcEndpointRef

RpcEndpointRefActorRef的替代品。要向远端的RpcEndpoint发起请求,必须持有这个RpcEndpointRpcEndpointRef

image

下面介绍什么事消息投递规则。

5.3.2.1 消息投递规则

  • at-most-once: 意味着每条应用了这种机制的消息会被投递0次或1次。可以说这条消息可能会丢失。
  • at-least-once: 潜在的存在多次投递尝试并保证至少成功一次。
  • exaxctly-once: 只会准确发送一次,这种消息不会丢失、也不会重复。

5.3.2.2 RpcEndpointRef 的定义

它定义了所有RpcEndpoint引用的属性与接口。代码请看源码

/**
* Sends a one-way asynchronous message. Fire-and-forget semantics.
* 属于 at-most-once 投递规则
*/
def send(message: Any): Unit

  /**
   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
   * receive the reply within a default timeout.
   * 这个就要等待服务端的返回了
   * This method only sends the message once and never retries.
   */
  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

5.3.3 创建传输上下文 TransportConf

由于RPC环境RpcEnv的底层需要依赖于数据总线,因此需要创建传输上下文TransportConf

源代码

private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    conf.getInt("spark.rpc.io.threads", 0))

5.3.4 消息调度器 Dispatcher (*)

Dispatcher 是有效提法哦NettyRpcEnv对消息异步处理并最大提升并行性处理能力的前提。Dispatcher负责将RPC消息路由到要该对此消息处理的RPCEndpoint。

5.3.5 传输上下文 TransportContext

NettyRpcEnv中,创建TransportContext的代码如下:

  private val streamManager = new NettyStreamManager(this)

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

5.3.5.1 NettyStreamManager

NettyStreamManager专门用来提供文件服务的能力。定义了3个文件与目录缓存。

5.3.5.2 NettyRpcHandler

两个重载的receive的方法。

  • 对客户端进行响应的receive方法。

      override def receive(
          client: TransportClient,
          message: ByteBuffer,
          callback: RpcResponseCallback): Unit = {
        val messageToDispatch = internalReceive(client, message)
        dispatcher.postRemoteMessage(messageToDispatch, callback)
      }
  • 对客户端进行响应的receive重载方法。

      override def receive(
          client: TransportClient,
          message: ByteBuffer): Unit = {
        val messageToDispatch = internalReceive(client, message)
        dispatcher.postOneWayMessage(messageToDispatch)
      }

5.3.6 创建传输客户端工厂 TransportClientFactory

Spark与远端RpcEnv进行通信都依赖于TransportClientFactory 创建的TransportClient

5.3.7 创建TransportServer

  @volatile private var server: TransportServer = _

  private val stopped = new AtomicBoolean(false)

TransportServer并未在这实例化,而是在启动EpcEnv的偏函数StartNerryRpcEnv,它负责调用NettyRpcEnv的startServer方法。

5.3.8 客户端请求发送

5.3.9 NettyRpcEnv 中常用方法

  • 获取RpcEndPoint的引用对象RpcEndPointRef

      override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
        dispatcher.getRpcEndpointRef(endpoint)
      }
  • 得到对应的RPCEndpointRef

  • 设置Endpoint

       override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
        dispatcher.registerRpcEndpoint(name, endpoint)
      }

5.4 序列化管理器 SerializerManager

SparkEnv中有两个序列化的组件:

  • SerializerManager
  • closureSerializer
    val serializer = instantiateClassFromConf[Serializer](SERIALIZER)
    logDebug(s"Using serializer: ${serializer.getClass}")

    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

5.5 广播管理器 BroadcastManager

BroadcastManager用于将配置信息和序列化后的RDD、Job及ShuffleDependency等信息在本地存储。如果为了容灾,也会复制到其他节点上。在sparkEnv.scala中:

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

5.6 map任务输出跟踪器

mapOutputTracker用于跟踪map任务的输出状态,此状态便于reduce任务定位map输出结果所在节点地址,进而获取中间输出结果。每个map任务或者reduce任务都会有其唯一的表示,分别为mapIdreduceId。每个reduce任务的输入可能是多个map任务的输出,reduce会到各个map任务所在节点上拉取Block,这一过程叫Shuffle。每次shuffle都有唯一标识shuffleId

5.6.1 mapOutputTracker 的实现

5.6.2 mapOutputTrackerMaster 的实现

mapOutputTrackerWorkermap任务的跟踪信息,通过mapOutputTrackerMasterEndpointRPCEndpointRef发送给mapOutputTrackerMaster ,由mapOutputTrackerMaster负责管理和维护所有的map任务的输出跟踪信息。

5.7 构架存储体系

  • Shuffle管理器 ShuffleManager
  • 内存管理器 MemoryManager
  • 块传输服务 BlockTransferService
  • BlockManagerMaster
  • 磁盘块管理器 DiskBlockManager
  • 块锁管理器 BlockInfoManager
  • 块管理器 BlockManager

第6章会讲每个组件的功能。

5.8 创建度量系统

SparkEnv中,度量系统也是必不可少的一个子组件。

    val metricsSystem = if (isDriver) {
      // Don't start metrics system right now for Driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set(EXECUTOR_ID, executorId)
      val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf,
        securityManager)
      ms.start()
      ms
    }

metrics中,createMetricsSystem是MetricsSystem伴生对象提供的实现:

  def createMetricsSystem(
      instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
    new MetricsSystem(instance, conf, securityMgr)
  }
}

5.9 输出提交协调器

当Spark应用程序用了Spark SQL(包括Hive)或者需要将任务的输出保存到HDFS时,就会用到输出提交协调器 OutputCommitCoordinator,它将决定任务是否可以提交输出到HDFS。无论是Driver还是Executor,在SparkEnv中都包含子组件OutputCommitCoordinator

SparkEnv中,创建OutputCommitCoordinator的代码如下:

    val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf, isDriver)
    }
    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

5.9.1 OutputCommitCoordinatorEndpoint的实现

5.9.2 OutputCommitCoordinator的实现

5.9.2 OutputCommitCoordinator的工作原理

image

5.10 创建SparkEnv

当SparkEnv内部的所有组件都是丽华完毕,将正式创建SparkEnv。

@shilinlee shilinlee added Spark Apache Spark 大数据 整个大数据体系 labels Aug 9, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Spark Apache Spark 大数据 整个大数据体系
Projects
None yet
Development

No branches or pull requests

1 participant