Spark 廣播變量BroadCast
一、 廣播變量
廣(guang)(guang)播(bo)(bo)變(bian)量(liang)(liang)允許(xu)程序員將一個(ge)只(zhi)讀的變(bian)量(liang)(liang)緩存(cun)在(zai)每臺機器上(shang),而不(bu)用在(zai)任務之間傳遞變(bian)量(liang)(liang)。廣(guang)(guang)播(bo)(bo)變(bian)量(liang)(liang)可(ke)被(bei)(bei)用于有(you)效地(di)(di)(di)給每個(ge)節點一個(ge)大(da)輸入數據(ju)(ju)(ju)(ju)集的副本。Spark還嘗試使用高效地(di)(di)(di)廣(guang)(guang)播(bo)(bo)算法來分發變(bian)量(liang)(liang),進而減少通信的開(kai)銷(xiao)。 Spark的動作(zuo)(zuo)通過一系(xi)列(lie)的步(bu)驟執行,這(zhe)(zhe)些步(bu)驟由分布式的洗牌操作(zuo)(zuo)分開(kai)。Spark自(zi)動地(di)(di)(di)廣(guang)(guang)播(bo)(bo)每個(ge)步(bu)驟每個(ge)任務需要(yao)的通用數據(ju)(ju)(ju)(ju)。這(zhe)(zhe)些廣(guang)(guang)播(bo)(bo)數據(ju)(ju)(ju)(ju)被(bei)(bei)序列(lie)化(hua)地(di)(di)(di)緩存(cun),在(zai)運行任務之前被(bei)(bei)反(fan)序列(lie)化(hua)出來。這(zhe)(zhe)意味著當我們需要(yao)在(zai)多(duo)個(ge)階(jie)段的任務之間使用相同的數據(ju)(ju)(ju)(ju),或者以反(fan)序列(lie)化(hua)形式緩存(cun)數據(ju)(ju)(ju)(ju)是(shi)十分重要(yao)的時(shi)候,顯(xian)式地(di)(di)(di)創建廣(guang)(guang)播(bo)(bo)變(bian)量(liang)(liang)才有(you)用。
二、為什么使用廣播變量
map副本,傳輸到了(le)各個(ge)task上(shang)(shang)之后(hou),是要(yao)占(zhan)用內(nei)(nei)(nei)(nei)存(cun)的(de)(de)。1個(ge)map的(de)(de)確不大,1M;1000個(ge)map分布在你(ni)的(de)(de)集群(qun)中,一下子(zi)就(jiu)(jiu)耗費掉(diao)1G的(de)(de)內(nei)(nei)(nei)(nei)存(cun)。對性能會有什么影響呢?不必要(yao)的(de)(de)內(nei)(nei)(nei)(nei)存(cun)的(de)(de)消耗和占(zhan)用,就(jiu)(jiu)導致(zhi)(zhi)了(le),你(ni)在進行RDD持久化到內(nei)(nei)(nei)(nei)存(cun),也許就(jiu)(jiu)沒法完(wan)全在內(nei)(nei)(nei)(nei)存(cun)中放下;就(jiu)(jiu)只能寫入磁盤(pan),最后(hou)導致(zhi)(zhi)后(hou)續的(de)(de)操作在磁盤(pan)IO上(shang)(shang)消耗性能;
你的(de)(de)(de)task在(zai)創建對象的(de)(de)(de)時候,也(ye)許會(hui)(hui)發現堆內(nei)存(cun)放不下所(suo)有(you)對象,也(ye)許就會(hui)(hui)導致頻繁(fan)的(de)(de)(de)垃圾(ji)回收器(qi)的(de)(de)(de)回收,GC。GC的(de)(de)(de)時候,一(yi)定(ding)是會(hui)(hui)導致工(gong)作線程停(ting)止,也(ye)就是導致Spark暫停(ting)工(gong)作那么一(yi)點時間。頻繁(fan)GC的(de)(de)(de)話,對Spark作業的(de)(de)(de)運行的(de)(de)(de)速度會(hui)(hui)有(you)相當可觀的(de)(de)(de)影(ying)響。
優點:
不是每個task一份副本,而是變成每個節點Executor上一個副本。
1.舉例來說:
50個Executor 1000個task。
一個map10M
默認情況下,1000個task 1000個副本
1000 * 10M = 10 000M = 10 G
10G的數據,網絡傳輸,在集群中,耗費10G的內存資源。
如果使用 廣播變(bian)量(liang),
50個Executor ,50個副本,10M*50 = 500M的數據。
網絡傳輸,而且不一定是從Drver傳輸到各個節點,還可能是從就近的節點
的(de)Executor的(de)BlockManager上(shang)獲取變量(liang)副本,網絡(luo)傳輸(shu)速度大大增加。
之前 10000M 現在 500M。
20倍網絡傳輸性能的消耗。20倍內存消耗的減少。
三、如何使用
開始(shi)使用broadcast變(bian)量,使用完后,程(cheng)序結束(shu)記得釋放
sc = SparkContext(appName=AppNames.JOURNEY_AGGREGATOR_APP_NAME) broadCastForLog = None try: broadCastForLog = ELogForDistributedApp.setLogConf2BroadCast(sc) elogging.initLogFromDict(broadCastForLog.value) except StandardError: pass ....... #執(zhi)行完程序邏輯,記(ji)得釋放該變量 if broadCastForLog is not None: broadCastForLog.unpersist(False)
#獲取要被(bei)共享的(de)大(da)變量,這里是(shi)log配(pei)置
class ELogForDistributedApp(object): LOGHDFSPATH = "/user/hdfs/test/logging/logging_hdfs.json" @staticmethod def setLogConf2BroadCast(sc): logFilePath = ELogForDistributedApp.LOGHDFSPATH if sc is not None: configDict = HDFSOperation.getConfigFromHDFS(logFilePath,sc) broadCast = sc.broadcast(configDict) #globals()['broadCast'] = broadCast #elogging.initLogFromDict(broadCast.value) return broadCast #print broadCast.value else: return None
def initLogFromDict(self):
elogging.initLogFromDict(self.eloggingConfig)
從hdfs中(zhong)找(zhao)到相應配(pei)置文件(jian)
class HDFSOperation(object): @staticmethod def getConfigFromHDFS(hdfsPath,sc): if sc is not None: filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem hadoop_configuration = sc._jsc.hadoopConfiguration() fs =filesystem_class.get(hadoop_configuration) path_class = sc._gateway.jvm.org.apache.hadoop.fs.Path pathObj = path_class(hdfsPath) try: hdfsInStream = fs.open(pathObj) bufferedReader_class = sc._gateway.jvm.java.io.BufferedReader inputStreamReader_class = sc._gateway.jvm.java.io.InputStreamReader bufferedReader = bufferedReader_class(inputStreamReader_class(hdfsInStream)) except IOError,msg: print str(msg) return None else: return None configStr = '' while True: tmpStr = bufferedReader.readLine() if tmpStr == None: break configStr += tmpStr try: confDict = json.loads(configStr) except IOError,msg: print str(msg) return None return confDict
