Spark踩坑記——共享變量
前言
在前面總結的幾篇spark踩坑博文中,我總結了自己在使用spark過程當中踩過的一些坑和經驗。我們知道Spark是多機器集群部署的,分為Driver/Master/Worker,Master負責資源調度,Worker是不同的運算節點,由Master統一調度,而Driver是我們提交Spark程序的節點,并且所有的reduce類型的操作都會匯總到Driver節點進行整合。節點之間會將map/reduce等操作函數傳遞一個獨立副本到每一個節點,這些變量也會復制到每臺機器上,而節點之間的運算是相互獨立的,變量的更新并不會傳遞回Driver程序。那么有個問題,如果我們想在節點之間共享一份變量,比如一份公共的配置項,該怎么辦呢?Spark為我們提供了兩種特定的共享變量,來完成節點間變量的共享。
本文(wen)首先簡單的介紹spark以及spark streaming中累加器和廣播變量的使用方式,然后重點(dian)介紹一下(xia)如何更新廣播變量。
累加器
顧名思義,累加器是一種只能通過關聯操作進行“加”操作的變量,因此它能夠高效的應用于并行操作中。它們能夠用來實現counters和sums。Spark原生支持數值類型的累加器,開發者可以自己添加支持的類型,在2.0.0之前的版本中,通過繼承AccumulatorParam來實現,而2.0.0之后的版本需要繼承AccumulatorV2來實現自定義類型的累加器。
如果創建了一個具名的累加器,它可以在spark的UI中顯示。這對于理解運行階段(running stages)的過程有很重要的作用。如下圖:

在2.0.0之前版本中,累(lei)加器的聲明使用(yong)方式如下:
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
累加器(qi)的聲明在2.0.0發生(sheng)了變(bian)化,到2.1.0也有所變(bian)化,具體(ti)可以參考(kao)官(guan)方文檔,我們這里以2.1.0為例將代碼貼一下:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
廣播變量
累加器比較簡單直觀,如果我們需要在spark中進行一些全局統計就可以使用它。但是有時候僅僅一個累加器并不能滿足我們的需求,比如數據庫中一份公共配置表格,需要同步給各個節點進行查詢。OK先來簡單介紹下spark中的廣播變量:
廣播變量允許程序員緩存一個只讀的變量在每臺機器上面,而不是每個任務保存一份拷貝。例如,利用廣播變量,我們能夠以一種更有效率的方式將一個大數據量輸入集合的副本分配給每個節點。Spark也嘗試著利用有效的廣播算法去分配廣播變量,以減少通信的成本。
一個(ge)廣播變(bian)量(liang)可以通(tong)過調(diao)用(yong)SparkContext.broadcast(v)方(fang)法(fa)從一個(ge)初始變(bian)量(liang)v中創(chuang)建。廣播變(bian)量(liang)是v的一個(ge)包裝變(bian)量(liang),它的值可以通(tong)過value方(fang)法(fa)訪問(wen),下面的代碼(ma)說明了這個(ge)過程:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
從上文我們可(ke)(ke)以看出廣播(bo)變(bian)量的(de)(de)(de)聲明(ming)很簡單,調用broadcast就(jiu)能搞(gao)定(ding),并且scala中一(yi)切(qie)可(ke)(ke)序列(lie)化的(de)(de)(de)對象都是可(ke)(ke)以進行(xing)(xing)廣播(bo)的(de)(de)(de),這(zhe)就(jiu)給了我們很大(da)的(de)(de)(de)想象空間,可(ke)(ke)以利用廣播(bo)變(bian)量將一(yi)些經常訪問的(de)(de)(de)大(da)變(bian)量進行(xing)(xing)廣播(bo),而不是每(mei)個任務(wu)保存一(yi)份,這(zhe)樣可(ke)(ke)以減(jian)少(shao)資源上的(de)(de)(de)浪費。
更新廣播變量(rebroadcast)
廣播變量可以用來更新一些大的配置變量,比如數據庫中的一張表格,那么有這樣一個問題,如果數據庫當中的配置表格進行了更新,我們需要重新廣播變量該怎么做呢。上文對廣播變量的說明中,我們知道廣播變量是只讀的,也就是說廣播出去的變量沒法再修改,那么我們應該怎么解決這個問題呢?
答案是利用spark中的unpersist函數
Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.
上文是從spark官方文檔摘抄出來的,我們可以看出,正常來說每個節點的數據是不需要我們操心的,spark會自動按照LRU規則將老數據刪除,如果需要手動刪除可以調用unpersist函數。
那么更新(xin)(xin)廣(guang)(guang)播(bo)(bo)變(bian)量的(de)基本思路(lu):將(jiang)老的(de)廣(guang)(guang)播(bo)(bo)變(bian)量刪除(unpersist),然后重新(xin)(xin)廣(guang)(guang)播(bo)(bo)一遍新(xin)(xin)的(de)廣(guang)(guang)播(bo)(bo)變(bian)量,為此簡(jian)單(dan)包裝了一個用于廣(guang)(guang)播(bo)(bo)和更新(xin)(xin)廣(guang)(guang)播(bo)(bo)變(bian)量的(de)wraper類,如(ru)下(xia):
import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag
// This wrapper lets us update brodcast variables within DStreams' foreachRDD
// without running into serialization issues
case class BroadcastWrapper[T: ClassTag](
@transient private val ssc: StreamingContext,
@transient private val _v: T) {
@transient private var v = ssc.sparkContext.broadcast(_v)
def update(newValue: T, blocking: Boolean = false): Unit = {
// 刪除RDD是否需要鎖定
v.unpersist(blocking)
v = ssc.sparkContext.broadcast(newValue)
}
def value: T = v.value
private def writeObject(out: ObjectOutputStream): Unit = {
out.writeObject(v)
}
private def readObject(in: ObjectInputStream): Unit = {
v = in.readObject().asInstanceOf[Broadcast[T]]
}
}
利用該wrapper更新廣(guang)播變量,大(da)致的處理(li)邏輯如下:
// 定義
val yourBroadcast = BroadcastWrapper[yourType](ssc, yourValue)
yourStream.transform(rdd => {
//定期更新廣播變量
if (System.currentTimeMillis - someTime > Conf.updateFreq) {
yourBroadcast.update(newValue, true)
}
// do something else
})
總結
spark中的共享變量是我們能夠在全局做出一些操作,比如record總數的統計更新,一些大變量配置項的廣播等等。而對于廣播變量,我們也可以監控數據庫中的變化,做到定時的重新廣播新的數據表配置情況,另外我使用上述方式,在每天千萬級的數據實時流統計中表現穩定,所以有相似問題的同學也可以進行嘗試,有任何問題,歡迎隨時騷擾溝通v
廣告下我們項目:專注于游戲輿情的挖掘分析,歡迎大家來踩踩
