通過(guò)上一節(jié)內(nèi)容,DriverEndpoint最終生成多個(gè)可執(zhí)行的TaskDescription對(duì)象,并向各個(gè)ExecutorEndpoint發(fā)送LaunchTask指令,本節(jié)內(nèi)容將關(guān)注ExecutorEndpoint如何處理LaunchTask指令,處理完成后如何回饋給DriverEndpoint,以及整個(gè)job最終如何多次調(diào)度直至結(jié)束。
一、Task的執(zhí)行流程
承接上一節(jié)內(nèi)容,Executor接受LaunchTask指令后,開(kāi)啟一個(gè)新線程TaskRunner解析RDD,并調(diào)用RDD的compute方法,歸并函數(shù)得到最終任務(wù)執(zhí)行結(jié)果
ExecutorEndpoint接受到LaunchTask指令后,解碼出TaskDescription,調(diào)用Executor的launchTask方法
Executor創(chuàng)建一個(gè)TaskRunner線程,并啟動(dòng)線程,同時(shí)將改線程添加到Executor的成員對(duì)象中,代碼如下:
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] runningTasks.put(taskDescription.taskId, taskRunner)
TaskRunner
首先向DriverEndpoint發(fā)送任務(wù)最新?tīng)顟B(tài)為RUNNING
從TaskDescription解析出Task,并調(diào)用Task的run方法
Task
如果Task實(shí)例為ShuffleMapTask:解析出RDD以及ShuffleDependency信息,調(diào)用RDD的compute()方法將結(jié)果寫(xiě)Writer中(Writer這里不介紹,可以作為黑盒理解,比如寫(xiě)入一個(gè)文件中),返回MapStatus對(duì)象
如果Task實(shí)例為ResultTask:解析出RDD以及合并函數(shù)信息,調(diào)用函數(shù)將調(diào)用后的結(jié)果返回
創(chuàng)建TaskContext以及CallerContext(與HDFS交互的上下文對(duì)象)
執(zhí)行Task的runTask方法
TaskRunner將Task執(zhí)行的結(jié)果序列化,再次向DriverEndpoint發(fā)送任務(wù)最新?tīng)顟B(tài)為FINISHED
二、Task的回饋流程
TaskRunner執(zhí)行結(jié)束后,都將執(zhí)行狀態(tài)發(fā)送至DriverEndpoint,DriverEndpoint最終反饋指令CompletionEvent至DAGSchedulerEventProcessLoop中