通過上一節(jié)內(nèi)容,DriverEndpoint最終生成多個(gè)可執(zhí)行的TaskDescription對象,并向各個(gè)ExecutorEndpoint發(fā)送LaunchTask指令,本節(jié)內(nèi)容將關(guān)注ExecutorEndpoint如何處理LaunchTask指令,處理完成后如何回饋給DriverEndpoint,以及整個(gè)job最終如何多次調(diào)度直至結(jié)束。

 

一、Task的執(zhí)行流程

     承接上一節(jié)內(nèi)容,Executor接受LaunchTask指令后,開啟一個(gè)新線程TaskRunner解析RDD,并調(diào)用RDD的compute方法,歸并函數(shù)得到最終任務(wù)執(zhí)行結(jié)果

     Android培訓(xùn),安卓培訓(xùn),手機(jī)開發(fā)培訓(xùn),移動開發(fā)培訓(xùn),云培訓(xùn)培訓(xùn)Android培訓(xùn),安卓培訓(xùn),手機(jī)開發(fā)培訓(xùn),移動開發(fā)培訓(xùn),云培訓(xùn)培訓(xùn)

  • ExecutorEndpoint接受到LaunchTask指令后,解碼出TaskDescription,調(diào)用Executor的launchTask方法

  • Executor創(chuàng)建一個(gè)TaskRunner線程,并啟動線程,同時(shí)將改線程添加到Executor的成員對象中,代碼如下:

private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
runningTasks.put(taskDescription.taskId, taskRunner)
  • TaskRunner


    • 首先向DriverEndpoint發(fā)送任務(wù)最新狀態(tài)為RUNNING

    • 從TaskDescription解析出Task,并調(diào)用Task的run方法

  • Task


    • 如果Task實(shí)例為ShuffleMapTask:解析出RDD以及ShuffleDependency信息,調(diào)用RDD的compute()方法將結(jié)果寫Writer中(Writer這里不介紹,可以作為黑盒理解,比如寫入一個(gè)文件中),返回MapStatus對象

    • 如果Task實(shí)例為ResultTask:解析出RDD以及合并函數(shù)信息,調(diào)用函數(shù)將調(diào)用后的結(jié)果返回

    • 創(chuàng)建TaskContext以及CallerContext(與HDFS交互的上下文對象)

    • 執(zhí)行Task的runTask方法


  • TaskRunner將Task執(zhí)行的結(jié)果序列化,再次向DriverEndpoint發(fā)送任務(wù)最新狀態(tài)為FINISHED

 

二、Task的回饋流程

     TaskRunner執(zhí)行結(jié)束后,都將執(zhí)行狀態(tài)發(fā)送至DriverEndpoint,DriverEndpoint最終反饋指令CompletionEvent至DAGSchedulerEventProcessLoop中

     Android培訓(xùn),安卓培訓(xùn),手機(jī)開發(fā)培訓(xùn),移動開發(fā)培訓(xùn),云培訓(xùn)培訓(xùn)Android培訓(xùn),安卓培訓(xùn),手機(jī)開發(fā)培訓(xùn),移動開發(fā)培訓(xùn),云培訓(xùn)培訓(xùn)