通過上一節(jié)內(nèi)容,DriverEndpoint最終生成多個(gè)可執(zhí)行的TaskDescription對象,并向各個(gè)ExecutorEndpoint發(fā)送LaunchTask指令,本節(jié)內(nèi)容將關(guān)注ExecutorEndpoint如何處理LaunchTask指令,處理完成后如何回饋給DriverEndpoint,以及整個(gè)job最終如何多次調(diào)度直至結(jié)束。
一、Task的執(zhí)行流程
承接上一節(jié)內(nèi)容,Executor接受LaunchTask指令后,開啟一個(gè)新線程TaskRunner解析RDD,并調(diào)用RDD的compute方法,歸并函數(shù)得到最終任務(wù)執(zhí)行結(jié)果
ExecutorEndpoint接受到LaunchTask指令后,解碼出TaskDescription,調(diào)用Executor的launchTask方法
Executor創(chuàng)建一個(gè)TaskRunner線程,并啟動線程,同時(shí)將改線程添加到Executor的成員對象中,代碼如下:
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] runningTasks.put(taskDescription.taskId, taskRunner)
TaskRunner
首先向DriverEndpoint發(fā)送任務(wù)最新狀態(tài)為RUNNING
從TaskDescription解析出Task,并調(diào)用Task的run方法
Task
如果Task實(shí)例為ShuffleMapTask:解析出RDD以及ShuffleDependency信息,調(diào)用RDD的compute()方法將結(jié)果寫Writer中(Writer這里不介紹,可以作為黑盒理解,比如寫入一個(gè)文件中),返回MapStatus對象
如果Task實(shí)例為ResultTask:解析出RDD以及合并函數(shù)信息,調(diào)用函數(shù)將調(diào)用后的結(jié)果返回
創(chuàng)建TaskContext以及CallerContext(與HDFS交互的上下文對象)
執(zhí)行Task的runTask方法
TaskRunner將Task執(zhí)行的結(jié)果序列化,再次向DriverEndpoint發(fā)送任務(wù)最新狀態(tài)為FINISHED
二、Task的回饋流程
TaskRunner執(zhí)行結(jié)束后,都將執(zhí)行狀態(tài)發(fā)送至DriverEndpoint,DriverEndpoint最終反饋指令CompletionEvent至DAGSchedulerEventProcessLoop中
- ssh框架 2016-09-30
- 阿里移動安全 [無線安全]玩轉(zhuǎn)無線電——不安全的藍(lán)牙鎖 2017-07-26
- 消息隊(duì)列NetMQ 原理分析4-Socket、Session、Option和Pipe 2024-03-26
- Selective Search for Object Recognition 論文筆記【圖片目標(biāo)分割】 2017-07-26
- 詞向量-LRWE模型-更好地識別反義詞同義詞 2017-07-26
- 從棧不平衡問題 理解 calling convention 2017-07-26
- php imagemagick 處理 圖片剪切、壓縮、合并、插入文本、背景色透明 2017-07-26
- Swift實(shí)現(xiàn)JSON轉(zhuǎn)Model - HandyJSON使用講解 2017-07-26
- 阿里移動安全 Android端惡意鎖屏勒索應(yīng)用分析 2017-07-26
- 集合結(jié)合數(shù)據(jù)結(jié)構(gòu)來看看(二) 2017-07-26