在前面的章節(jié)Client的加載中,Spark的DriverRunner已開(kāi)始執(zhí)行用戶任務(wù)類(比如:org.apache.spark.examples.SparkPi),下面我們開(kāi)始針對(duì)于用戶任務(wù)類(或者任務(wù)代碼)進(jìn)行分析

 

一、整體預(yù)覽

          基于上篇圖做了擴(kuò)展,增加任務(wù)執(zhí)行的相關(guān)交互

     iOS培訓(xùn),Swift培訓(xùn),蘋(píng)果開(kāi)發(fā)培訓(xùn),移動(dòng)開(kāi)發(fā)培訓(xùn)
iOS培訓(xùn),Swift培訓(xùn),蘋(píng)果開(kāi)發(fā)培訓(xùn),移動(dòng)開(kāi)發(fā)培訓(xùn)

  • Code:指的用戶編寫(xiě)的代碼

  • RDD:彈性分布式數(shù)據(jù)集,用戶編碼根據(jù)SparkContext與RDD的api能夠很好的將Code轉(zhuǎn)化為RDD數(shù)據(jù)結(jié)構(gòu)(下文將做轉(zhuǎn)化細(xì)節(jié)介紹)

  • DAGScheduler:有向無(wú)環(huán)圖調(diào)度器,將RDD封裝為JobSubmitted對(duì)象存入EventLoop(實(shí)現(xiàn)類DAGSchedulerEventProcessLoop)隊(duì)列中

  • EventLoop: 定時(shí)掃描未處理JobSubmitted對(duì)象,將JobSubmitted對(duì)象提交給DAGScheduler

  • DAGScheduler:針對(duì)于JobSubmitted進(jìn)行處理,最終將RDD轉(zhuǎn)化為執(zhí)行TaskSet,并將TaskSet提交至TaskScheduler

  • TaskScheduler: 根據(jù)TaskSet創(chuàng)建TaskSetManager對(duì)象存入SchedulableBuilder的數(shù)據(jù)池(Pool)中,并調(diào)用DriverEndpoint喚起消費(fèi)(ReviveOffers)操作

  • DriverEndpoint:接受ReviveOffers指令后將TaskSet中的Tasks根據(jù)相關(guān)規(guī)則均勻分配給Executor

  • Executor:?jiǎn)?dòng)一個(gè)TaskRunner執(zhí)行一個(gè)Task

 

二、Code轉(zhuǎn)化為初始RDDs

          我們的用戶代碼通過(guò)調(diào)用Spark的Api(比如:SparkSession.builder.appName("Spark Pi").getOrCreate()),該Api會(huì)創(chuàng)建Spark的上下文(SparkContext),當(dāng)我們調(diào)用transform類方法 (如:parallelize(),map())都會(huì)創(chuàng)建(或者裝飾已有的) Spark數(shù)據(jù)結(jié)構(gòu)(RDD), 如果是action類操作(如:reduce()),那么將最后封裝的RDD作為一次Job提交,存入待調(diào)度隊(duì)列中(DAGSchedulerEventProcessLoop )待后續(xù)異步處理。

          如果多次調(diào)用action類操作,那么封裝的多個(gè)RDD作為多個(gè)Job提交。

     流程如下:

     iOS培訓(xùn),Swift培訓(xùn),蘋(píng)果開(kāi)發(fā)培訓(xùn),移動(dòng)開(kāi)發(fā)培訓(xùn)iOS培訓(xùn),Swift培訓(xùn),蘋(píng)果開(kāi)發(fā)培訓(xùn),移動(dòng)開(kāi)發(fā)培訓(xùn)

  • ExecuteEnv(執(zhí)行環(huán)境 )


    • 這里可以是通過(guò)spark-submit提交的MainClass,也可以是spark-shell腳本

    • MainClass : 代碼中必定會(huì)創(chuàng)建或者獲取一個(gè)SparkContext

    • spark-shell:默認(rèn)會(huì)創(chuàng)建一個(gè)SparkContext

  • RDD(彈性分布式數(shù)據(jù)集)


    • create:可以直接創(chuàng)建(如:sc.parallelize(1 until n, slices) ),也可以在其他地方讀?。ㄈ纾簊c.textFile("README.md"))等

      網(wǎng)友評(píng)論