mapTask并行度的決定機(jī)制
一個(gè)job的map階段并行度由客戶端在提交job時(shí)決定,而客戶端對(duì)map階段并行度的規(guī)劃的基本邏輯為:將待處理數(shù)據(jù)執(zhí)行邏輯切片(即按照一個(gè)特定切片大小,將待處理數(shù)據(jù)劃分成邏輯上的多個(gè)split),然后每一個(gè)split分配一個(gè)mapTask并行實(shí)例處理。
FileInputFormat切片機(jī)制
原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6733968.html
微信:intsmaze(非誠(chéng)勿擾)
1、默認(rèn)切片定義在InputFormat類(lèi)中的getSplit()方法
2、FileInputFormat中默認(rèn)的切片機(jī)制:
a) 簡(jiǎn)單地按照文件的內(nèi)容長(zhǎng)度進(jìn)行切片
b) 切片大小,默認(rèn)等于hdfs的block大小
c) 切片時(shí)不考慮數(shù)據(jù)集整體,而是逐個(gè)針對(duì)每一個(gè)文件單獨(dú)切片
比如待處理數(shù)據(jù)有兩個(gè)文件:
file1.txt 260M file2.txt 10M
經(jīng)過(guò)FileInputFormat的切片機(jī)制運(yùn)算后,形成的切片信息如下:
file1.txt.split1-- 0~128file1.txt.split2-- 128~260 //如果剩余的文件長(zhǎng)度/切片長(zhǎng)度<=1.1則會(huì)將剩余文件的長(zhǎng)度并未一個(gè)切片file2.txt.split1-- 0~10M
3、FileInputFormat中切片的大小的參數(shù)配置
通過(guò)分析源碼,在FileInputFormat中,計(jì)算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個(gè)值來(lái)運(yùn)算決定。
minsize:默認(rèn)值:1
因此,默認(rèn)情況下,Math.max(minSize, Math.min(maxSize, blockSize));切片大小=blocksize
maxsize(切片最大值):參數(shù)如果調(diào)得比blocksize小,則會(huì)讓切片變小。
minsize(切片最小值):參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blocksize還大。
選擇并發(fā)數(shù)的影響因素:
1、運(yùn)算節(jié)點(diǎn)的硬件配置
2、運(yùn)算任務(wù)的類(lèi)型:CPU密集型還是IO密集型
3、運(yùn)算任務(wù)的數(shù)據(jù)量
3、hadoop2.6.4源碼解析
org.apache.hadoop.mapreduce.JobSubmitter類(lèi)
//得到j(luò)ob的map任務(wù)的并行數(shù)量 private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { JobConf jConf = (JobConf)job.getConfiguration(); int maps; if (jConf.getUseNewMapper()) { maps = writeNewSplits(job, jobSubmitDir); } else { maps = writeOldSplits(jConf, jobSubmitDir); } return maps; } @SuppressWarnings("unchecked") private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration(); InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); List<InputSplit> splits = input.getSplits(job); T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator()); JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); return array.length; }
切片計(jì)算邏輯,關(guān)注紅色字體代碼即可。
List<InputSplit> getSplits(JobContext job) = List<InputSplit> splits = ArrayList<InputSplit>= length = (length != 0 (file === fs.getFileBlockLocations(file, 0 bytesRemaining = blkIndex = getBlockIndex(blkLocations, length---= (bytesRemaining != 0 blkIndex = getBlockIndex(blkLocations, length-- { splits.add(makeSplit(path, 0, length, blkLocations[00 splits.add(makeSplit(path, 0, length, String[0"Total # of splits generated by getSplits: " ++ ", TimeTaken: " +
public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize"; public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize"; long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //保證切分的文件長(zhǎng)度最小不得小于1字節(jié) protected long getFormatMinSplitSize() { return 1; } //如果沒(méi)有在conf中設(shè)置SPLIT_MINSIZE參數(shù),則取默認(rèn)值1字節(jié)。 public static long getMinSplitSize(JobContext job) { return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L); } //得到切片文件的最大長(zhǎng)度 long maxSize = getMaxSplitSize(job); //如果沒(méi)有在conf中設(shè)置SPLIT_MAXSIZE參數(shù),則去默認(rèn)值Long.MAX_VALUE字節(jié)。 public static long getMaxSplitSize(JobContext context) { return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE); } //讀取指定目錄下的所有文件的信息 List<FileStatus> files = listStatus(job); //如果沒(méi)有指定開(kāi)啟幾個(gè)線程讀取,則默認(rèn)一個(gè)線程去讀文件信息,因?yàn)榇嬖谀夸浵掠猩蟽|個(gè)文件的情況,所以有需要開(kāi)啟多個(gè)線程加快讀取。 int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS, DEFAULT_LIST_STATUS_NUM_THREADS); public static final String LIST_STATUS_NUM_THREADS = "mapreduce.input.fileinputformat.list-status.num-threads"; public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1; //計(jì)算切片文件的邏輯大小 long splitSize = computeSplitSize(blockSize, minSize, maxSize); protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); } private static final double SPLIT_SLOP = 1.1; // 10% slop //判斷剩余文件與切片大小的比是否為1.1. while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; }
map并行度
如果job的每個(gè)map或者reduce的task的運(yùn)行時(shí)間都只有30-40秒鐘(最好每個(gè)map的執(zhí)行時(shí)間最少不低于一分鐘),那么就減少該job的map或者reduce數(shù)。每一個(gè)task的啟動(dòng)和加入到調(diào)度器中進(jìn)行調(diào)度,這個(gè)中間的過(guò)程可能都要花費(fèi)幾秒鐘,所以如果每個(gè)task都非??炀团芡炅?,就會(huì)在task的開(kāi)始和結(jié)束的時(shí)候浪費(fèi)太多的時(shí)間。
配置task的JVM重用可以改善該問(wèn)題:
?。╩apred.job.reuse.jvm.num.tasks,默認(rèn)是1,表示一個(gè)JVM上最多可以順序執(zhí)行的task數(shù)目(屬于同一個(gè)Job)是1。也就是說(shuō)一個(gè)task啟一個(gè)JVM)。
小文件的場(chǎng)景下,默認(rèn)的切片機(jī)制會(huì)造成大量的maptask處理很少量的數(shù)據(jù),效率低下:
解決方案:
推薦:把小文件存入hdfs之前進(jìn)行預(yù)處理,先合并為大文件后再上傳。
折中:寫(xiě)程序?qū)dfs上小文件進(jìn)行合并再跑job處理。
補(bǔ)救措施:如果大量的小文件已經(jīng)存在hdfs上了,使用combineInputFormate組件,它可以將眾多的小文件從邏輯上規(guī)劃到一個(gè)切片中,這樣多個(gè)小文件就可以交給一個(gè)maptask操作了。
最近實(shí)在是不知道學(xué)點(diǎn)什么了呦,就把hadoop回顧一下,當(dāng)初學(xué)時(shí),為了快速上手,都是記各種理論以及結(jié)論,沒(méi)有時(shí)間去看源碼驗(yàn)證,也不知道人家說(shuō)的結(jié)論是否正確,這次回滾就是看源碼驗(yàn)證當(dāng)初結(jié)論的正確性。這也快一年沒(méi)有用了,最近一直從事分布式實(shí)時(shí)計(jì)算的研究。
老鐵,你的--->推薦,--->關(guān)注,--->評(píng)論--->是我繼續(xù)寫(xiě)作的動(dòng)力。 作者:劉洋 合作微信號(hào):intsmaze 本文版權(quán)歸作者和博客園共有,歡迎轉(zhuǎn)載,但未經(jīng)作者同意必須保留此段聲明,且在文章頁(yè)面明顯位置給出原文連接,否則保留追究法律責(zé)任的權(quán)利。
分類(lèi): mapreduce
http://www.cnblogs.com/intsmaze/p/6733968.html