• Spark踩坑記——初試

  • Spark踩坑記——數(shù)據(jù)庫(kù)(Hbase+Mysql)

  • Spark踩坑記——Spark Streaming+kafka應(yīng)用及調(diào)優(yōu)
    在前面總結(jié)的幾篇spark踩坑博文中,我總結(jié)了自己在使用spark過(guò)程當(dāng)中踩過(guò)的一些坑和經(jīng)驗(yàn)。我們知道Spark是多機(jī)器集群部署的,分為Driver/Master/Worker,Master負(fù)責(zé)資源調(diào)度,Worker是不同的運(yùn)算節(jié)點(diǎn),由Master統(tǒng)一調(diào)度,而Driver是我們提交Spark程序的節(jié)點(diǎn),并且所有的reduce類型的操作都會(huì)匯總到Driver節(jié)點(diǎn)進(jìn)行整合。節(jié)點(diǎn)之間會(huì)將map/reduce等操作函數(shù)傳遞一個(gè)獨(dú)立副本到每一個(gè)節(jié)點(diǎn),這些變量也會(huì)復(fù)制到每臺(tái)機(jī)器上,而節(jié)點(diǎn)之間的運(yùn)算是相互獨(dú)立的,變量的更新并不會(huì)傳遞回Driver程序。那么有個(gè)問(wèn)題,如果我們想在節(jié)點(diǎn)之間共享一份變量,比如一份公共的配置項(xiàng),該怎么辦呢?Spark為我們提供了兩種特定的共享變量,來(lái)完成節(jié)點(diǎn)間變量的共享。
    本文首先簡(jiǎn)單的介紹spark以及spark streaming中累加器和廣播變量的使用方式,然后重點(diǎn)介紹一下如何更新廣播變量。

累加器

顧名思義,累加器是一種只能通過(guò)關(guān)聯(lián)操作進(jìn)行“加”操作的變量,因此它能夠高效的應(yīng)用于并行操作中。它們能夠用來(lái)實(shí)現(xiàn)counters和sums。Spark原生支持?jǐn)?shù)值類型的累加器,開發(fā)者可以自己添加支持的類型,在2.0.0之前的版本中,通過(guò)繼承AccumulatorParam來(lái)實(shí)現(xiàn),而2.0.0之后的版本需要繼承AccumulatorV2來(lái)實(shí)現(xiàn)自定義類型的累加器。
如果創(chuàng)建了一個(gè)具名的累加器,它可以在spark的UI中顯示。這對(duì)于理解運(yùn)行階段(running stages)的過(guò)程有很重要的作用。如下圖:
萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)
在2.0.0之前版本中,累加器的聲明使用方式如下:

scala> val accum = sc.accumulator(0, "My Accumulator")accum: spark.Accumulator[Int] = 0scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.valueres2: Int = 10

累加器的聲明在2.0.0發(fā)生了變化,到2.1.0也有所變化,具體可以參考官方文檔,我們這里以2.1.0為例將代碼貼一下:

scala> val accum = sc.longAccumulator("My Accumulator")accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc