SPARK-Task计算本地性的问题

1.一个 task 对应一个 stage 里面一个分区数据的处理任务

 而一个分区都有 

partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap

 即对于每个分区有自己的数据优先位置

 2. 

 TaskSchedulerImpl :def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {

 方法的前面 通过 

  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] 方法

 假如得到了所有可用的WorkerOffer,

 2.1 并且假如 所有可用的WorkerOffer 中,有4个WorkerOffer (名字假如为 A,B,C,D),并且是1个CORE,并且都是TaskLocality.PROCESS_LOCAL(都有空闲的executor)只是主机名不同而以,

 2.2 假如 某个TaskSetManager 的 只有2个task ,都只需要1个CORE,也是TaskLocality.PROCESS_LOCAL的,但是 这2个task的数据 getPreferredLoc 只是A,B

 2.3  resourceOfferSingleTaskSet 方法中,这个TaskSet根据数据优先应该找到A,B

 2.4.1 我的疑问是TaskSet怎么找到A,B,而不是错找到 C,D 呢? 

 2.4.2 可能getPreferredLocs已经确定了只能够是WorkerOffer(A,B),而computeValidLocalityLevels之类的方法只是辅助得到计算的本地级别,并没有比较主机名.

 addPendingTask(index: Int)方法也只是 pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index,并没有比较主机名.

 我的意思像2.1假设那样),这些方法没有比较主机名

 2.5.1 可能 def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] 方法返回的是 传递到executor上即将被执行的Task的描述,(仅是逻辑调度,实际上并未分配到executor上执行)

 真正的将Task分配到指定的executor上去执行 是 private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { 方法 通过

 executorData = executorDataMap(task.executorId)  

 executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))去WorkerOffer(A,B)执行

 2.5.2 如果是2.5.1这样的流程,为什么还是需要 protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer]得到了所有可用的WorkerOffer

 在得到所有的WorkerOffer基础上 调用

 resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)方法匹配呢? 

 直接在getPreferredLocs 对应的 WorkerOffer(A,B) 中匹配计算优先TaskLocality 不就可以吗? 

 实在不好意思,SPARK 的源码没有看懂,帮忙解惑下

要回复问题请先登录注册