Spark sits at the core of the BDAS ecosystem; the other components rely on Spark to provide programmatic support for distributed parallel processing. This chapter dissects Spark through the most important parts of its kernel code, to deepen the reader's understanding of Spark's design ideas and implementation details.
4.1 Spark Code Layout
4.1.1 Overview of the Spark Source Layout
Figure 4-1 shows Spark's code structure and the key functional modules it contains. The figure gives the reader an intuitive picture of Spark's main components and code layout; these modules also make up the functional components of the Spark architecture. Following this layout, readers can browse the source code on their own, which is very helpful for mastering Spark's implementation details and deepening the understanding of its mechanisms.
Figure 4-1 Spark code layout
4.1.2 Modules inside Spark Core
The key modules inside Spark Core are introduced one by one below:
(1) api: implementation of the Java, Python, and R language APIs.
(2) broadcast: implementation of broadcast variables.
(3) deploy: implementation of Spark deployment and startup.
(4) executor: implementation of the computation part that runs on worker nodes.
(5) metrics: implementation of runtime status monitoring.
(6) network: cluster communication.
(7) partial: code for approximate evaluation.
(8) serializer: the serialization module.
(9) storage: the storage module.
(10) ui: code for the monitoring web interface.
4.1.3 Modules outside Spark Core
The modules outside core are as follows:
(1) bagel: Pregel is Google's graph computing framework; Bagel is a lightweight Pregel implementation built on Spark.
(2) mllib: the machine learning algorithm library.
(3) sql: SQL on Spark, providing query capability over big data.
(4) graphx: implementation of the graph computation module.
(5) streaming: implementation of the stream processing framework Spark Streaming.
(6) yarn: part of the Spark on YARN implementation.
4.2 Dissecting the Spark Execution Main Line [RDD -> task]
As discussed in detail in the previous chapter, once an action operator is invoked, a Spark job enters several key stages of splitting, scheduling, and execution, shown in Figure 4-2 and not repeated here:
Figure 4-2 The main stages of Spark execution
In Spark, the path a job takes from submission, through being split into tasks, to execution on worker nodes can be called the Spark execution main line, and it is central to how Spark works. The previous chapters explained, at the level of principles, what happens after a job is submitted; this section walks the reader through this main line at the source-code level, which should considerably deepen the understanding of these core parts of Spark.
4.2.1 From RDD to DAGScheduler
Since action operators trigger job submission, we again take the count function as an example to trace the whole execution main line. Note: the file containing each code fragment is given in brackets [].
[org.apache.spark.rdd.RDD]
[RDD.scala]
/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
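For reference, a minimal driver program that triggers this path might look like the following sketch (the application name and data are hypothetical, assuming a working Spark 1.5 environment):
import org.apache.spark.{SparkConf, SparkContext}

object CountExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("count-example")   // hypothetical app name
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    // count() is an action: it calls sc.runJob and starts the
    // job-submission path analyzed in this section.
    println(rdd.count())
    sc.stop()
  }
}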
Clearly, count calls runJob; the runJob function is implemented in the org.apache.spark.SparkContext class.
[SparkContext.scala]
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
/* Note: execution enters the DAGScheduler stage here */
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
The runJob implementation in SparkContext.scala above calls the runJob function of the org.apache.spark.scheduler.DAGScheduler class, which indicates that RDD graph processing is done and execution has entered the DAGScheduler stage.
4.2.2 From DAGScheduler to TaskScheduler
This section describes what happens after processing enters the DAGScheduler. For brevity, some less important code is omitted from the listings; after reading this chapter, readers can use IntelliJ IDEA to read the complete code and build a deeper understanding.
[DAGScheduler.scala]
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
//Note: this continues by calling submitJob, defined in the same file
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
waiter.awaitResult() match {
case JobSucceeded =>
...
case JobFailed(exception: Exception) =>
...
...
}
}
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
...
}
val jobId = nextJobId.getAndIncrement()
...
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
//Note: this is the new event-loop communication mechanism in Spark 1.5.0; post a JobSubmitted message
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
The handling after the JobSubmitted message is received is listed below:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
//Handle the JobSubmitted message
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
//Call handleJobSubmitted
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case ...
...
In the code that handles JobSubmitted, Spark goes on to call handleJobSubmitted in the same file. The key fragments of that function are listed below; to keep the focus, some irrelevant code is omitted.
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
//Carve out the last stage as finalStage
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
...
}
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
//Check whether finalStage has parent stages that have not yet been computed
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.resultOfJob = Some(job)
val stageIds = jobIdToStageIds(jobId).toArray
...
//Submit finalStage
submitStage(finalStage)
submitWaitingStages()
}
Now let's look at how Spark proceeds after finalStage is submitted.
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
...
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
...
//If all parent stages this stage depends on have finished, submit it directly
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
//If some parent stages have not been computed yet, submit them recursively first
submitStage(parent)
}
waitingStages += stage
...
}
In the fragment above, submitMissingTasks is finally called to submit the stage. As the next fragment shows, at this point the DAGScheduler hands task scheduling over to the TaskScheduler: the task array is wrapped in a TaskSet object and submitted through the TaskScheduler's submitTasks function. The details are as follows:
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingPartitions.clear()
...
if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
//Note: this is where the TaskSet is handed to the task scheduler
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
...
[TaskSchedulerImpl.scala]
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
//Create a TaskSetManager to schedule the tasks within this TaskSet
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
...
//Note: request compute resources for execution here
backend.reviveOffers()
}
The submitTasks function above ends by calling the reviveOffers function of org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend to request compute resources; its implementation is listed below:
[CoarseGrainedSchedulerBackend.scala]
override def reviveOffers() {
//Send the ReviveOffers message
driverEndpoint.send(ReviveOffers)
}
Next we follow how the ReviveOffers message is handled:
[CoarseGrainedSchedulerBackend.scala]
override def receive: PartialFunction[Any, Unit] = {
case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.freeCores += scheduler.CPUS_PER_TASK
makeOffers(executorId)
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
s"from unknown executor with ID $executorId")
}
}
case ReviveOffers =>
//Note: handle the ReviveOffers message by calling makeOffers
makeOffers()
case KillTask =>
...
private def makeOffers() {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
//Gather the available compute resources
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq
//Launch the tasks
launchTasks(scheduler.resourceOffers(workOffers))
}
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = ser.serialize(task)
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
...
...
}
else {
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
//Note: send the LaunchTask message to start the task on the executor
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
4.2.3 From TaskScheduler to the Worker Node
In the fragment above, launchTasks ends by sending the LaunchTask message to start the tasks; the actual launch happens in org.apache.spark.executor.Executor. The key fragments of Executor.scala are given below:
[Executor.scala]
private[spark] class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
userClassPath: Seq[URL] = Nil, isLocal: Boolean = false)
extends Logging {
...
//Thread pool used on the worker node to launch tasks
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
private val executorSource = new ExecutorSource(threadPool, executorId)
...
def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit = {
//Wrap the task in a TaskRunner
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask)
//Add the TaskRunner to the running-task list
runningTasks.put(taskId, tr)
//Execute the task on the thread pool
threadPool.execute(tr)
}
This completes the analysis of the main line from job submission to tasks finally executing on worker nodes.
4.3 Dissecting the Interaction among Client, Master, and Worker
4.3.1 Overview of the Interaction Flow
The previous section dissected the code along the main line from job submission to tasks running on worker nodes. This section takes another perspective and examines the code from the angle of the interaction among Client, Master, and Worker. The details of the interaction are shown in Figure 4-3:
Figure 4-3 Interaction among Client, Master, and Worker
4.3.2 Calls in the Interaction Flow
We start again from the startup call sequence in the org.apache.spark.SparkContext class:
[SparkContext.scala]
...
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
//Start the task scheduler
_taskScheduler.start()
...
The start function of TaskScheduler is implemented in the org.apache.spark.scheduler.TaskSchedulerImpl class.
[TaskSchedulerImpl.scala]
...
override def start() {
//Start the backend
backend.start()
...
Next is the key code that starts the backend mentioned above, located in org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend:
[SparkDeploySchedulerBackend.scala]
...
override def start() {
super.start()
launcherBackend.connect()
...
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
//Create and start the AppClient
client.start()
Let's continue with the startup and key parts of org.apache.spark.deploy.client.AppClient:
[AppClient.scala]
...
def start() {
//Create the ClientEndpoint and register it with the RPC environment
endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
}
[Partial implementation of the ClientEndpoint class]
override def onStart(): Unit = {
try {
//Register with the Master
registerWithMaster(1)
} catch {
...
registerWithMaster delegates the registration to tryRegisterAllMasters, as shown below:
[AppClient.scala]
...
private def registerWithMaster(nthRetry: Int) {
//Delegate to tryRegisterAllMasters
registerMasterFutures = tryRegisterAllMasters()
...
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
...
val masterRef = rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
//The AppClient sends a RegisterApplication message to the Master
masterRef.send(RegisterApplication(appDescription, self))
Now let's see which operations org.apache.spark.deploy.master.Master performs when it receives the message.
[Master.scala]
...
//Handle application registration
case RegisterApplication(description, driver) => {
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
val app = createApplication(description, driver)
//Register the application
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
//Persist the application metadata; where to persist (or whether to persist at all) is configurable
persistenceEngine.addApplication(app)
driver.send(RegisteredApplication(app.id, self))
//Run scheduling to allocate resources to waiting applications; note that schedule() is called whenever a new application joins or new resources become available
schedule()
}
}
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) { return }
// Drivers take strict precedence over executors
val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
//Note the condition here
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
for (driver <- waitingDrivers) {
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
}
}
}
//Launch executors on workers
startExecutorsOnWorkers()
}
schedule() allocates resources to applications that are waiting for them, and it is called whenever a new application joins or new resources become available. When choosing workers (executors) for an application, there are generally two policies (a simplified sketch of the allocation logic follows this list):
(1) Spread out: an application's executors are spread across as many different nodes as possible. This is controlled by spark.deploy.spreadOut, whose default value is true, i.e. spread out.
(2) Consolidate: an application is assigned to as few nodes as possible.
For a given application, a worker can host only one of its executors, but that executor may own more than one core.
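To make the two policies concrete, the following standalone sketch (not the actual Master code; names and structure are simplified) illustrates how cores could be assigned to workers under the spread-out and consolidate policies:
// Simplified illustration of the two allocation policies (spreadOut = true / false).
// freeCores(i) is the number of free cores on worker i; the result says how many
// cores of this application end up on each worker.
def assignCores(coresNeeded: Int, freeCores: Array[Int], spreadOut: Boolean): Array[Int] = {
  val assigned = Array.fill(freeCores.length)(0)
  var remaining = coresNeeded
  var pos = 0
  while (remaining > 0 && freeCores.exists(_ > 0)) {
    if (freeCores(pos) > 0) {
      assigned(pos) += 1; freeCores(pos) -= 1; remaining -= 1
      // spread out: move to the next worker after every core;
      // consolidate: keep filling the current worker until it is exhausted
      if (spreadOut) pos = (pos + 1) % freeCores.length
    } else {
      pos = (pos + 1) % freeCores.length
    }
  }
  assigned
}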
Now let's look at the implementation of launchExecutor.
[Master.scala]
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
//Update the worker's bookkeeping: deduct the cores and memory taken by this executor from what is available
worker.addExecutor(exec)
//Send a LaunchExecutor message asking the worker node to start an executor
worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
//Notify the AppClient that an executor has been added
exec.application.driver.send(ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
Next, we continue with the main operations the worker node performs after receiving the message; the code fragment is as follows:
[worker.scala]
...
override def receive: PartialFunction[Any, Unit] = synchronized {
...
//Handle the LaunchExecutor message
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
...
//Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
...
//Wrap everything in an ExecutorRunner
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
//Start the ExecutorRunner
manager.start()
//Accumulate resource usage
coresUsed += cores_
memoryUsed += memory_
//Send an ExecutorStateChanged message toward the Master
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
...
As the code above shows, when the worker receives the LaunchExecutor message from the Master it creates an org.apache.spark.deploy.worker.ExecutorRunner. The Worker keeps track of its own resource usage, including CPU cores and memory already in use, but this accounting exists only for the web UI; the Master records the Workers' resource usage itself and does not need the Workers to report it. The heartbeat between Worker and Master serves only as a liveness signal and carries no other information.
Next, we go deeper into the ExecutorRunner class and analyze its start function.
[ExecutorRunner.scala]
private[worker] def start() {
//Create a thread whose run method calls fetchAndRunExecutor
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
//Start the thread
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
killProcess(Some("Worker shutting down")) }
}
private def fetchAndRunExecutor() {
try {
// Launch the process
val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
logInfo(s"Launch command: $formattedCommand")
builder.directory(executorDir)
builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
// Add webUI log urls
val baseUrl =
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
//Start the executor process
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
formattedCommand, "=" * 40)
// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
//Send an ExecutorStateChanged message to report the state change
worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
At this point the executor startup is complete. In the same fashion, readers can read the Spark core code themselves to deepen their understanding of how Spark's mechanisms are implemented; for reasons of space it is not covered further here.
4.4 Triggering Shuffle
Chapter 3 introduced the basic concepts and principles of shuffle. Here we look at how shuffle is triggered, and at other important points, from the perspective of the source code.
4.4.1 Triggering Shuffle Write
From the earlier chapters we know that a mapper is in fact a task. When discussing Spark scheduling we saw that the DAG scheduler divides work into tasks within a stage. Depending on the stage, this yields two kinds of tasks: ResultTask and ShuffleMapTask. A ResultTask returns its computation result to the driver, while a ShuffleMapTask passes its result on to the child RDD of the shuffle dependency, dividing the RDD's data into multiple buckets according to the partitioner specified in the ShuffleDependency. We therefore start from ShuffleMapTask to dissect the mapper's overall workflow. Please read the following code:
private[spark] class ShuffleMapTask(
stageId: Int,
stageAttemptId: Int,
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient private var locs: Seq[TaskLocation],
internalAccumulators: Seq[Accumulator[Long]])
extends Task[MapStatus](stageId, stageAttemptId, partition.index, internalAccumulators) with Logging {
[some code omitted]
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
var writer: ShuffleWriter[Any, Any] = null
try {
/* Obtain the ShuffleWriter from the ShuffleManager instance */
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
/* Trigger the shuffle write */
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
override def preferredLocations: Seq[TaskLocation] = preferredLocs
override def toString: String = "ShuffleMapTask(%d, %d)".format(stageId, partitionId)
}
Because a task corresponds to one partition of the last RDD of the current stage, rdd.iterator(partition, context) computes the data of that partition. The shuffle write is then performed by a ShuffleWriter instance through its write interface; as noted in the code above, Spark obtains this ShuffleWriter from the ShuffleManager instance.
In this part of the implementation Spark provides two shuffle mechanisms, and accordingly ShuffleManager has two subclasses (a configuration sketch for choosing between them follows this list):
1. HashShuffleManager
2. SortShuffleManager
The ShuffleManager supplies a ShuffleWriter and a ShuffleReader for the shuffle write and shuffle read phases. HashShuffleManager therefore provides HashShuffleWriter and HashShuffleReader, while SortShuffleManager provides SortShuffleWriter and HashShuffleReader (note: not a SortShuffleReader!). Careful readers may have noticed that the only difference between hash shuffle and sort shuffle lies in the write phase; the read phase is exactly the same.
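Which of the two managers is used is selected through the spark.shuffle.manager configuration ("sort" is the default in Spark 1.5, "hash" selects the hash-based implementation). A small sketch, with a hypothetical application name:
import org.apache.spark.{SparkConf, SparkContext}

// spark.shuffle.manager chooses the ShuffleManager implementation:
// "sort" -> SortShuffleManager (default), "hash" -> HashShuffleManager.
val conf = new SparkConf()
  .setAppName("shuffle-manager-demo")      // hypothetical app name
  .set("spark.shuffle.manager", "hash")    // switch to hash-based shuffle
val sc = new SparkContext(conf)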
4.4.2 Triggering Shuffle Read
This section explores how the shuffle read is triggered. In Spark, the three functions of the aggregator are specified in PairRDDFunctions.combineByKey. When the partitioner of the new RDD differs from that of the parent RDD, a ShuffledRDD is produced. The implementation of combineByKey is given below:
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined")
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
/* The partitioners differ, so a ShuffledRDD is produced */
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
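As a usage-level illustration of the three aggregator functions, the following hedged example (hypothetical data, not code from Spark itself) computes a per-key average with combineByKey, assuming an existing SparkContext sc:
// createCombiner / mergeValue / mergeCombiners maintain a (sum, count) accumulator per key.
val scores = sc.parallelize(Seq(("a", 90), ("b", 70), ("a", 80), ("b", 60)))
val avg = scores.combineByKey(
    (v: Int) => (v, 1),                                            // createCombiner
    (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),         // mergeValue
    (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)   // mergeCombiners
  ).mapValues { case (sum, cnt) => sum.toDouble / cnt }
avg.collect()   // Array((a,85.0), (b,65.0))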
Careful readers may wonder at this point how the ShuffledRDD goes about obtaining the data of its partitions. Let's look at the concrete implementation of the ShuffledRDD class; the code fragment is shown below:
/* ShuffledRDD.scala */
@DeveloperApi
class ShuffledRDD[K, V, C](
@transient var prev: RDD[_ <: Product2[K, V]], part: Partitioner)
extends RDD[(K, C)](prev.context, Nil) {
[some code omitted...]
/* Set the serializer for this RDD's shuffle */
def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C] = {
this.serializer = Option(serializer)
this
}
/* Set the key ordering for this RDD's shuffle */
def setKeyOrdering(keyOrdering: Ordering[K]): ShuffledRDD[K, V, C] = {
this.keyOrdering = Option(keyOrdering)
this
}
/* Set the aggregator for this RDD's shuffle */
def setAggregator(aggregator: Aggregator[K, V, C]): ShuffledRDD[K, V, C] = {
this.aggregator = Option(aggregator)
this
}
/* Set the map-side-combine flag for this RDD's shuffle */
def setMapSideCombine(mapSideCombine: Boolean): ShuffledRDD[K, V, C] = {
this.mapSideCombine = mapSideCombine
this
}
override def getDependencies: Seq[Dependency[_]] = {
List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
override val partitioner = Some(part)
override def getPartitions: Array[Partition] = {
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
}
/* This is where the shuffle read is triggered */
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
override def clearDependencies() {
super.clearDependencies()
prev = null
}
}
From the ShuffledRDD implementation above, triggering the shuffle read is very similar to triggering the shuffle write: a ShuffleReader is first obtained from the ShuffleManager, and its read interface is then called to fetch (shuffle fetch) and compute the data of the given partition.
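Seen from user code, the whole write/read chain can be exercised by any wide operation; a minimal hedged sketch (data illustrative, assuming an existing SparkContext sc):
// reduceByKey builds a ShuffledRDD: the parent stage's ShuffleMapTasks perform the
// shuffle write (ShuffleWriter.write), and computing the ShuffledRDD's partitions
// performs the shuffle read (ShuffledRDD.compute -> ShuffleReader.read).
val words = sc.parallelize(Seq("spark", "shuffle", "spark"), numSlices = 3)
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
counts.collect()   // triggers the job: shuffle write, then shuffle read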
4.5 Spark Storage Strategy
In everyday Spark development, developers inevitably work with RDDs: a Spark application is written by calling the various transformation and action interfaces the RDD provides. Spark introduced the RDD concept to raise the level of abstraction and thereby decouple the interface from the implementation, so users do not need to care about the underlying details. But readers may ask: the RDD only gives us interfaces to call, so how is the data being operated on actually stored and accessed, and how is that implemented? This is where Spark's storage mechanism comes in. In this section we give a high-level, source-code-based analysis of the storage mechanism; for reasons of space, readers who want every detail will need to read the source code in depth.
The RDD class is where developers perform concrete operations, and it is also the entry point of the storage mechanism. Two important classes are involved, CacheManager and BlockManager, summarized as follows:
1. CacheManager: the intermediate layer between the RDD and the actual lookup.
- Passes RDD information on to the BlockManager.
- Ensures that a node does not read the same RDD partition repeatedly, and provides concurrency control.
2. BlockManager: provides the actual lookup interface, and manages the concrete cache locations through the MemoryStore, DiskStore, and TachyonStore classes.
The iterator method of RDD is in fact the entry point of the cache-read mechanism. Its implementation is shown in the following code:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
/* Delegate the lookup to the CacheManager */
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
/* Recompute, or read from a checkpoint */
computeOrReadCheckpoint(split, context)
}
}
From the code above, when the storage level is not NONE the cache is queried at the granularity of a partition; otherwise computeOrReadCheckpoint is called to recompute it. CacheManager.getOrCompute then calls BlockManager.get to fetch the data, and it is inside getOrCompute that the high-level Partition abstraction is connected to the low-level Block. A brief usage sketch follows.
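As a sketch of how this path is exercised from user code (the input path is hypothetical; an existing SparkContext sc is assumed):
import org.apache.spark.storage.StorageLevel

val errors = sc.textFile("hdfs:///tmp/app.log")          // hypothetical input path
  .filter(_.contains("ERROR"))
  .persist(StorageLevel.MEMORY_AND_DISK)                 // storageLevel != NONE

errors.count()   // first action: iterator() misses the cache, computes and stores blocks
errors.count()   // second action: iterator() is served through CacheManager/BlockManager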
The core classes related to the storage mechanism are dissected further below.
4.5.1 The Role of CacheManager
In Spark's storage implementation, when an RDD is computed, data is fetched through the CacheManager, and computation results are stored through it as well. The CacheManager is responsible for handing the contents of an RDD partition to the BlockManager and for ensuring that a node loads a given RDD partition only once. The iterator method of RDD described above uses CacheManager.getOrCompute to perform the cache lookup, so we take this method as the entry point for examining the CacheManager's role.
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(blockResult) =>
/* The partition already holds data, so simply return its values */
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(blockResult.readMethod)
existingMetrics.incBytesRead(blockResult.bytes)
val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
case None =>
/* Acquire the lock for loading this partition */
/* If another thread already holds the lock, wait for it to finish */
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) {
return new InterruptibleIterator[T](context, storedValues.get)
}
/* Load the partition */
try {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
/* If the task is running locally there is no need to persist the result */
if (context.isRunningLocally) {
return computedValues
}
/* Cache the values and track block status updates */
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
val metrics = context.taskMetrics
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
loading.notifyAll()
loading.remove(key)
}
}
}
}
From the code above, the partition being queried is first turned into a BlockId via RDDBlockId, and BlockManager.get is then called to look it up. If the lookup succeeds, the result is returned and accounted per task. Note that even when the storage level is not NONE, the lookup may still miss the cache. The lookup can also happen concurrently, so a lock is needed. On a cache miss, RDD.computeOrReadCheckpoint is called to compute the partition. Note that if the task is running locally the computed result is returned directly; otherwise putInBlockManager is called to upload it to the cache, while the block status updates are tracked to keep the cache consistent. Next we look at the logic of putInBlockManager; comments have been added at the key points to help the reader.
private def putInBlockManager[T](
key: BlockId,
values: Iterator[T],
level: StorageLevel,
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
val putLevel = effectiveStorageLevel.getOrElse(level)
if (!putLevel.useMemory) {
/*
* If the storage level does not use memory, the computed result can be handed to the BlockManager
* directly as an iterator, instead of being unrolled in memory: its putIterator method is called
* to store it (otherwise the data must first be registered with the MemoryStore).
* After storing, a lookup is performed to make sure the caching succeeded.
* [Note] putIterator is described in more detail when BlockManager is introduced below.
*/
updatedBlocks ++=
blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
}
} else {
/*
* If the RDD is to be cached in memory, the iterator cannot be passed directly; instead putArray is
* called to store the whole array, because the partition might be evicted from memory before it is
* queried again, which would invalidate the iterator.
* The values must also be registered (unrolled) in memory first, since there may not be enough space
* (an OOM could occur); in that case a suitable partition is dropped to disk, a choice made by
* MemoryStore.unrollSafely.
* [Note] the putArray method called here is described in detail later.
*/
blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
case Left(arr) =>
/* The whole partition unrolled successfully, so it is now cached in memory */
updatedBlocks ++=
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
/* Not enough memory to cache the partition in memory */
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false, useOffHeap = false, deserialized = false, putLevel.replication)
putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
} else {
returnValues
}
}
}
}
4.5.2 The Role of BlockManager
As the previous section showed, when the CacheManager reads and stores data it relies mainly on the BlockManager interface. The BlockManager decides whether data is fetched from memory (MemoryStore) or from disk (DiskStore), and it provides the getLocal and getRemote methods for querying data locally or remotely. getLocal is implemented by calling doGetLocal, so it can be seen as a wrapper around doGetLocal.
doGetLocal first obtains the block info from the blockId, then reads the block's storage level and branches accordingly: memory, tachyon, or disk. Memory and tachyon are both essentially in-memory storage, while the disk branch, after finding the result, additionally checks whether the block's original storage level included memory; if so, the block is loaded back into memory. Let's look at the implementation of doGetLocal:
private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
/* Check that the block still exists; with small probability it may have been removed by removeBlock.
* Even if the user removed the block intentionally, this branch can still be taken,
* but eventually an exception is thrown because the block cannot be found.
*/
if (blockInfo.get(blockId).isEmpty) {
logWarning(s"Block $blockId had been removed")
return None
}
/* If another thread is still writing this block, wait for it */
if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure.")
return None
}
val level = info.level
logDebug(s"Level for block $blockId is $level")
/* Look for the block in memory */
if (level.useMemory) {
logDebug(s"Getting block $blockId from memory")
val result = if (asBlockResult) {
memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
memoryStore.getBytes(blockId)
}
result match {
case Some(values) =>
return result
case None =>
logDebug(s"Block $blockId not found in memory")
}
}
/* Look for the block in the external block store */
if (level.useOffHeap) {
logDebug(s"Getting block $blockId from ExternalBlockStore")
if (externalBlockStore.contains(blockId)) {
val result = if (asBlockResult) {
externalBlockStore.getValues(blockId)
.map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
externalBlockStore.getBytes(blockId)
}
result match {
case Some(values) =>
return result
case None =>
logDebug(s"Block $blockId not found in ExternalBlockStore")
}
}
}
/* Look for the block on disk, loading it into memory if necessary */
if (level.useDisk) {
logDebug(s"Getting block $blockId from disk")
val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
case Some(b) => b
case None =>
throw new BlockException(
blockId, s"Block $blockId not found on disk, though it should be")
}
assert(0 == bytes.position())
if (!level.useMemory) {
/* The block is not meant to be kept in memory, so return it directly */
if (asBlockResult) {
return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk, info.size))
} else {
return Some(bytes)
}
} else {
/* Otherwise, also keep part of the data in the memory store */
if (!level.deserialized || !asBlockResult) {
/* When the block's storage level includes "memory serialized", or when the block should be
* cached in memory as objects, keep some of the bytes in memory (only the serialized bytes are needed)
*/
memoryStore.putBytes(blockId, bytes.limit, () => {
/* If the data were larger than the remaining memory, allocating it would trigger an OOM; copyForMemory is therefore created lazily, only when the bytes are actually put into the memory store */
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
})
bytes.rewind()
}
if (!asBlockResult) {
return Some(bytes)
} else {
val values = dataDeserialize(blockId, bytes)
if (level.deserialized) {
/* Cache the values before returning them */
val putResult = memoryStore.putIterator(
blockId, values, level, returnValues = true, allowPersistToDisk = false)
/* The put may fail when there is not enough space */
putResult.data match {
case Left(it) =>
return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
case _ =>
/* Thrown if the values were dropped to disk instead */
throw new SparkException("Memory store did not return an iterator!")
}
} else {
return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
}
}
}
}
}
} else {
logDebug(s"Block $blockId not registered locally")
}
None
}
During a lookup the BlockManager does not call the low-level query functions directly; it goes through manager classes such as MemoryStore and DiskStore. Likewise, getRemote is just a wrapper around doGetRemote. The doGetRemote flow is fairly simple: obtain the block info, look up the block's locations in the cluster, and keep sending requests to remote nodes according to those locations; as soon as any remote node returns the data, the lookup ends. Next we turn to the put-related methods. Earlier we saw that storing data through the BlockManager goes through the following two interfaces:
1. putArray
2. putIterator
In fact both functions are thin wrappers around doPut, which they call in their implementations, so below we focus on the implementation of doPut.
private def doPut(
blockId: BlockId,
data: BlockValues,
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None)
: Seq[(BlockId, BlockStatus)] = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
effectiveStorageLevel.foreach { level =>
require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
}
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
/* The block will ultimately be dropped to disk correctly according to its storage level.
* However, until we call markReady on this block,
* no other thread can call get on it.
*/
val putBlockInfo = {
val tinfo = new BlockInfo(level, tellMaster)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
if (oldBlockOpt.isDefined) {
if (oldBlockOpt.get.waitForReady()) {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
return updatedBlocks
}
oldBlockOpt.get
} else {
tinfo
}
}
val startTimeMs = System.currentTimeMillis
/* If we're storing values and we need to replicate the data, we'll want access to the values,
* but because our put will read the whole iterator, there will be no values left. For the
* case where the put serializes data, we'll remember the bytes, above; but for the case where
* it doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
*/
var valuesAfterPut: Iterator[Any] = null
// Ditto for the bytes after the put
var bytesAfterPut: ByteBuffer = null
/* Size of the block, in bytes */
var size = 0L
// The level we actually use to put the block
val putLevel = effectiveStorageLevel.getOrElse(level)
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
case b: ByteBufferValues if putLevel.replication > 1 =>
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = b.buffer.duplicate()
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
replicate(blockId, bufferView, putLevel)
}(futureExecutionContext)
case _ => null
}
putBlockInfo.synchronized {
logTrace("Put for block %s took %s to get into synchronized block"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
var marked = false
try {
/* returnValues - whether to return the values
* blockStore - the store in which to put the values
*/
val (returnValues, blockStore: BlockStore) = {
if (putLevel.useMemory) {
/* Store in memory first, even if useDisk is also set; if memory cannot hold it, it will be dropped to disk later */
(true, memoryStore)
} else if (putLevel.useOffHeap) {
// Use external block store
(false, externalBlockStore)
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(putLevel.replication > 1, diskStore)
} else {
assert(putLevel == StorageLevel.NONE)
throw new BlockException(
blockId, s"Attempted to put block $blockId without specifying storage level!")
}
}
// Actually put the values
val result = data match {
case IteratorValues(iterator) =>
blockStore.putIterator(blockId, iterator, putLevel, returnValues)
case ArrayValues(array) =>
blockStore.putArray(blockId, array, putLevel, returnValues)
case ByteBufferValues(bytes) =>
bytes.rewind()
blockStore.putBytes(blockId, bytes, putLevel)
}
size = result.size
result.data match {
case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
case Right (newBytes) => bytesAfterPut = newBytes
case _ =>
}
// Keep track of which blocks are dropped from memory
if (putLevel.useMemory) {
result.droppedBlocks.foreach { updatedBlocks += _ }
}
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// let other threads read it, and tell the master about it.
marked = true
putBlockInfo.markReady(size)
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
updatedBlocks += ((blockId, putBlockStatus))
}
} finally {
// If we failed in putting the block to memory/disk, notify other possible readers
// that it has failed, and then remove it from the block info map.
if (!marked) {
// Note that the remove must happen before markFailure otherwise another thread
// could've inserted a new BlockInfo before we remove it.
blockInfo.remove(blockId)
putBlockInfo.markFailure()
logWarning(s"Putting block $blockId failed")
}
}
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
// Either we're storing bytes and we asynchronously started replication, or we're storing
// values and need to serialize and replicate them now:
if (putLevel.replication > 1) {
data match {
case ByteBufferValues(bytes) =>
if (replicationFuture != null) {
Await.ready(replicationFuture, Duration.Inf)
}
case _ =>
val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
throw new SparkException(
"Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, putLevel)
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}
BlockManager.dispose(bytesAfterPut)
if (putLevel.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
} else {
logDebug("Putting block %s without replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
updatedBlocks
}
The responsibilities of doPut can be summarized as follows:
1. Create a BlockInfo structure for the block to hold its metadata, and lock it so that it cannot yet be accessed.
2. Decide from the block's replication factor whether to replicate it to remote nodes.
3. Decide from the block's storage level whether to store it in memory or on disk, then unlock it and mark it ready so that it can be accessed.
4.5.3 The DiskStore and DiskBlockManager Classes
This section explores how storage actually lands on disk. Two key classes are involved:
(1) DiskStore
(2) DiskBlockManager
Although DiskStore is responsible for storing blocks on disk, it still does not perform the low-level operations directly; it delegates them to the DiskBlockManager. The DiskBlockManager keeps file paths in arrays used as a hash table, and looking up a file's path is done by getFile, which uses hashing to locate the directory the file lives in. Let's focus on the implementation of getFile:
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
/* Create the subdirectory if it does not exist */
val subDir = subDirs(dirId).synchronized {
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create local dir in $newDir.")
}
subDirs(dirId)(subDirId) = newDir
newDir
}
}
new File(subDir, filename)
}
getFile first computes a hash of the filename, takes the hash modulo the number of local directories to get dirId and subDirId, and then finds the corresponding subDir in subDirs, creating it if it does not exist. Finally it builds a File object with subDir as the path and filename as the name; the DiskBlockManager uses this File object to write blocks to disk and read them back. See DiskStore.scala for details.
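To make the mapping concrete, the following standalone sketch reproduces the same dirId/subDirId computation for a hypothetical block file name and directory configuration (the directory values are illustrative, and math.abs(hashCode) stands in for Utils.nonNegativeHash):
// Illustrative only: mirrors the hashing scheme used by getFile.
val localDirs = Array("/tmp/spark-local-0", "/tmp/spark-local-1")   // hypothetical local dirs
val subDirsPerLocalDir = 64                                         // default in Spark 1.5
val filename = "shuffle_0_1_0.data"                                 // hypothetical block file

val hash = math.abs(filename.hashCode)                              // stand-in for Utils.nonNegativeHash
val dirId = hash % localDirs.length                                 // which local directory
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir       // which subdirectory within it
println(localDirs(dirId) + "/" + "%02x".format(subDirId) + "/" + filename)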
4.5.4 The MemoryStore Class
This section studies the implementation of the MemoryStore class, whose job is to store blocks in memory. It generally does so in one of two ways (a small usage sketch follows this list):
(1) As an array holding deserialized Java objects.
(2) As serialized ByteBuffers.
The MemoryStore implementation contains few "heavy" operations such as creating or reading files. It does, however, maintain a java.util.LinkedHashMap[BlockId, MemoryEntry] that maps a blockId to its in-memory entry; this greatly simplifies reading a block, which becomes a direct operation on this hash map. For storing blocks into memory, MemoryStore provides methods such as putBytes and putArray; inspecting them shows they are all wrappers around tryToPut, so below we focus on the implementation of tryToPut.
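The two storage forms correspond directly to the storage level chosen by the user; a minimal hedged example (the RDDs and data are illustrative, an existing SparkContext sc is assumed):
import org.apache.spark.storage.StorageLevel

// Form 1: deserialized Java objects kept in memory as an array
val asObjects = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_ONLY)
// Form 2: serialized bytes kept as ByteBuffers, more compact,
// at the cost of deserialization on every access
val asBytes = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_ONLY_SER)
asObjects.count(); asBytes.count()   // materialize both caches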
private def tryToPut(
blockId: BlockId,
value: () => Any,
size: Long,
deserialized: Boolean): ResultWithDroppedBlocks = {
var putSuccess = false
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
accountingLock.synchronized {
val freeSpaceResult = ensureFreeSpace(blockId, size)
val enoughFreeSpace = freeSpaceResult.success
droppedBlocks ++= freeSpaceResult.droppedBlocks
if (enoughFreeSpace) {
val entry = new MemoryEntry(value(), size, deserialized)
entries.synchronized {
entries.put(blockId, entry)
currentMemory += size
}
val valuesOrBytes = if (deserialized) "values" else "bytes"
logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
putSuccess = true
} else {
/* Tell the block manager that the block could not be put into memory, so it may be dropped to disk (if the block allows disk storage) */
lazy val data = if (deserialized) {
Left(value().asInstanceOf[Array[Any]])
} else {
Right(value().asInstanceOf[ByteBuffer].duplicate())
}
val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
// Release the unroll memory used because we no longer need the underlying Array
releasePendingUnrollMemoryForThisTask()
}
ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}
From the tryToPut implementation above, it first calls ensureFreeSpace to make sure enough space is available. The function then branches on whether memory is sufficient without evicting anything:
(1) If memory is sufficient, the data is written to memory and the entry is added to the entries hash map.
(2) If memory is insufficient, the block can be written directly to disk.
At this point readers may ask under what circumstances memory becomes insufficient, and how the blocks to be evicted are chosen. Let's continue with the implementation of ensureFreeSpace.
private def ensureFreeSpace(
blockIdToAdd: BlockId,
space: Long): ResultWithDroppedBlocks = {
logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory")
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
if (space > maxMemory) {
logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit")
return ResultWithDroppedBlocks(success = false, droppedBlocks)
}
// Take into account the amount of memory currently occupied by unrolling blocks
// and minus the pending unroll memory for that block on current thread.
val taskAttemptId = currentTaskAttemptId()
val actualFreeMemory = freeMemory - currentUnrollMemory +
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L)
if (actualFreeMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
val selectedBlocks = new ArrayBuffer[BlockId]
var selectedMemory = 0L
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
while (actualFreeMemory + selectedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
selectedBlocks += blockId
selectedMemory += pair.getValue.size
}
}
}
if (actualFreeMemory + selectedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
// This should never be null as only one task should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
val data = if (entry.deserialized) {
Left(entry.value.asInstanceOf[Array[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
}
return ResultWithDroppedBlocks(success = true, droppedBlocks)
} else {
logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " +
"from the same RDD")
return ResultWithDroppedBlocks(success = false, droppedBlocks)
}
}
ResultWithDroppedBlocks(success = true, droppedBlocks)
}
From the flow of ensureFreeSpace we can see that it maintains a selectedBlocks array holding the blocks that may be evicted, while selectedMemory is the amount of space that could be freed. selectedBlocks is built by iterating over the entries hash map and adding blocks that do not belong to the RDD currently being inserted; this keeps the current RDD cached in memory as far as possible while effectively applying a FIFO eviction policy (entries preserves insertion order). Once selectedBlocks has been built, ensureFreeSpace checks whether evicting all of them would free enough space; if not, it returns failure. If it would, the selected blocks are evicted from memory one by one until enough free space has been produced.
In this section we explored Spark's caching strategy through the source code. When a developer calls RDD.iterator on an RDD whose storage level has been set (memory by default), the caching mechanism is triggered automatically and the RDD is cached; reading from the cache is likewise fully automatic and requires no user intervention. When memory fills up, Spark uses a FIFO-style policy to move some blocks to disk while trying to keep the current RDD intact. When a block that was moved to disk is used again and its storage level includes memory, it is automatically reloaded into memory.
4.6 Chapter Summary
This chapter began with a macro-level introduction to the code layout of Spark 1.5.0, then dissected Spark's execution main line in detail, describing at the code level how an RDD ends up executing as tasks on workers. It then analyzed, from another angle, the interaction among Client, Master, and Worker. Finally, it covered two important features in depth: Spark shuffle and the Spark storage mechanism. After studying this chapter, readers are encouraged to dig further into the Spark code themselves to deepen their understanding of Spark's internal implementation.