189 8069 5689

spark(二):spark架构及物理执行图-创新互联

spark(二):spark架构及物理执行图
上图是一个job的提交流程图,job提交的具体步骤如下

创新互联主要从事网站制作、成都网站建设、网页设计、企业做网站、公司建网站等业务。立足成都服务老河口,十载网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:18980820575
  1. 一旦有action,就会触发DagScheduler.runJob来提交任务,主要是先生成逻辑执行图DAG,然后调用 finalStage = newStage() 来划分 stage。
  2. new Stage() 的时候会调用 finalRDD 的 getParentStages();
  3. getParentStages() 从 finalRDD 出发,反向 visit 逻辑执行图,遇到 NarrowDependency 就将依赖的 RDD 加入到 stage,遇到 ShuffleDependency 切开 stage,并递归到 ShuffleDepedency 依赖的 stage。
  4. 一个 ShuffleMapStage(不是最后形成 result 的 stage)形成后,会将该 stage 最后一个 RDD 注册到MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size),这一步很重要,因为 shuffle 过程需要 MapOutputTrackerMaster 来指示 ShuffleMapTask 输出数据的位置。
  5. 之后是submitStage(finalStage)
  6. 先确定该 stage 的 missingParentStages,使用getMissingParentStages(stage)。如果 parentStages 都可能已经执行过了,那么就为空了。
  7. 如果 missingParentStages 不为空,那么先递归提交 missing 的 parent stages,并将自己加入到 waitingStages 里面,等到 parent stages 执行结束后,会触发提交 waitingStages 里面的 stage。
  8. 如果 missingParentStages 为空,说明该 stage 可以立即执行,那么就调用submitMissingTasks(stage, jobId)来生成和提交具体的 task。如果 stage 是 ShuffleMapStage,那么 new 出来与该 stage 最后一个 RDD 的 partition 数相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出来与 stage 最后一个 RDD 的 partition 个数相同的 ResultTasks。一个 stage 里面的 task 组成一个 TaskSet,最后调用taskScheduler.submitTasks(taskSet)来提交一整个 taskSet。
  9. taskScheduler会把task发给DriverActor进程,DriverActor序列话之后发给exector真正执行。

spark(二):spark架构及物理执行图
上图是task执行流程,具体执行过程如下

  1. Worker 端接收到 tasks 后,executor 将 task 包装成 taskRunner,并从线程池中抽取出一个空闲线程运行 task。
  2. Executor 收到 serialized 的 task 后,先 deserialize 出正常的 task,然后运行 task 得到其执行结果 directResult,这个结果要送回到 driver 那里。
  3. 如果 result 比较大(比如 groupByKey 的 result)先把 result 存放到本地的“内存+磁盘”上,由 blockManager 来管理,只把存储位置信息(indirectResult)发送给 driver。
  4. ShuffleMapTask 生成的是 MapStatus,MapStatus 包含两项内容:一是该 task 所在的 BlockManager 的 BlockManagerId(实际是 executorId + host, port, nettyPort),二是 task 输出的每个 FileSegment 大小。
  5. ResultTask 生成的 result 的是 func 在 partition 上的执行结果。**比如 count() 的 func 就是统计 partition 中 records 的个数。
  6. Driver 收到 task 的执行结果 result 后会进行一系列的操作:
  7. a,首先告诉 taskScheduler 这个 task 已经执行完,然后去分析 result。
  8. b,如果是 ResultTask 的 result,那么可以使用 ResultHandler 对 result 进行 driver 端的计算(比如 count() 会对所有 ResultTask 的 result 作 sum)
  9. c,如果 result 是 ShuffleMapTask 的 MapStatus,那么需要将 MapStatus(ShuffleMapTask 输出的 FileSegment 的位置和大小信息)存放到 mapOutputTrackerMaster 中的 mapStatuses 数据结构中以便以后 reducer shuffle 的时候查询
  10. d,如果 driver 收到的 task 是该 stage 中的最后一个 task,那么可以 submit 下一个 stage,如果该 stage 已经是最后一个 stage,那么告诉 dagScheduler job 已经完成

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


分享文章:spark(二):spark架构及物理执行图-创新互联
文章位置:http://cdxtjz.cn/article/possd.html

其他资讯