mapTask并行度的決定機(jī)制

  一個(gè)job的map階段并行度由客戶端在提交job時(shí)決定,而客戶端對(duì)map階段并行度的規(guī)劃的基本邏輯為:將待處理數(shù)據(jù)執(zhí)行邏輯切片(即按照一個(gè)特定切片大小,將待處理數(shù)據(jù)劃分成邏輯上的多個(gè)split),然后每一個(gè)split分配一個(gè)mapTask并行實(shí)例處理。

FileInputFormat切片機(jī)制

原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6733968.html

微信:intsmaze(非誠(chéng)勿擾)

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

1、默認(rèn)切片定義在InputFormat類(lèi)中的getSplit()方法

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

2、FileInputFormat中默認(rèn)的切片機(jī)制:

a) 簡(jiǎn)單地按照文件的內(nèi)容長(zhǎng)度進(jìn)行切片

b) 切片大小,默認(rèn)等于hdfs的block大小

c) 切片時(shí)不考慮數(shù)據(jù)集整體,而是逐個(gè)針對(duì)每一個(gè)文件單獨(dú)切片

比如待處理數(shù)據(jù)有兩個(gè)文件:

file1.txt    260M
file2.txt    10M

經(jīng)過(guò)FileInputFormat的切片機(jī)制運(yùn)算后,形成的切片信息如下:  

file1.txt.split1--  0~128file1.txt.split2--  128~260 //如果剩余的文件長(zhǎng)度/切片長(zhǎng)度<=1.1則會(huì)將剩余文件的長(zhǎng)度并未一個(gè)切片file2.txt.split1--  0~10M

3、FileInputFormat中切片的大小的參數(shù)配置

通過(guò)分析源碼,在FileInputFormat中,計(jì)算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個(gè)值來(lái)運(yùn)算決定。

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

minsize:默認(rèn)值:1

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

因此,默認(rèn)情況下,Math.max(minSize, Math.min(maxSize, blockSize));切片大小=blocksize

maxsize(切片最大值):參數(shù)如果調(diào)得比blocksize小,則會(huì)讓切片變小。

minsize(切片最小值):參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blocksize還大。

選擇并發(fā)數(shù)的影響因素:

1、運(yùn)算節(jié)點(diǎn)的硬件配置

2、運(yùn)算任務(wù)的類(lèi)型:CPU密集型還是IO密集型

3、運(yùn)算任務(wù)的數(shù)據(jù)量

3、hadoop2.6.4源碼解析

org.apache.hadoop.mapreduce.JobSubmitter類(lèi)

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

   //得到j(luò)ob的map任務(wù)的并行數(shù)量
   private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();    int maps;    if (jConf.getUseNewMapper()) {      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }    return maps;
  }
  
  @SuppressWarnings("unchecked")  private <T extends InputSplit>  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
     ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
       List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);    // sort the splits into order based on size, so that the biggest    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);    return array.length;
  }

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

 

切片計(jì)算邏輯,關(guān)注紅色字體代碼即可。

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

 List<InputSplit> getSplits(JobContext job) = 
    List<InputSplit> splits =  ArrayList<InputSplit>= length = (length != 0 (file === fs.getFileBlockLocations(file, 0 bytesRemaining = blkIndex = getBlockIndex(blkLocations, length---= (bytesRemaining != 0 blkIndex = getBlockIndex(blkLocations, length-- { 
          splits.add(makeSplit(path, 0, length, blkLocations[00
        splits.add(makeSplit(path, 0, length,  String[0"Total # of splits generated by getSplits: " ++ ", TimeTaken: " +

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

 

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

 public static final String SPLIT_MINSIZE = 
    "mapreduce.input.fileinputformat.split.minsize";  
  public static final String SPLIT_MAXSIZE = 
    "mapreduce.input.fileinputformat.split.maxsize";    
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));    
  //保證切分的文件長(zhǎng)度最小不得小于1字節(jié)
  protected long getFormatMinSplitSize() {    return 1;
  }  
  //如果沒(méi)有在conf中設(shè)置SPLIT_MINSIZE參數(shù),則取默認(rèn)值1字節(jié)。
  public static long getMinSplitSize(JobContext job) {    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }  
  //得到切片文件的最大長(zhǎng)度
  long maxSize = getMaxSplitSize(job);  
  //如果沒(méi)有在conf中設(shè)置SPLIT_MAXSIZE參數(shù),則去默認(rèn)值Long.MAX_VALUE字節(jié)。
  public static long getMaxSplitSize(JobContext context) {    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }  
   //讀取指定目錄下的所有文件的信息
   List<FileStatus> files = listStatus(job);   //如果沒(méi)有指定開(kāi)啟幾個(gè)線程讀取,則默認(rèn)一個(gè)線程去讀文件信息,因?yàn)榇嬖谀夸浵掠猩蟽|個(gè)文件的情況,所以有需要開(kāi)啟多個(gè)線程加快讀取。
   int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);   public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";   public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;  
  //計(jì)算切片文件的邏輯大小
  long splitSize = computeSplitSize(blockSize, minSize, maxSize);  protected long computeSplitSize(long blockSize, long minSize,                                  long maxSize) {    return Math.max(minSize, Math.min(maxSize, blockSize));
  }  
  private static final double SPLIT_SLOP = 1.1;   // 10% slop  //判斷剩余文件與切片大小的比是否為1.1.
  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
    }

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

map并行度

  如果job的每個(gè)map或者reduce的task的運(yùn)行時(shí)間都只有30-40秒鐘(最好每個(gè)map的執(zhí)行時(shí)間最少不低于一分鐘),那么就減少該job的map或者reduce數(shù)。每一個(gè)task的啟動(dòng)和加入到調(diào)度器中進(jìn)行調(diào)度,這個(gè)中間的過(guò)程可能都要花費(fèi)幾秒鐘,所以如果每個(gè)task都非??炀团芡炅?,就會(huì)在task的開(kāi)始和結(jié)束的時(shí)候浪費(fèi)太多的時(shí)間。

  配置task的JVM重用可以改善該問(wèn)題:

 ?。╩apred.job.reuse.jvm.num.tasks,默認(rèn)是1,表示一個(gè)JVM上最多可以順序執(zhí)行的task數(shù)目(屬于同一個(gè)Job)是1。也就是說(shuō)一個(gè)task啟一個(gè)JVM)。

小文件的場(chǎng)景下,默認(rèn)的切片機(jī)制會(huì)造成大量的maptask處理很少量的數(shù)據(jù),效率低下:

解決方案:

  推薦:把小文件存入hdfs之前進(jìn)行預(yù)處理,先合并為大文件后再上傳。

  折中:寫(xiě)程序?qū)dfs上小文件進(jìn)行合并再跑job處理。

  補(bǔ)救措施:如果大量的小文件已經(jīng)存在hdfs上了,使用combineInputFormate組件,它可以將眾多的小文件從邏輯上規(guī)劃到一個(gè)切片中,這樣多個(gè)小文件就可以交給一個(gè)maptask操作了。

   最近實(shí)在是不知道學(xué)點(diǎn)什么了呦,就把hadoop回顧一下,當(dāng)初學(xué)時(shí),為了快速上手,都是記各種理論以及結(jié)論,沒(méi)有時(shí)間去看源碼驗(yàn)證,也不知道人家說(shuō)的結(jié)論是否正確,這次回滾就是看源碼驗(yàn)證當(dāng)初結(jié)論的正確性。這也快一年沒(méi)有用了,最近一直從事分布式實(shí)時(shí)計(jì)算的研究。

老鐵,你的--->推薦,--->關(guān)注,--->評(píng)論--->是我繼續(xù)寫(xiě)作的動(dòng)力。 作者:劉洋 合作微信號(hào):intsmaze 本文版權(quán)歸作者和博客園共有,歡迎轉(zhuǎn)載,但未經(jīng)作者同意必須保留此段聲明,且在文章頁(yè)面明顯位置給出原文連接,否則保留追究法律責(zé)任的權(quán)利。

分類(lèi): mapreduce

http://www.cnblogs.com/intsmaze/p/6733968.html