1.1 如何啟動AM并申請資源

1.1.1 如何啟動AM

Android培訓(xùn),安卓培訓(xùn),手機開發(fā)培訓(xùn),移動開發(fā)培訓(xùn),云培訓(xùn)培訓(xùn)

val yarnClient = YarnClient.createYarnClient
setupCredentials()
yarnClient.init(yarnConf)
yarnClient.start()// Get a new application from our RMval newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId() 
// Set up the appropriate contexts to launch our AMval containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext) 
// Finally, submit and monitor the applicationlogInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)

Android培訓(xùn),安卓培訓(xùn),手機開發(fā)培訓(xùn),移動開發(fā)培訓(xùn),云培訓(xùn)培訓(xùn)

1.1.2 FairScheduler如何處理AM的ResourceRequest

1、FairScheduler接收到SchedulerEventType.APP_ADDED之后,調(diào)用addApplication方法把把RMApp添加到隊列里面,結(jié)束之后發(fā)送RMAppEventType.APP_ACCEPTED給RMApp

2、RMApp啟動RMAttempt之后,發(fā)送SchedulerEventType.APP_ATTEMPT_ADDED給FairScheduler

LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user);

3、FairScheduler調(diào)用addApplicationAttempt方法,發(fā)送RMAppAttemptEventType.ATTEMPT_ADDED事件給RMAppAttempt,RMAppAttempt隨后調(diào)用Scheduler的allocate方法發(fā)送AM的ResourceRequest

4、FairScheduler在allocate方法里面對該請求進(jìn)行處理,F(xiàn)airScheduler對于AM的資源請求的優(yōu)先級上并沒有特殊的照顧,詳細(xì)請看章節(jié)2 如何分配資源

1.2 AM啟動之后如何申請資源

1.2.1、注冊AM

amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)

1.2.2、發(fā)送資源請求

Android培訓(xùn),安卓培訓(xùn),手機開發(fā)培訓(xùn),移動開發(fā)培訓(xùn),云培訓(xùn)培訓(xùn)

// 1.創(chuàng)建資源請求amClient.addContainerRequest(request)// 2.發(fā)送資源請求val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()if (allocatedContainers.size > 0) {  // 3.請求返回之后處理Container  handleAllocatedContainers(allocatedContainers.asScala)
}

Android培訓(xùn),安卓培訓(xùn),手機開發(fā)培訓(xùn),移動開發(fā)培訓(xùn),云培訓(xùn)培訓(xùn)

1.2.3、啟動Container

Android培訓(xùn),安卓培訓(xùn),手機開發(fā)培訓(xùn),移動開發(fā)培訓(xùn),云培訓(xùn)培訓(xùn) View Code

2、如何分配資源

2.1 接受資源請求步驟

在FairScheduler的allocate方法里面僅僅是記錄ResourceRequest,并不會真正的立馬分配。

流程如下:

1、檢查該APP是否注冊過

2、檢查資源的請求是否超過最大內(nèi)存和最大CPU的限制

3、記錄資源請求的時間,最后container分配的延遲會體現(xiàn)在隊列metrics的appAttemptFirstContainerAllocationDelay當(dāng)中

4、釋放AM發(fā)過來的已經(jīng)不需要的資源,主要邏輯在FSAppAttempt的containerCompleted方法里

5、更新資源請求,所有資源請求都是記錄在AppSchedulingInfo當(dāng)中的requests(注意:只有是ANY的資源請求才會被立馬更新到QueueMetrics的PendingResources里)

6、找出該APP被標(biāo)記為搶占的container ID列表preemptionContainerIds

7、更新APP的黑名單列表,該信息被記錄在AppSchedulingInfo當(dāng)中

8、從FSAppAttempt的newlyAllocatedContainers當(dāng)中獲取最新被分配的container

9、返回preemptionContainerIds、HeadRoom、ContainerList、NMTokenList。(注:Headroom = Math.min(Math.min(queueFairShare - queueUsage, 0), maxAvailableResource)

2.2 請求和分配的關(guān)系

Android培訓(xùn),安卓培訓(xùn),手機開發(fā)培訓(xùn),移動開發(fā)培訓(xùn),云培訓(xùn)培訓(xùn)

 請求和分配的過程是異步的,關(guān)系如上圖,每次調(diào)用allocate獲得的container,其實是之前的請求被分配的結(jié)果

2.3 如何分配

2.3.1 分配方式

分配有兩種方式:

1、接收到NodeManager的心跳的時候進(jìn)行分配

NodeManager每隔一秒(yarn.resourcemanager.nodemanagers.heartbeat-interval-ms)給ResourceManager發(fā)送一個心跳事件NODE_UPDATE,接收到心跳事件之后,在FairScheduler的nodeUpdate方法里進(jìn)行處理。

NodeManager會匯報新啟動的Container列表newlyLaunchedContainers和已經(jīng)結(jié)束的Container列表completedContainers。然后在attemptScheduling方法里面進(jìn)行分配。

 

2、持續(xù)調(diào)度方式

它有一個單獨的線程,線程名稱是FairSchedulerContinuousScheduling,每5毫秒對所有節(jié)點的資源進(jìn)行排序,然后遍歷所有節(jié)點,調(diào)用attemptScheduling方法進(jìn)行分配。

開啟持續(xù)調(diào)度模式之后,在接收到心跳事件NODE_UPDATE的時候,只有在completedContainers不為空的情況下,才會進(jìn)行調(diào)度

 

attemptScheduling首先會檢查是否有資源預(yù)留,如果有預(yù)留,則直接為預(yù)留的APP分配container

沒有預(yù)留的分配過程如下:

1、最大可分配資源為這臺機器的可用資源的一半,從root隊列開始自上而下進(jìn)行分配Resource assignment = queueMgr.getRootQueue().assignContainer(node);

2、分配到一個Container之后,判斷是否要連續(xù)分配多個,最大支持連續(xù)分配多少個?

以下是涉及到的各個參數(shù)以及參數(shù)的默認(rèn)值:

yarn.scheduler.fair.assignmultiple false (建議設(shè)置為true)

yarn.scheduler.fair.dynamic.max.assign true (hadoop2.7之后就沒有這個參數(shù)了)

yarn.scheduler.fair.max.assign -1 (建議設(shè)置為2~3,不要設(shè)置得太多,否則會有調(diào)度傾斜的問題)

 

2.3.2 如何從隊列當(dāng)中選出APP進(jìn)行資源分配

入口在queueMgr.getRootQueue().assignContainer(node);

1、檢查當(dāng)前隊列的使用量是否小于最大資源量

2、首先對子隊列進(jìn)行排序,優(yōu)先順序請參照章節(jié) 2.3.4 如何確定優(yōu)先順序

3、排序完再調(diào)用子隊列的assignContainer方法分配container

4、一直遞歸到葉子隊列

葉子隊列如何進(jìn)行分配?

1、先對runnableApps進(jìn)行排序,排序完成之后,for循環(huán)遍歷一下

2、先檢查該Node是否在APP的黑名單當(dāng)中

3、檢查該隊列是否可以運行該APP的AM,主要是檢查是否超過了maxAMShare(根據(jù)amRunning字段判斷是否已經(jīng)啟動了AM了)

檢查邏輯的偽代碼如下:

Android培訓(xùn),安卓培訓(xùn),手機開發(fā)培訓(xùn),移動開發(fā)培訓(xùn),云培訓(xùn)培訓(xùn) View Code

4、給該APP分配container

 

下面以一個例子來說明分配的過程是如何選擇隊列的:

假設(shè)隊列的結(jié)構(gòu)是這樣子的

root

---->BU_1

-------->A

-------->B

---->BU_2

-------->C

-------->D

Android培訓(xùn),安卓培訓(xùn),手機開發(fā)培訓(xùn),移動開發(fā)培訓(xùn),云培訓(xùn)培訓(xùn)

 

2.3.3 任務(wù)分配Container的本地性

任務(wù)分配Container的時候會考慮請求的本地性,對于調(diào)度器來說,它的本地性分為三種:NODE_LOCAL, RACK_LOCAL, OFF_SWITCH

具體方法位于FSAppAttempt的assignContainer方法

遍歷優(yōu)先級

    給該優(yōu)先級的調(diào)度機會+1

    獲取RackLocal和NodeLocal的任務(wù)

    計算允許分配的本地性級別allowedLocality,默認(rèn)是NODE_LOCAL

        1、心跳分配方式

        計算調(diào)度機會,如果該優(yōu)先級的任務(wù)的調(diào)度機會超過了(節(jié)點數(shù) * NODE_LOCAL閾值),降級為RACK_LOCAL,如果該優(yōu)先級的任務(wù)的調(diào)度機會超過了(節(jié)點數(shù) * RACK_LOCAL閾值),降級為OFF_SWITCH

        2、連續(xù)分配方式

        計算等待時間waitTime -= lastScheduledContainer.get(priority);

        如果waitTime超過了NODE_LOCAL允許的delay時間,就降級為RACK_LOCAL,再超過RACK_LOCAL允許的delay的時間,就降級為OFF_SWITCH

  分配NODE_LOCAL的container

  允許分配的本地性級別>=RACK_LOCAL,分配RACK_LOCAL的container

  允許分配的本地性級別=OFF_SWITCH,分配OFF_SWITCH的container

  都分不到,等待下一次機會

 

相關(guān)參數(shù):

默認(rèn)值全是-1,則允許的本地性級別是OFF_SWITCH

yarn.scheduler.fair.locality-delay-node-ms -1

yarn.scheduler.fair.locality-delay-rack-ms -1

yarn.scheduler.fair.locality.threshold.node -1

yarn.scheduler.fair.locality.threshold.rack -1

 

2.3.4 Container分配

1、檢查該節(jié)點的資源是否足夠,如果資源充足

 2、如果當(dāng)前的allowedLocality比實際分配的本地性低,則重置allowedLocality

3、把新分配的Container加到newlyAllocatedContainers和liveContainers列表中

4、把分配的container信息同步到appSchedulingInfo當(dāng)中

5、發(fā)送RMContainerEventType.START事件

6、更新FSSchedulerNode記錄的container信息

7、如果被分配的是AM,則設(shè)置amRunning為true

 

如果資源不夠,則檢查是否可以預(yù)留資源

條件:

1)Container的資源請求必須小于Scheduler的增量分配內(nèi)存 * 倍數(shù)(默認(rèn)應(yīng)該是2g)

2)如果已經(jīng)存在的預(yù)留數(shù) < 本地性對應(yīng)的可用節(jié)點 * 預(yù)留比例

3)一個節(jié)點只允許同時為一個APP預(yù)留資源

 

相關(guān)參數(shù):

yarn.scheduler.increment-allocation-mb 1024

yarn.scheduler.increment-allocation-vcores 1

yarn.scheduler.reservation-threshold.increment-multiple 2

yarn.scheduler.fair.reservable-nodes 0.05

 

2.3.4 如何確定優(yōu)先順序

該比較規(guī)則同時適用于隊列和APP,詳細(xì)代碼位于FairSharePolicy當(dāng)中

MinShare = Math.min(getMinShare(), getDemand())

1、(當(dāng)前資源使用量 / MinShare)的比值越小,優(yōu)先級越高

2、如果雙方資源使用量都超過MinShare,則(當(dāng)前資源使用量 / 權(quán)重)的比值越小,優(yōu)先級越高

3、啟動時間越早,優(yōu)先級越高

4、最后實在比不出來,就比名字...

 

從上面分配的規(guī)則當(dāng)中能看出來MinShare是非常重要的一個指標(biāo),當(dāng)資源使用量沒有超過MinShare之前,隊列在分配的時候就會比較優(yōu)先,切記一定要設(shè)置啊!

 

 

注:getMinShare()是FairScheduler當(dāng)中隊列的minResources

<minResources>6887116 mb,4491 vcores</minResources>

 

http://www.cnblogs.com/cenyuhai/p/7183243.html