摘要
本文結合實例詳細闡明了Spark數(shù)據(jù)傾斜的幾種場景以及對應的解決方案,包括避免數(shù)據(jù)源傾斜,調(diào)整并行度,使用自定義Partitioner,使用Map側Join代替Reduce側Join,給傾斜Key加上隨機前綴等。
為何要處理數(shù)據(jù)傾斜(Data Skew)
什么是數(shù)據(jù)傾斜
對Spark/Hadoop這樣的大數(shù)據(jù)系統(tǒng)來講,數(shù)據(jù)量大并不可怕,可怕的是數(shù)據(jù)傾斜。
何謂數(shù)據(jù)傾斜?數(shù)據(jù)傾斜指的是,并行處理的數(shù)據(jù)集中,某一部分(如Spark或Kafka的一個Partition)的數(shù)據(jù)顯著多于其它部分,從而使得該部分的處理速度成為整個數(shù)據(jù)集處理的瓶頸。
數(shù)據(jù)傾斜是如何造成的
在Spark中,同一個Stage的不同Partition可以并行處理,而具有依賴關系的不同Stage之間是串行處理的。假設某個Spark Job分為Stage 0和Stage 1兩個Stage,且Stage 1依賴于Stage 0,那Stage 0完全處理結束之前不會處理Stage 1。而Stage 0可能包含N個Task,這N個Task可以并行進行。如果其中N-1個Task都在10秒內(nèi)完成,而另外一個Task卻耗時1分鐘,那該Stage的總時間至少為1分鐘。換句話說,一個Stage所耗費的時間,主要由最慢的那個Task決定。
由于同一個Stage內(nèi)的所有Task執(zhí)行相同的計算,在排除不同計算節(jié)點計算能力差異的前提下,不同Task之間耗時的差異主要由該Task所處理的數(shù)據(jù)量決定。
Stage的數(shù)據(jù)來源主要分為如下兩類
從數(shù)據(jù)源直接讀取。如讀取HDFS,Kafka
讀取上一個Stage的Shuffle數(shù)據(jù)
如何緩解/消除數(shù)據(jù)傾斜
盡量避免數(shù)據(jù)源的數(shù)據(jù)傾斜
以Spark Stream通過DirectStream方式讀取Kafka數(shù)據(jù)為例。由于Kafka的每一個Partition對應Spark的一個Task(Partition),所以Kafka內(nèi)相關Topic的各Partition之間數(shù)據(jù)是否平衡,直接決定Spark處理該數(shù)據(jù)時是否會產(chǎn)生數(shù)據(jù)傾斜。
如《Kafka設計解析(一)- Kafka背景及架構介紹》一文所述,Kafka某一Topic內(nèi)消息在不同Partition之間的分布,主要由Producer端所使用的Partition實現(xiàn)類決定。如果使用隨機Partitioner,則每條消息會隨機發(fā)送到一個Partition中,從而從概率上來講,各Partition間的數(shù)據(jù)會達到平衡。此時源Stage(直接讀取Kafka數(shù)據(jù)的Stage)不會產(chǎn)生數(shù)據(jù)傾斜。
但很多時候,業(yè)務場景可能會要求將具備同一特征的數(shù)據(jù)順序消費,此時就需要將具有相同特征的數(shù)據(jù)放于同一個Partition中。一個典型的場景是,需要將同一個用戶相關的PV信息置于同一個Partition中。此時,如果產(chǎn)生了數(shù)據(jù)傾斜,則需要通過其它方式處理。