中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

Spark 廣播變量BroadCast

 

一、 廣播變量

 

廣(guang)(guang)播(bo)(bo)變(bian)量(liang)(liang)允許(xu)程序員將一個(ge)只(zhi)讀的變(bian)量(liang)(liang)緩存(cun)在(zai)每臺機器上(shang),而不(bu)用在(zai)任務之間傳遞變(bian)量(liang)(liang)。廣(guang)(guang)播(bo)(bo)變(bian)量(liang)(liang)可(ke)被(bei)(bei)用于有(you)效地(di)(di)(di)給每個(ge)節點一個(ge)大(da)輸入數據(ju)(ju)(ju)(ju)集的副本。Spark還嘗試使用高效地(di)(di)(di)廣(guang)(guang)播(bo)(bo)算法來分發變(bian)量(liang)(liang),進而減少通信的開(kai)銷(xiao)。 Spark的動作(zuo)(zuo)通過一系(xi)列(lie)的步(bu)驟執行,這(zhe)(zhe)些步(bu)驟由分布式的洗牌操作(zuo)(zuo)分開(kai)。Spark自(zi)動地(di)(di)(di)廣(guang)(guang)播(bo)(bo)每個(ge)步(bu)驟每個(ge)任務需要(yao)的通用數據(ju)(ju)(ju)(ju)。這(zhe)(zhe)些廣(guang)(guang)播(bo)(bo)數據(ju)(ju)(ju)(ju)被(bei)(bei)序列(lie)化(hua)地(di)(di)(di)緩存(cun),在(zai)運行任務之前被(bei)(bei)反(fan)序列(lie)化(hua)出來。這(zhe)(zhe)意味著當我們需要(yao)在(zai)多(duo)個(ge)階(jie)段的任務之間使用相同的數據(ju)(ju)(ju)(ju),或者以反(fan)序列(lie)化(hua)形式緩存(cun)數據(ju)(ju)(ju)(ju)是(shi)十分重要(yao)的時(shi)候,顯(xian)式地(di)(di)(di)創建廣(guang)(guang)播(bo)(bo)變(bian)量(liang)(liang)才有(you)用。

 

二、為什么使用廣播變量

假如(ru)我們(men)要共享的變量map,1M
在默認的,task執行的算子中,使用了外部的變量,每個(ge)task都會獲取(qu)一份變量的副本
在(zai)什么情(qing)況下,會出現性能上的惡劣的影響呢(ni)?
1000個task。大量(liang)task的確都在(zai)并行(xing)運行(xing)。這些task里面都用(yong)到了占用(yong)1M內存(cun)的map,那么首先(xian),map會拷(kao)貝1000份副本,通過(guo)網絡(luo)傳輸到各個task中去(qu),給task使用(yong)。總(zong)計有1G的數據,會通過(guo)網絡(luo)傳輸。網絡(luo)傳輸的開銷,不容(rong)樂觀啊!!!網絡(luo)傳輸,也許就會消耗掉你(ni)的spark作業運行(xing)的總(zong)時間的一小部(bu)分。
map副本,傳輸到了(le)各個(ge)task上(shang)(shang)之后(hou),是要(yao)占(zhan)用內(nei)(nei)(nei)(nei)存(cun)的(de)(de)。1個(ge)map的(de)(de)確不大,1M;1000個(ge)map分布在你(ni)的(de)(de)集群(qun)中,一下子(zi)就(jiu)(jiu)耗費掉(diao)1G的(de)(de)內(nei)(nei)(nei)(nei)存(cun)。對性能會有什么影響呢?不必要(yao)的(de)(de)內(nei)(nei)(nei)(nei)存(cun)的(de)(de)消耗和占(zhan)用,就(jiu)(jiu)導致(zhi)(zhi)了(le),你(ni)在進行RDD持久化到內(nei)(nei)(nei)(nei)存(cun),也許就(jiu)(jiu)沒法完(wan)全在內(nei)(nei)(nei)(nei)存(cun)中放下;就(jiu)(jiu)只能寫入磁盤(pan),最后(hou)導致(zhi)(zhi)后(hou)續的(de)(de)操作在磁盤(pan)IO上(shang)(shang)消耗性能;
你的(de)(de)(de)task在(zai)創建對象的(de)(de)(de)時候,也(ye)許會(hui)(hui)發現堆內(nei)存(cun)放不下所(suo)有(you)對象,也(ye)許就會(hui)(hui)導致頻繁(fan)的(de)(de)(de)垃圾(ji)回收器(qi)的(de)(de)(de)回收,GC。GC的(de)(de)(de)時候,一(yi)定(ding)是會(hui)(hui)導致工(gong)作線程停(ting)止,也(ye)就是導致Spark暫停(ting)工(gong)作那么一(yi)點時間。頻繁(fan)GC的(de)(de)(de)話,對Spark作業的(de)(de)(de)運行的(de)(de)(de)速度會(hui)(hui)有(you)相當可觀的(de)(de)(de)影(ying)響。
 
如果說,task使用大變量(1m~100m),明知道(dao)會導致性能出(chu)現惡劣的影(ying)響(xiang)。那么(me)我們怎(zen)么(me)來解決呢?
廣播(bo)(bo),Broadcast,將大(da)變量(liang)廣播(bo)(bo)出去。而不是直(zhi)接使用。
 
廣播變量的好處,不是每個task一份變量副本,而是變成每個節點的executor才一份副本。這樣的話,就可以讓變量產生的副本大大減少。
廣播變量(liang),初始的(de)時候,就(jiu)在(zai)Drvier上有一份副本。task在(zai)運行的(de)時候,想要使用廣播變量(liang)中的(de)數據,此時首(shou)先(xian)會在(zai)自己本地的(de)Executor對應(ying)的(de)
BlockManager中,嘗試獲取(qu)(qu)變(bian)量副本(ben);如果本(ben)地沒有(you),BlockManager,也許會從遠程的Driver上面去(qu)獲取(qu)(qu)變(bian)量副本(ben);也有(you)可(ke)能從距離(li)比較近的其他
節點的Executor的BlockManager上(shang)去獲取,并(bing)保存在(zai)本地(di)的BlockManager中;BlockManager負責管理某(mou)個Executor對應的內存和(he)磁盤上(shang)的數(shu)據,
此后這個executor上的(de)task,都會直接使用本(ben)地的(de)BlockManager中(zhong)的(de)副本(ben)。

優點:
    不是每個task一份副本,而是變成每個節點Executor上一個副本。

 

1.舉例來說:

50個Executor 1000個task。 
一個map10M 

默認情況下,1000個task 1000個副本

1000 * 10M = 10 000M = 10 G

10G的數據,網絡傳輸,在集群中,耗費10G的內存資源

如果使用 廣播變(bian)量(liang),

50個Executor ,50個副本,10M*50 = 500M的數據

網絡傳輸,而且不一定是從Drver傳輸到各個節點,還可能是從就近的節點 
的(de)Executor的(de)BlockManager上(shang)獲取變量(liang)副本,網絡(luo)傳輸(shu)速度大大增加。

之前 10000M 現在 500M

20倍網絡傳輸性能的消耗。20倍內存消耗的減少。

三、如何使用

開始(shi)使用broadcast變(bian)量,使用完后,程(cheng)序結束(shu)記得釋放

  sc = SparkContext(appName=AppNames.JOURNEY_AGGREGATOR_APP_NAME)
    broadCastForLog = None
    try:
        broadCastForLog = ELogForDistributedApp.setLogConf2BroadCast(sc)
        elogging.initLogFromDict(broadCastForLog.value)
    except StandardError:
        pass

.......
    #執(zhi)行完程序邏輯,記(ji)得釋放該變量

    if broadCastForLog is not None:
        broadCastForLog.unpersist(False)

#獲取要被(bei)共享的(de)大(da)變量,這里是(shi)log配(pei)置

 

class ELogForDistributedApp(object):

    LOGHDFSPATH = "/user/hdfs/test/logging/logging_hdfs.json"
    @staticmethod
    def setLogConf2BroadCast(sc):
        logFilePath = ELogForDistributedApp.LOGHDFSPATH
        if sc is not None:
            configDict = HDFSOperation.getConfigFromHDFS(logFilePath,sc)
            broadCast = sc.broadcast(configDict)
            #globals()['broadCast'] = broadCast
            #elogging.initLogFromDict(broadCast.value)
            return broadCast
            #print broadCast.value
        else:
            return None

 

    def initLogFromDict(self):
        elogging.initLogFromDict(self.eloggingConfig)

 

從hdfs中(zhong)找(zhao)到相應配(pei)置文件(jian)

class HDFSOperation(object):

    @staticmethod
    def getConfigFromHDFS(hdfsPath,sc):
        if sc is not None:
            filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
            hadoop_configuration = sc._jsc.hadoopConfiguration()
            fs =filesystem_class.get(hadoop_configuration)
            path_class = sc._gateway.jvm.org.apache.hadoop.fs.Path
            pathObj = path_class(hdfsPath)
            try:
                hdfsInStream = fs.open(pathObj)
                bufferedReader_class = sc._gateway.jvm.java.io.BufferedReader
                inputStreamReader_class = sc._gateway.jvm.java.io.InputStreamReader
                bufferedReader = bufferedReader_class(inputStreamReader_class(hdfsInStream))
            except IOError,msg:
                print str(msg)
                return None

        else:
            return None
        configStr = ''
        while True:
            tmpStr = bufferedReader.readLine()
            if tmpStr == None:
                break
            configStr += tmpStr
        try:
            confDict = json.loads(configStr)
        except IOError,msg:
            print str(msg)
            return None
        return confDict

 

參考文檔

  1. Spark踩坑記——共享變量

posted @ 2017-12-03 23:15  ^_TONY_^  閱讀(3760)  評論(0)    收藏  舉報