摘要
本文結(jié)合實(shí)例詳細(xì)闡明了Spark數(shù)據(jù)傾斜的幾種場(chǎng)景以及對(duì)應(yīng)的解決方案,包括避免數(shù)據(jù)源傾斜,調(diào)整并行度,使用自定義Partitioner,使用Map側(cè)Join代替Reduce側(cè)Join,給傾斜Key加上隨機(jī)前綴等。
為何要處理數(shù)據(jù)傾斜(Data Skew)
什么是數(shù)據(jù)傾斜
對(duì)Spark/Hadoop這樣的大數(shù)據(jù)系統(tǒng)來(lái)講,數(shù)據(jù)量大并不可怕,可怕的是數(shù)據(jù)傾斜。
何謂數(shù)據(jù)傾斜?數(shù)據(jù)傾斜指的是,并行處理的數(shù)據(jù)集中,某一部分(如Spark或Kafka的一個(gè)Partition)的數(shù)據(jù)顯著多于其它部分,從而使得該部分的處理速度成為整個(gè)數(shù)據(jù)集處理的瓶頸。
數(shù)據(jù)傾斜是如何造成的
在Spark中,同一個(gè)Stage的不同Partition可以并行處理,而具有依賴(lài)關(guān)系的不同Stage之間是串行處理的。假設(shè)某個(gè)Spark Job分為Stage 0和Stage 1兩個(gè)Stage,且Stage 1依賴(lài)于Stage 0,那Stage 0完全處理結(jié)束之前不會(huì)處理Stage 1。而Stage 0可能包含N個(gè)Task,這N個(gè)Task可以并行進(jìn)行。如果其中N-1個(gè)Task都在10秒內(nèi)完成,而另外一個(gè)Task卻耗時(shí)1分鐘,那該Stage的總時(shí)間至少為1分鐘。換句話(huà)說(shuō),一個(gè)Stage所耗費(fèi)的時(shí)間,主要由最慢的那個(gè)Task決定。
由于同一個(gè)Stage內(nèi)的所有Task執(zhí)行相同的計(jì)算,在排除不同計(jì)算節(jié)點(diǎn)計(jì)算能力差異的前提下,不同Task之間耗時(shí)的差異主要由該Task所處理的數(shù)據(jù)量決定。
Stage的數(shù)據(jù)來(lái)源主要分為如下兩類(lèi)
從數(shù)據(jù)源直接讀取。如讀取HDFS,Kafka
讀取上一個(gè)Stage的Shuffle數(shù)據(jù)
如何緩解/消除數(shù)據(jù)傾斜
盡量避免數(shù)據(jù)源的數(shù)據(jù)傾斜
以Spark Stream通過(guò)DirectStream方式讀取Kafka數(shù)據(jù)為例。由于Kafka的每一個(gè)Partition對(duì)應(yīng)Spark的一個(gè)Task(Partition),所以Kafka內(nèi)相關(guān)Topic的各Partition之間數(shù)據(jù)是否平衡,直接決定Spark處理該數(shù)據(jù)時(shí)是否會(huì)產(chǎn)生數(shù)據(jù)傾斜。
如《Kafka設(shè)計(jì)解析(一)- Kafka背景及架構(gòu)介紹》一文所述,Kafka某一Topic內(nèi)消息在不同Partition之間的分布,主要由Producer端所使用的Partition實(shí)現(xiàn)類(lèi)決定。如果使用隨機(jī)Partitioner,則每條消息會(huì)隨機(jī)發(fā)送到一個(gè)Partition中,從而從概率上來(lái)講,各Partition間的數(shù)據(jù)會(huì)達(dá)到平衡。此時(shí)源Stage(直接讀取Kafka數(shù)據(jù)的Stage)不會(huì)產(chǎn)生數(shù)據(jù)傾斜。
但很多時(shí)候,業(yè)務(wù)場(chǎng)景可能會(huì)要求將具備同一特征的數(shù)據(jù)順序消費(fèi),此時(shí)就需要將具有相同特征的數(shù)據(jù)放于同一個(gè)Partition中。一個(gè)典型的場(chǎng)景是,需要將同一個(gè)用戶(hù)相關(guān)的PV信息置于同一個(gè)Partition中。此時(shí),如果產(chǎn)生了數(shù)據(jù)傾斜,則需要通過(guò)其它方式處理。