Spark性能優化指南——基礎篇
本文轉自://tech.meituan.com/spark-tuning-basic.html
感謝原作者
前言
在大數(shu)據計(ji)算(suan)領域,Spark已(yi)經(jing)成為(wei)了越(yue)來越(yue)流(liu)行(xing)、越(yue)來越(yue)受歡迎(ying)的計(ji)算(suan)平(ping)臺之一。Spark的功能涵蓋(gai)了大數(shu)據領域的離線批處(chu)理、SQL類處(chu)理、流(liu)式/實時計(ji)算(suan)、機器(qi)學習(xi)、圖(tu)計(ji)算(suan)等各種(zhong)不同(tong)類型的計(ji)算(suan)操作,應用范圍與前景非(fei)常廣泛。在美(mei)團?大眾點評(ping),已(yi)經(jing)有(you)很多(duo)同(tong)學在各種(zhong)項(xiang)目(mu)中(zhong)嘗試使用Spark。大多(duo)數(shu)同(tong)學(包括(kuo)筆者(zhe)在內(nei)),最初開始嘗試使用Spark的原(yuan)因很簡(jian)單,主要就(jiu)是為(wei)了讓大數(shu)據計(ji)算(suan)作業的執行(xing)速度更快、性能更高。
然而,通過Spark開發出高性(xing)(xing)能(neng)(neng)的(de)大(da)數(shu)據計算作業,并不是那(nei)么簡單的(de)。如果沒有對Spark作業進行合理的(de)調優(you),Spark作業的(de)執行速度可能(neng)(neng)會很(hen)慢,這(zhe)樣(yang)就完全(quan)體(ti)現(xian)不出Spark作為(wei)一種快速大(da)數(shu)據計算引擎(qing)的(de)優(you)勢來。因此,想要用好Spark,就必須(xu)對其進行合理的(de)性(xing)(xing)能(neng)(neng)優(you)化。
Spark的(de)性(xing)(xing)能(neng)調(diao)優(you)(you)實際上(shang)是由很多部分組成(cheng)的(de),不是調(diao)節幾個(ge)參數就可以立竿見影提升作業(ye)(ye)性(xing)(xing)能(neng)的(de)。我們(men)需要根據不同的(de)業(ye)(ye)務場景以及數據情況,對Spark作業(ye)(ye)進(jin)(jin)行綜合(he)性(xing)(xing)的(de)分析,然(ran)后(hou)進(jin)(jin)行多個(ge)方(fang)面的(de)調(diao)節和(he)優(you)(you)化,才(cai)能(neng)獲得(de)最佳性(xing)(xing)能(neng)。
筆者根據之前的(de)(de)Spark作(zuo)(zuo)業(ye)(ye)開(kai)發經驗以(yi)及實踐積累,總結出了一(yi)套Spark作(zuo)(zuo)業(ye)(ye)的(de)(de)性能優化方(fang)(fang)案。整套方(fang)(fang)案主要(yao)(yao)分為開(kai)發調(diao)優、資源(yuan)調(diao)優、數據傾(qing)斜調(diao)優、shuffle調(diao)優幾個(ge)部分。開(kai)發調(diao)優和(he)(he)資源(yuan)調(diao)優是所有Spark作(zuo)(zuo)業(ye)(ye)都需要(yao)(yao)注(zhu)意(yi)和(he)(he)遵循的(de)(de)一(yi)些基(ji)本原則,是高(gao)性能Spark作(zuo)(zuo)業(ye)(ye)的(de)(de)基(ji)礎;數據傾(qing)斜調(diao)優,主要(yao)(yao)講解了一(yi)套完整的(de)(de)用來(lai)解決(jue)Spark作(zuo)(zuo)業(ye)(ye)數據傾(qing)斜的(de)(de)解決(jue)方(fang)(fang)案;shuffle調(diao)優,面向的(de)(de)是對Spark的(de)(de)原理有較深層次(ci)掌(zhang)握和(he)(he)研究的(de)(de)同學,主要(yao)(yao)講解了如(ru)何對Spark作(zuo)(zuo)業(ye)(ye)的(de)(de)shuffle運(yun)行過(guo)程(cheng)以(yi)及細節進行調(diao)優。
本文作(zuo)為Spark性能優(you)(you)化(hua)指南的基礎篇,主(zhu)要講(jiang)解開發調優(you)(you)以及資(zi)源調優(you)(you)。
開(kai)發調優
調(diao)優概(gai)述
Spark性(xing)(xing)能優(you)化(hua)(hua)的第(di)一(yi)步,就(jiu)是要(yao)在開(kai)發(fa)Spark作(zuo)業(ye)(ye)的過(guo)程中注(zhu)意和應用(yong)一(yi)些性(xing)(xing)能優(you)化(hua)(hua)的基(ji)本原(yuan)則。開(kai)發(fa)調優(you),就(jiu)是要(yao)讓大家了(le)解(jie)以下一(yi)些Spark基(ji)本開(kai)發(fa)原(yuan)則,包括:RDD lineage設計、算子的合(he)理使用(yong)、特殊操(cao)作(zuo)的優(you)化(hua)(hua)等(deng)。在開(kai)發(fa)過(guo)程中,時時刻刻都應該(gai)注(zhu)意以上原(yuan)則,并將這(zhe)些原(yuan)則根據具體的業(ye)(ye)務以及實際的應用(yong)場景,靈活地運用(yong)到自己的Spark作(zuo)業(ye)(ye)中。
原則一:避(bi)免(mian)創建重復(fu)的RDD
通常來說(shuo),我們在(zai)開(kai)發一(yi)個(ge)Spark作業時,首(shou)先是基于某(mou)個(ge)數據源(yuan)(比如Hive表或HDFS文件(jian))創建(jian)一(yi)個(ge)初始的RDD;接著對(dui)這(zhe)個(ge)RDD執行(xing)某(mou)個(ge)算(suan)子操作,然后得(de)到下一(yi)個(ge)RDD;以此類推,循環(huan)往復(fu),直(zhi)到計算(suan)出最(zui)終我們需要的結果。在(zai)這(zhe)個(ge)過(guo)(guo)程中,多個(ge)RDD會通過(guo)(guo)不同的算(suan)子操作(比如map、reduce等)串起來,這(zhe)個(ge)“RDD串”,就是RDD lineage,也就是“RDD的血緣關系鏈”。
我們在開發過程中要注意:對于同一(yi)(yi)份數據(ju)(ju),只應該創建(jian)一(yi)(yi)個(ge)RDD,不能創建(jian)多個(ge)RDD來(lai)代表同一(yi)(yi)份數據(ju)(ju)。
一些(xie)Spark初學者在剛開始開發Spark作業(ye)時,或者是有經(jing)驗(yan)的(de)工(gong)程師在開發RDD lineage極其冗長的(de)Spark作業(ye)時,可能(neng)會忘了自己之前對于某一份(fen)數據(ju)(ju)已經(jing)創建過一個RDD了,從而導致對于同一份(fen)數據(ju)(ju),創建了多個RDD。這就意味著,我們的(de)Spark作業(ye)會進行多次重復計算來創建多個代表相同數據(ju)(ju)的(de)RDD,進而增加了作業(ye)的(de)性(xing)能(neng)開銷。
一(yi)個簡單的(de)例子
// 需要對(dui)名(ming)為(wei)“hello.txt”的HDFS文件進行一(yi)(yi)次map操作(zuo),再進行一(yi)(yi)次reduce操作(zuo)。也就是說(shuo),需要對(dui)一(yi)(yi)份數據(ju)執(zhi)行兩次算子(zi)操作(zuo)。
// 錯誤的(de)做(zuo)法:對于(yu)同(tong)一份數據(ju)執行多次算子操作(zuo)時,創建多個RDD。
// 這里執(zhi)行了(le)兩(liang)次textFile方法,針對同一(yi)個(ge)HDFS文件(jian),創建了(le)兩(liang)個(ge)RDD出來,然(ran)后分別對每個(ge)RDD都(dou)執(zhi)行了(le)一(yi)個(ge)算子操(cao)作(zuo)。
// 這種情況下,Spark需要從HDFS上兩(liang)次加(jia)載hello.txt文件的(de)內容,并創建(jian)兩(liang)個單獨的(de)RDD;第(di)二次加(jia)載HDFS文件以及創建(jian)RDD的(de)性能開銷,很(hen)明顯(xian)是白白浪(lang)費掉的(de)。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd2.reduce(...)
// 正確(que)的用法:對于一(yi)份(fen)數(shu)據執行(xing)多次算子操作(zuo)時(shi),只使用一(yi)個(ge)RDD。
// 這(zhe)種寫法很明(ming)顯比上一(yi)種寫法要(yao)好多了(le),因為我們(men)對于同(tong)一(yi)份數據只創建了(le)一(yi)個RDD,然后對這(zhe)一(yi)個RDD執行了(le)多次算子操作。
// 但是(shi)要(yao)注意到這里為止優化還沒有結束(shu),由于(yu)rdd1被執行(xing)了兩次(ci)算(suan)(suan)子操作(zuo),第二次(ci)執行(xing)reduce操作(zuo)的(de)時候,還會(hui)(hui)再(zai)次(ci)從源頭處重新計算(suan)(suan)一次(ci)rdd1的(de)數據(ju),因此還是(shi)會(hui)(hui)有重復計算(suan)(suan)的(de)性能開(kai)銷。
// 要徹底解決這個問題,必須結合“原(yuan)則三:對多(duo)次(ci)使用(yong)的RDD進行持久化”,才能保證一個RDD被(bei)多(duo)次(ci)使用(yong)時(shi)只被(bei)計算一次(ci)。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
rdd1.reduce(...)
原則二(er):盡可能復用(yong)同一(yi)個RDD
除(chu)了(le)要(yao)避免在(zai)開發(fa)過程中對一(yi)份完全(quan)相同(tong)的(de)(de)數(shu)(shu)據創(chuang)建多個(ge)RDD之外,在(zai)對不同(tong)的(de)(de)數(shu)(shu)據執行(xing)(xing)算子操(cao)作時(shi)還要(yao)盡可(ke)能(neng)地復用(yong)一(yi)個(ge)RDD。比如說(shuo),有一(yi)個(ge)RDD的(de)(de)數(shu)(shu)據格式是(shi)key-value類(lei)型(xing)的(de)(de),另(ling)一(yi)個(ge)是(shi)單value類(lei)型(xing)的(de)(de),這兩個(ge)RDD的(de)(de)value數(shu)(shu)據是(shi)完全(quan)一(yi)樣的(de)(de)。那(nei)么此時(shi)我們可(ke)以(yi)(yi)只(zhi)使用(yong)key-value類(lei)型(xing)的(de)(de)那(nei)個(ge)RDD,因為其中已經包(bao)含(han)了(le)另(ling)一(yi)個(ge)的(de)(de)數(shu)(shu)據。對于類(lei)似這種多個(ge)RDD的(de)(de)數(shu)(shu)據有重疊或者包(bao)含(han)的(de)(de)情況(kuang),我們應該盡量(liang)復用(yong)一(yi)個(ge)RDD,這樣可(ke)以(yi)(yi)盡可(ke)能(neng)地減少RDD的(de)(de)數(shu)(shu)量(liang),從而盡可(ke)能(neng)減少算子執行(xing)(xing)的(de)(de)次數(shu)(shu)。
一個簡(jian)單的例子
// 錯誤的做法。
// 有一個<Long, String>格(ge)式的RDD,即rdd1。
// 接著(zhu)由于業務需要,對rdd1執行了一個(ge)map操(cao)作,創建了一個(ge)rdd2,而(er)rdd2中的數據僅(jin)僅(jin)是(shi)rdd1中的value值(zhi)而(er)已(yi),也就是(shi)說(shuo),rdd2是(shi)rdd1的子集。
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)
// 分別(bie)對rdd1和rdd2執行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)
// 正確的做法。
// 上(shang)面這(zhe)個case中,其實rdd1和(he)rdd2的區別無非就(jiu)是數據(ju)格式不(bu)同而已(yi),rdd2的數據(ju)完全就(jiu)是rdd1的子集而已(yi),卻創(chuang)建了兩(liang)(liang)個rdd,并對兩(liang)(liang)個rdd都執(zhi)行了一次算子操作。
// 此(ci)時會因為對rdd1執(zhi)行map算子來創建rdd2,而(er)(er)多執(zhi)行一次算子操作,進而(er)(er)增加性(xing)能開(kai)銷。
// 其實在這種情況下完全可以復用(yong)同一個RDD。
// 我們可以(yi)使(shi)用rdd1,既做(zuo)reduceByKey操作(zuo),也做(zuo)map操作(zuo)。
// 在進(jin)行第二個(ge)map操作時,只使用每個(ge)數據的(de)(de)tuple._2,也就是rdd1中(zhong)的(de)(de)value值,即(ji)可。
JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)
// 第(di)二種方式相較于第(di)一(yi)種方式而言,很明顯減(jian)少了一(yi)次rdd2的計(ji)算開(kai)銷。
// 但是(shi)到這里為止,優化(hua)還(huan)沒有結(jie)束(shu),對rdd1我們還(huan)是(shi)執(zhi)行了兩(liang)次(ci)算子(zi)操作,rdd1實際上還(huan)是(shi)會被(bei)計算兩(liang)次(ci)。
// 因此還需要配合(he)“原(yuan)則(ze)三:對多次(ci)(ci)使(shi)用的RDD進(jin)行持久化”進(jin)行使(shi)用,才能保證一個RDD被(bei)(bei)多次(ci)(ci)使(shi)用時只被(bei)(bei)計算一次(ci)(ci)。
原則三:對多(duo)次使用的(de)RDD進(jin)行持久化
當(dang)你在(zai)Spark代(dai)碼中多(duo)次對一(yi)個(ge)RDD做了(le)算(suan)子操作后,恭喜,你已(yi)經實現Spark作業(ye)第一(yi)步的優(you)化了(le),也(ye)就是盡可能復用RDD。此時就該在(zai)這(zhe)個(ge)基礎之上(shang),進行(xing)第二步優(you)化了(le),也(ye)就是要(yao)保證對一(yi)個(ge)RDD執行(xing)多(duo)次算(suan)子操作時,這(zhe)個(ge)RDD本身(shen)僅僅被計算(suan)一(yi)次。
Spark中對(dui)于一(yi)個RDD執行多次算(suan)子的(de)(de)默認原理(li)是這(zhe)樣的(de)(de):每次你對(dui)一(yi)個RDD執行一(yi)個算(suan)子操作時,都會重新(xin)從(cong)源頭(tou)處計算(suan)一(yi)遍,計算(suan)出(chu)那個RDD來,然后再(zai)對(dui)這(zhe)個RDD執行你的(de)(de)算(suan)子操作。這(zhe)種方式(shi)的(de)(de)性能是很(hen)差的(de)(de)。
因此對于這種情況,我們的建議是:對多次使用的RDD進行持久化。此時Spark就會根據你的持久化策略,將RDD中的數據保存到內存或者磁盤中。以后(hou)每(mei)次(ci)對這個(ge)RDD進行(xing)(xing)算子操作(zuo)時,都(dou)會(hui)(hui)直接(jie)從內(nei)存或磁盤中提取持(chi)久化的(de)RDD數據,然(ran)后(hou)執行(xing)(xing)算子,而不會(hui)(hui)從源頭處重新(xin)計算一遍這個(ge)RDD,再執行(xing)(xing)算子操作(zuo)。
對多次使用的RDD進(jin)行持久化的代碼示例
// 如(ru)果(guo)要(yao)對一(yi)個RDD進行持久化,只要(yao)對這個RDD調用(yong)cache()和persist()即可(ke)。
// 正確的做法。
// cache()方(fang)法表示:使(shi)用非序列化(hua)的方(fang)式將RDD中的數據全部嘗試持久化(hua)到內存中。
// 此時再對(dui)rdd1執行兩次(ci)算(suan)子操作(zuo)時,只有在第一(yi)次(ci)執行map算(suan)子時,才會將這個rdd1從源頭處計算(suan)一(yi)次(ci)。
// 第二次執(zhi)行(xing)reduce算子時,就(jiu)會直接從內(nei)存中(zhong)提取數據進行(xing)計算,不(bu)會重復計算一個rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)
// persist()方(fang)(fang)法表示(shi):手動(dong)選擇持久化(hua)級別,并使用指定(ding)的方(fang)(fang)式進(jin)行持久化(hua)。
// 比(bi)如說(shuo),StorageLevel.MEMORY_AND_DISK_SER表示,內(nei)存(cun)充足(zu)時優(you)先持久(jiu)化(hua)到內(nei)存(cun)中,內(nei)存(cun)不充足(zu)時持久(jiu)化(hua)到磁盤(pan)文件(jian)中。
// 而(er)且其中(zhong)的_SER后綴表示,使用(yong)序列化的方式(shi)來(lai)保存RDD數據,此時RDD中(zhong)的每(mei)個partition都會序列化成(cheng)一個大的字節數組(zu),然后再持久化到內存或磁盤中(zhong)。
// 序列(lie)化(hua)的(de)方式可以(yi)減少持久(jiu)化(hua)的(de)數(shu)據對內(nei)存/磁盤的(de)占用量,進而(er)避(bi)免(mian)內(nei)存被持久(jiu)化(hua)數(shu)據占用過多(duo),從而(er)發生頻繁(fan)GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)
對于persist()方法而言(yan),我們可以根(gen)據不同的(de)業務場景選擇不同的(de)持久(jiu)化級別。
Spark的持久化級別
| 持久(jiu)化級(ji)別 | 含義解釋 |
|---|---|
| MEMORY_ONLY | 使(shi)用(yong)未(wei)序列化(hua)的Java對象格式,將數據(ju)保存(cun)在內存(cun)中。如果內存(cun)不(bu)夠存(cun)放所(suo)有的數據(ju),則(ze)數據(ju)可能就不(bu)會(hui)進行(xing)持(chi)久化(hua)。那么下次對這個RDD執行(xing)算子操(cao)作時(shi),那些沒有被持(chi)久化(hua)的數據(ju),需要從源頭處重新計算一(yi)遍。這是默認的持(chi)久化(hua)策(ce)略(lve),使(shi)用(yong)cache()方法時(shi),實際就是使(shi)用(yong)的這種持(chi)久化(hua)策(ce)略(lve)。 |
| MEMORY_AND_DISK | 使用未序(xu)列(lie)化的(de)(de)Java對象格式,優先嘗試將(jiang)數據(ju)保(bao)存在內存中。如(ru)果內存不(bu)夠(gou)存放所(suo)有的(de)(de)數據(ju),會(hui)將(jiang)數據(ju)寫入磁盤(pan)文件中,下次對這個(ge)RDD執(zhi)行算子時,持久化在磁盤(pan)文件中的(de)(de)數據(ju)會(hui)被讀取出來使用。 |
| MEMORY_ONLY_SER | 基本含(han)義同MEMORY_ONLY。唯一(yi)的區別是,會(hui)(hui)將RDD中的數據進(jin)行序列化,RDD的每個partition會(hui)(hui)被序列化成一(yi)個字節數組。這種方式更加節省(sheng)內(nei)存(cun),從而可以(yi)避免(mian)持(chi)久化的數據占(zhan)用過多內(nei)存(cun)導致(zhi)頻繁(fan)GC。 |
| MEMORY_AND_DISK_SER | 基(ji)本含義同(tong)MEMORY_AND_DISK。唯一的區別是,會將(jiang)RDD中的數據(ju)進行序(xu)列(lie)化,RDD的每個(ge)(ge)partition會被序(xu)列(lie)化成一個(ge)(ge)字(zi)節數組。這種(zhong)方式更加節省(sheng)內存,從而(er)可以避(bi)免(mian)持久化的數據(ju)占用過(guo)多內存導致頻繁GC。 |
| DISK_ONLY | 使用(yong)未序列化的Java對象格式,將數(shu)據全部寫入磁盤文(wen)件中。 |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等(deng)(deng)等(deng)(deng). | 對(dui)于上(shang)述任意一(yi)種持久(jiu)化策略,如果(guo)加上(shang)后綴_2,代(dai)表的(de)是將(jiang)每個持久(jiu)化的(de)數據(ju)(ju)(ju),都復制一(yi)份副(fu)本,并將(jiang)副(fu)本保(bao)存到其他節點(dian)上(shang)。這種基于副(fu)本的(de)持久(jiu)化機制主(zhu)要用于進行容錯。假如某個節點(dian)掛(gua)掉(diao),節點(dian)的(de)內存或磁盤中(zhong)的(de)持久(jiu)化數據(ju)(ju)(ju)丟失了,那(nei)么后續對(dui)RDD計(ji)(ji)算時還可以使(shi)用該數據(ju)(ju)(ju)在其他節點(dian)上(shang)的(de)副(fu)本。如果(guo)沒(mei)有副(fu)本的(de)話,就(jiu)只能將(jiang)這些數據(ju)(ju)(ju)從源頭處重新計(ji)(ji)算一(yi)遍了。 |
如(ru)何選擇(ze)一種最合適的持久化策略
-
默認情況下,性能最高的當然是MEMORY_ONLY,但前提是你的內存必須足夠足夠大,可以綽綽有余地存放下整個RDD的所有數據。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個RDD的后續算子操作,都是基于純內存中的數據的操作,不需要從磁盤文件中讀取數據,性能也很高;而且不需要復制一份數據副本,并遠程傳送到其他節點上。但是這里必須要注意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中數據比較多時(比如幾十億),直接用這種持久化級別,會導致JVM的OOM內存溢出異常。
-
如果使用MEMORY_ONLY級別時發生了內存溢出,那么建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數據序列化后再保存在內存中,此時每個partition僅僅是一個字節數組而已,大大減少了對象數量,并降低了內存占用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是后續算子可以基于純內存進行操作,因此性能總體還是比較高的。此外,可能發生的問題同上,如果RDD中的數據量過多的話,還是可能會導致OOM內存溢出的異常。
-
如果純內存的級別都無法使用,那么建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因為既然到了這一步,就說明RDD的數據量很大,內存無法完全放下。序列化后的數據比較少,可以節省內存和磁盤的空間開銷。同時該策略會優先盡量嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。
-
通常不建議使(shi)用(yong)DISK_ONLY和后(hou)綴為_2的(de)(de)級別:因(yin)為完全基(ji)于磁盤文件進行數據(ju)的(de)(de)讀寫,會(hui)導(dao)致性(xing)能(neng)急劇降低,有(you)(you)時還不如(ru)重(zhong)新計(ji)算一次所(suo)有(you)(you)RDD。后(hou)綴為_2的(de)(de)級別,必須將所(suo)有(you)(you)數據(ju)都復制(zhi)一份(fen)副本,并發送到其他節(jie)點(dian)上,數據(ju)復制(zhi)以及網絡傳輸(shu)會(hui)導(dao)致較大(da)的(de)(de)性(xing)能(neng)開(kai)銷,除(chu)非(fei)是(shi)要求(qiu)作業(ye)的(de)(de)高(gao)可用(yong)性(xing),否(fou)則不建議使(shi)用(yong)。
原則四:盡量避(bi)免使(shi)用(yong)shuffle類算子(zi)
如果(guo)有(you)可能的話(hua),要盡量避免使用(yong)shuffle類算子(zi)。因為(wei)Spark作業(ye)運行過(guo)程中,最消(xiao)耗(hao)性能的地方就(jiu)是(shi)shuffle過(guo)程。shuffle過(guo)程,簡(jian)單來說,就(jiu)是(shi)將(jiang)分(fen)布在集(ji)群中多個節點(dian)上的同(tong)一個key,拉取到(dao)同(tong)一個節點(dian)上,進行聚合或join等操作。比如reduceByKey、join等算子(zi),都會(hui)觸發shuffle操作。
shuffle過程中,各個節點上的相同key都會先寫入本地磁盤文件中,然后其他節點需要通過網絡傳輸拉取各個節點上的磁盤文件中的相同key。而且相同key都拉取到同一個節點進行聚合操作時,還有可能會因為一個節點上處理的key過多,導致內存不夠存放,進而溢寫到磁盤文件中。因此在shuffle過程中,可能會發生(sheng)大量的磁盤文(wen)件讀寫(xie)的IO操作(zuo),以及數據(ju)的網絡(luo)傳輸操作(zuo)。磁盤IO和網絡數據傳輸也是shuffle性能較差的主要原因。
因此在我們(men)的(de)開(kai)發過程中(zhong),能避免則盡可能避免使(shi)用reduceByKey、join、distinct、repartition等會進行shuffle的(de)算子,盡量使(shi)用map類的(de)非(fei)shuffle算子。這(zhe)樣的(de)話,沒有shuffle操(cao)作(zuo)或者僅有較少shuffle操(cao)作(zuo)的(de)Spark作(zuo)業(ye),可以大大減少性(xing)能開(kai)銷。
Broadcast與map進(jin)行join代碼(ma)示例(li)
// 傳(chuan)統的join操作會導(dao)致shuffle操作。
// 因為(wei)兩個RDD中,相(xiang)同的key都(dou)需要(yao)通過網(wang)絡(luo)拉(la)取到一個節(jie)點上(shang),由(you)一個task進(jin)行(xing)join操作。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map的join操作,不會導致shuffle操作。
// 使用Broadcast將(jiang)一個(ge)數據量較小的RDD作為廣播變量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所(suo)有數據。
// 然后(hou)進行遍(bian)歷,如(ru)果發現rdd2中某條(tiao)數(shu)據(ju)的(de)key與(yu)rdd1的(de)當(dang)前(qian)數(shu)據(ju)的(de)key是相同的(de),那么就(jiu)判定可以(yi)進行join。
// 此時就可以根據自(zi)己需要(yao)的(de)方式,將rdd1當(dang)前(qian)數據與rdd2中可以連接的(de)數據,拼接在一(yi)起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 注(zhu)意(yi),以上操作,建議僅僅在(zai)rdd2的(de)數據量比較少(比如幾百M,或者(zhe)一(yi)兩G)的(de)情況下使用。
// 因為每個Executor的(de)內存中,都會(hui)駐留一份rdd2的(de)全量數(shu)據。
原則五:使(shi)用map-side預聚合的(de)shuffle操作
如果因為業務(wu)需要,一定要使用shuffle操作(zuo),無法(fa)用map類的(de)算(suan)子(zi)來替(ti)代,那么盡量(liang)使用可以map-side預聚合(he)的(de)算(suan)子(zi)。
所謂的(de)map-side預聚(ju)(ju)合(he)(he),說的(de)是在(zai)每個節(jie)點(dian)(dian)本地(di)對相(xiang)(xiang)同(tong)的(de)key進(jin)(jin)行(xing)一(yi)次聚(ju)(ju)合(he)(he)操作,類似(si)于(yu)MapReduce中的(de)本地(di)combiner。map-side預聚(ju)(ju)合(he)(he)之后,每個節(jie)點(dian)(dian)本地(di)就只(zhi)會(hui)(hui)有一(yi)條相(xiang)(xiang)同(tong)的(de)key,因為多條相(xiang)(xiang)同(tong)的(de)key都(dou)被聚(ju)(ju)合(he)(he)起來(lai)(lai)了(le)。其他節(jie)點(dian)(dian)在(zai)拉(la)取(qu)所有節(jie)點(dian)(dian)上的(de)相(xiang)(xiang)同(tong)key時,就會(hui)(hui)大大減少需要拉(la)取(qu)的(de)數(shu)(shu)(shu)據數(shu)(shu)(shu)量(liang),從(cong)而也就減少了(le)磁(ci)盤IO以及網絡傳(chuan)輸開銷(xiao)。通(tong)常(chang)來(lai)(lai)說,在(zai)可能的(de)情況(kuang)下,建議(yi)使(shi)用(yong)reduceByKey或者aggregateByKey算子來(lai)(lai)替(ti)代掉groupByKey算子。因為reduceByKey和aggregateByKey算子都(dou)會(hui)(hui)使(shi)用(yong)用(yong)戶(hu)自(zi)定義的(de)函數(shu)(shu)(shu)對每個節(jie)點(dian)(dian)本地(di)的(de)相(xiang)(xiang)同(tong)key進(jin)(jin)行(xing)預聚(ju)(ju)合(he)(he)。而groupByKey算子是不(bu)會(hui)(hui)進(jin)(jin)行(xing)預聚(ju)(ju)合(he)(he)的(de),全量(liang)的(de)數(shu)(shu)(shu)據會(hui)(hui)在(zai)集(ji)群的(de)各個節(jie)點(dian)(dian)之間分發和傳(chuan)輸,性(xing)能相(xiang)(xiang)對來(lai)(lai)說比較差。
比如如下兩(liang)幅圖(tu),就是(shi)典型(xing)的(de)(de)例子(zi),分(fen)別基于reduceByKey和(he)groupByKey進行單詞計數(shu)。其中第(di)一張圖(tu)是(shi)groupByKey的(de)(de)原(yuan)(yuan)理圖(tu),可以看(kan)到,沒有(you)進行任何本(ben)(ben)地(di)聚(ju)合(he)時(shi),所有(you)數(shu)據都會在集群節(jie)點之(zhi)間傳輸(shu);第(di)二張圖(tu)是(shi)reduceByKey的(de)(de)原(yuan)(yuan)理圖(tu),可以看(kan)到,每個節(jie)點本(ben)(ben)地(di)的(de)(de)相同key數(shu)據,都進行了預聚(ju)合(he),然后才傳輸(shu)到其他節(jie)點上進行全(quan)局聚(ju)合(he)。


原則(ze)六:使用高性能(neng)的算子
除了shuffle相關的(de)算子有(you)優(you)化原(yuan)則之外,其(qi)他(ta)的(de)算子也都有(you)著相應的(de)優(you)化原(yuan)則。
使用reduceByKey/aggregateByKey替代groupByKey
詳(xiang)情(qing)見“原(yuan)則五:使用map-side預聚合(he)的shuffle操作”。
使用mapPartitions替(ti)代普通(tong)map
mapPartitions類(lei)的(de)(de)算子,一(yi)(yi)次(ci)函數(shu)調(diao)用會處理一(yi)(yi)個partition所(suo)(suo)有(you)的(de)(de)數(shu)據,而不是一(yi)(yi)次(ci)函數(shu)調(diao)用處理一(yi)(yi)條,性能相對來說(shuo)會高一(yi)(yi)些。但(dan)是有(you)的(de)(de)時候,使(shi)用mapPartitions會出現OOM(內(nei)存(cun)溢出)的(de)(de)問題。因為單次(ci)函數(shu)調(diao)用就(jiu)要處理掉一(yi)(yi)個partition所(suo)(suo)有(you)的(de)(de)數(shu)據,如(ru)果內(nei)存(cun)不夠,垃圾回收時是無(wu)法回收掉太(tai)多對象的(de)(de),很(hen)可能出現OOM異常。所(suo)(suo)以(yi)使(shi)用這(zhe)類(lei)操作(zuo)時要慎重(zhong)!
使用foreachPartitions替(ti)代foreach
原理(li)類似于“使用(yong)mapPartitions替代map”,也(ye)是一次函(han)(han)數(shu)(shu)(shu)(shu)調(diao)用(yong)處(chu)理(li)一個partition的(de)(de)(de)所有數(shu)(shu)(shu)(shu)據(ju)(ju)(ju)(ju),而不(bu)是一次函(han)(han)數(shu)(shu)(shu)(shu)調(diao)用(yong)處(chu)理(li)一條(tiao)數(shu)(shu)(shu)(shu)據(ju)(ju)(ju)(ju)。在實踐中發現(xian)(xian),foreachPartitions類的(de)(de)(de)算子,對性能(neng)(neng)的(de)(de)(de)提升還(huan)是很有幫(bang)助(zhu)的(de)(de)(de)。比(bi)如(ru)在foreach函(han)(han)數(shu)(shu)(shu)(shu)中,將RDD中所有數(shu)(shu)(shu)(shu)據(ju)(ju)(ju)(ju)寫MySQL,那么(me)如(ru)果是普通的(de)(de)(de)foreach算子,就會(hui)一條(tiao)數(shu)(shu)(shu)(shu)據(ju)(ju)(ju)(ju)一條(tiao)數(shu)(shu)(shu)(shu)據(ju)(ju)(ju)(ju)地寫,每次函(han)(han)數(shu)(shu)(shu)(shu)調(diao)用(yong)可能(neng)(neng)就會(hui)創(chuang)建一個數(shu)(shu)(shu)(shu)據(ju)(ju)(ju)(ju)庫(ku)連(lian)(lian)接(jie),此時就勢必會(hui)頻繁地創(chuang)建和銷毀(hui)數(shu)(shu)(shu)(shu)據(ju)(ju)(ju)(ju)庫(ku)連(lian)(lian)接(jie),性能(neng)(neng)是非常(chang)低(di)下;但是如(ru)果用(yong)foreachPartitions算子一次性處(chu)理(li)一個partition的(de)(de)(de)數(shu)(shu)(shu)(shu)據(ju)(ju)(ju)(ju),那么(me)對于每個partition,只要(yao)創(chuang)建一個數(shu)(shu)(shu)(shu)據(ju)(ju)(ju)(ju)庫(ku)連(lian)(lian)接(jie)即(ji)可,然后執行(xing)批量插(cha)入操作,此時性能(neng)(neng)是比(bi)較(jiao)高的(de)(de)(de)。實踐中發現(xian)(xian),對于1萬條(tiao)左右(you)的(de)(de)(de)數(shu)(shu)(shu)(shu)據(ju)(ju)(ju)(ju)量寫MySQL,性能(neng)(neng)可以提升30%以上。
使用filter之后進行coalesce操(cao)作
通常對一個(ge)RDD執(zhi)行filter算子過濾(lv)掉RDD中較多(duo)數(shu)據(ju)(ju)后(比如30%以上的(de)(de)(de)(de)(de)數(shu)據(ju)(ju)),建議使用coalesce算子,手動減少(shao)(shao)(shao)(shao)RDD的(de)(de)(de)(de)(de)partition數(shu)量(liang),將RDD中的(de)(de)(de)(de)(de)數(shu)據(ju)(ju)壓(ya)縮到更(geng)少(shao)(shao)(shao)(shao)的(de)(de)(de)(de)(de)partition中去。因為filter之(zhi)后,RDD的(de)(de)(de)(de)(de)每個(ge)partition中都會有(you)很多(duo)數(shu)據(ju)(ju)被過濾(lv)掉,此(ci)時如果照(zhao)常進行后續(xu)的(de)(de)(de)(de)(de)計算,其實每個(ge)task處理(li)的(de)(de)(de)(de)(de)partition中的(de)(de)(de)(de)(de)數(shu)據(ju)(ju)量(liang)并不是很多(duo),有(you)一點資源(yuan)浪費,而且此(ci)時處理(li)的(de)(de)(de)(de)(de)task越(yue)多(duo),可(ke)能速度反(fan)而越(yue)慢(man)。因此(ci)用coalesce減少(shao)(shao)(shao)(shao)partition數(shu)量(liang),將RDD中的(de)(de)(de)(de)(de)數(shu)據(ju)(ju)壓(ya)縮到更(geng)少(shao)(shao)(shao)(shao)的(de)(de)(de)(de)(de)partition之(zhi)后,只要使用更(geng)少(shao)(shao)(shao)(shao)的(de)(de)(de)(de)(de)task即可(ke)處理(li)完所有(you)的(de)(de)(de)(de)(de)partition。在某些場(chang)景下,對于(yu)性能的(de)(de)(de)(de)(de)提升會有(you)一定的(de)(de)(de)(de)(de)幫助。
使(shi)用repartitionAndSortWithinPartitions替代repartition與sort類操作(zuo)
repartitionAndSortWithinPartitions是Spark官(guan)網推(tui)薦的(de)一(yi)(yi)個算(suan)子,官(guan)方建議(yi),如果需(xu)要在repartition重分區(qu)(qu)之后,還要進行(xing)排(pai)序,建議(yi)直接使(shi)用repartitionAndSortWithinPartitions算(suan)子。因為該算(suan)子可以一(yi)(yi)邊進行(xing)重分區(qu)(qu)的(de)shuffle操作,一(yi)(yi)邊進行(xing)排(pai)序。shuffle與sort兩個操作同(tong)時(shi)進行(xing),比(bi)先shuffle再sort來說(shuo),性能(neng)可能(neng)是要高的(de)。
原則七:廣播大變量
有時在開(kai)發過(guo)程中,會遇(yu)到需要在算(suan)子(zi)函數(shu)中使(shi)用外部變(bian)量的(de)(de)場景(尤其是大(da)變(bian)量,比如100M以上(shang)的(de)(de)大(da)集合),那么此時就應該使(shi)用Spark的(de)(de)廣(guang)播(Broadcast)功(gong)能來提升(sheng)性能。
在(zai)算(suan)子(zi)函數(shu)中(zhong)使用(yong)到外(wai)部變(bian)量時(shi),默認(ren)情況下,Spark會(hui)將該變(bian)量復制(zhi)多(duo)(duo)個副(fu)本(ben),通(tong)過網(wang)絡傳輸到task中(zhong),此時(shi)每(mei)個task都(dou)有一(yi)個變(bian)量副(fu)本(ben)。如果變(bian)量本(ben)身(shen)比(bi)(bi)較大的話(hua)(比(bi)(bi)如100M,甚至1G),那么大量的變(bian)量副(fu)本(ben)在(zai)網(wang)絡中(zhong)傳輸的性能開銷,以(yi)及在(zai)各個節點的Executor中(zhong)占用(yong)過多(duo)(duo)內存導致的頻(pin)繁(fan)GC,都(dou)會(hui)極大地影響性能。
因此對(dui)于上述情(qing)況,如果使(shi)用(yong)的(de)(de)(de)(de)(de)外部變量比較大(da),建議使(shi)用(yong)Spark的(de)(de)(de)(de)(de)廣(guang)播(bo)(bo)功能,對(dui)該(gai)變量進行(xing)廣(guang)播(bo)(bo)。廣(guang)播(bo)(bo)后的(de)(de)(de)(de)(de)變量,會保證每個Executor的(de)(de)(de)(de)(de)內(nei)存(cun)中(zhong),只駐留一(yi)份變量副本(ben),而(er)(er)Executor中(zhong)的(de)(de)(de)(de)(de)task執行(xing)時共享該(gai)Executor中(zhong)的(de)(de)(de)(de)(de)那份變量副本(ben)。這樣的(de)(de)(de)(de)(de)話,可以大(da)大(da)減(jian)少(shao)變量副本(ben)的(de)(de)(de)(de)(de)數量,從而(er)(er)減(jian)少(shao)網絡傳輸的(de)(de)(de)(de)(de)性能開(kai)銷,并減(jian)少(shao)對(dui)Executor內(nei)存(cun)的(de)(de)(de)(de)(de)占用(yong)開(kai)銷,降低GC的(de)(de)(de)(de)(de)頻率。
廣播大變(bian)量的(de)代碼(ma)示例
// 以下(xia)代碼在算(suan)子函數中,使用了外部(bu)的變量。
// 此時沒有做任(ren)何特殊操作,每個(ge)task都會有一份list1的(de)副本。
val list1 = ...
rdd1.map(list1...)
// 以下代(dai)碼(ma)將(jiang)list1封(feng)裝成了Broadcast類型(xing)的廣播變量。
// 在算子(zi)函數中,使(shi)用廣播變(bian)量(liang)時,首先會(hui)判斷當前task所在Executor內存中,是(shi)否有(you)變(bian)量(liang)副本(ben)。
// 如果有(you)(you)則直接(jie)使用;如果沒有(you)(you)則從Driver或者其他Executor節點上遠程拉(la)取(qu)一份(fen)放(fang)到本地Executor內存中。
// 每個(ge)Executor內存中,就只會駐留一份廣播變(bian)量副本(ben)。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)
原則八(ba):使用Kryo優(you)化序列化性能(neng)
在(zai)Spark中(zhong),主要有三(san)個地方涉及到了序列(lie)化:
- 在算子函數(shu)中使用到外部變量(liang)時,該(gai)變量(liang)會被序(xu)列(lie)化后進行網絡(luo)傳輸(見“原則七:廣播大變量(liang)”中的(de)講解)。
- 將自(zi)定義的類(lei)(lei)型(xing)(xing)(xing)作(zuo)為RDD的泛型(xing)(xing)(xing)類(lei)(lei)型(xing)(xing)(xing)時(shi)(比如JavaRDD,Student是自(zi)定義類(lei)(lei)型(xing)(xing)(xing)),所有自(zi)定義類(lei)(lei)型(xing)(xing)(xing)對象(xiang),都會進(jin)行(xing)序列(lie)化(hua)。因(yin)此這(zhe)種情況下(xia),也要求(qiu)自(zi)定義的類(lei)(lei)必須實(shi)現Serializable接口。
- 使用(yong)可序列化(hua)的(de)持久化(hua)策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的(de)每(mei)個partition都序列化(hua)成(cheng)一(yi)個大(da)的(de)字節數組。
對于(yu)這三種出現(xian)序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)的地(di)方(fang),我們都可以通過使(shi)(shi)用Kryo序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)類庫,來優化(hua)(hua)(hua)(hua)(hua)(hua)(hua)序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)和(he)(he)反(fan)(fan)序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)的性能。Spark默(mo)認使(shi)(shi)用的是(shi)Java的序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)機(ji)制,也就是(shi)ObjectOutputStream/ObjectInputStream API來進(jin)行序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)和(he)(he)反(fan)(fan)序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)。但是(shi)Spark同時支(zhi)持(chi)使(shi)(shi)用Kryo序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)庫,Kryo序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)類庫的性能比Java序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)類庫的性能要(yao)(yao)高很(hen)多。官方(fang)介紹,Kryo序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)機(ji)制比Java序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)機(ji)制,性能高10倍(bei)左右。Spark之所以默(mo)認沒有使(shi)(shi)用Kryo作為序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)類庫,是(shi)因(yin)為Kryo要(yao)(yao)求最好要(yao)(yao)注冊所有需要(yao)(yao)進(jin)行序(xu)(xu)(xu)(xu)列(lie)(lie)化(hua)(hua)(hua)(hua)(hua)(hua)(hua)的自定義類型,因(yin)此對于(yu)開發者來說(shuo),這種方(fang)式(shi)比較麻煩。
以(yi)下(xia)是使用Kryo的(de)(de)代碼示例,我們只要(yao)設置(zhi)序列化(hua)類,再注(zhu)冊要(yao)序列化(hua)的(de)(de)自定義(yi)類型(xing)即可(比(bi)如算(suan)子函數中使用到的(de)(de)外部變量類型(xing)、作為(wei)RDD泛型(xing)類型(xing)的(de)(de)自定義(yi)類型(xing)等):
// 創建SparkConf對象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設置序列化器為KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊要序列(lie)化的自定義(yi)類型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
原則九:優化數據結構
Java中,有(you)三種類型(xing)比較(jiao)耗(hao)費內存:
- 對(dui)象(xiang)(xiang),每個Java對(dui)象(xiang)(xiang)都有對(dui)象(xiang)(xiang)頭、引用等額外的信(xin)息,因此比較占用內存(cun)空間。
- 字符串,每個字符串內部都有(you)一(yi)個字符數組以及長度等額外信息。
- 集(ji)(ji)合(he)類(lei)型(xing),比(bi)如HashMap、LinkedList等,因(yin)為集(ji)(ji)合(he)類(lei)型(xing)內(nei)部(bu)通常(chang)會(hui)使(shi)用一些內(nei)部(bu)類(lei)來封裝(zhuang)集(ji)(ji)合(he)元素,比(bi)如Map.Entry。
因此Spark官方建議(yi),在Spark編碼實現(xian)中(zhong),特(te)別是對于算(suan)子函數(shu)中(zhong)的代(dai)(dai)碼,盡量不(bu)要使(shi)(shi)用上述(shu)三種數(shu)據結構,盡量使(shi)(shi)用字符(fu)串(chuan)(chuan)替(ti)代(dai)(dai)對象,使(shi)(shi)用原(yuan)始(shi)類型(xing)(比如(ru)Int、Long)替(ti)代(dai)(dai)字符(fu)串(chuan)(chuan),使(shi)(shi)用數(shu)組替(ti)代(dai)(dai)集合(he)類型(xing),這樣盡可(ke)能地減少(shao)內存占用,從而降低GC頻率,提升性(xing)能。
但是(shi)(shi)(shi)在(zai)筆者(zhe)的(de)(de)編(bian)碼(ma)(ma)(ma)(ma)實踐中發(fa)現,要做(zuo)到該原(yuan)則(ze)其實并不容(rong)易。因(yin)為我們同時要考(kao)慮到代(dai)(dai)碼(ma)(ma)(ma)(ma)的(de)(de)可(ke)維護(hu)性(xing)(xing),如果一個(ge)代(dai)(dai)碼(ma)(ma)(ma)(ma)中,完全(quan)沒有(you)任何對象(xiang)抽象(xiang),全(quan)部是(shi)(shi)(shi)字(zi)符串拼(pin)接(jie)的(de)(de)方式,那么對于后續的(de)(de)代(dai)(dai)碼(ma)(ma)(ma)(ma)維護(hu)和(he)修(xiu)改(gai),無(wu)疑是(shi)(shi)(shi)一場巨大的(de)(de)災難(nan)。同理,如果所有(you)操作都(dou)基于數(shu)組實現,而不使用HashMap、LinkedList等集合(he)類型,那么對于我們的(de)(de)編(bian)碼(ma)(ma)(ma)(ma)難(nan)度以(yi)及(ji)代(dai)(dai)碼(ma)(ma)(ma)(ma)可(ke)維護(hu)性(xing)(xing),也(ye)是(shi)(shi)(shi)一個(ge)極大的(de)(de)挑(tiao)戰。因(yin)此筆者(zhe)建(jian)議,在(zai)可(ke)能以(yi)及(ji)合(he)適的(de)(de)情況下,使用占用內存較少的(de)(de)數(shu)據結構,但是(shi)(shi)(shi)前提是(shi)(shi)(shi)要保(bao)證代(dai)(dai)碼(ma)(ma)(ma)(ma)的(de)(de)可(ke)維護(hu)性(xing)(xing)。
資(zi)源調優
調優概述
在(zai)開發(fa)完Spark作(zuo)(zuo)(zuo)業之(zhi)后,就(jiu)該為(wei)作(zuo)(zuo)(zuo)業配(pei)置合適的(de)資(zi)(zi)源(yuan)了。Spark的(de)資(zi)(zi)源(yuan)參(can)(can)數(shu),基(ji)本(ben)都可(ke)以(yi)(yi)在(zai)spark-submit命(ming)令中作(zuo)(zuo)(zuo)為(wei)參(can)(can)數(shu)設(she)置。很多Spark初學者,通常不(bu)知道該設(she)置哪些必(bi)(bi)要的(de)參(can)(can)數(shu),以(yi)(yi)及如(ru)(ru)何設(she)置這些參(can)(can)數(shu),最后就(jiu)只(zhi)能胡亂(luan)設(she)置,甚至壓(ya)根兒不(bu)設(she)置。資(zi)(zi)源(yuan)參(can)(can)數(shu)設(she)置的(de)不(bu)合理,可(ke)能會導(dao)致沒(mei)有充分利用(yong)集群資(zi)(zi)源(yuan),作(zuo)(zuo)(zuo)業運行(xing)(xing)會極其(qi)緩慢;或者設(she)置的(de)資(zi)(zi)源(yuan)過大(da),隊(dui)列(lie)沒(mei)有足夠的(de)資(zi)(zi)源(yuan)來提(ti)供,進(jin)而導(dao)致各(ge)種異常。總(zong)之(zhi),無論(lun)是(shi)哪種情況,都會導(dao)致Spark作(zuo)(zuo)(zuo)業的(de)運行(xing)(xing)效率低下,甚至根本(ben)無法運行(xing)(xing)。因此我(wo)們必(bi)(bi)須(xu)對Spark作(zuo)(zuo)(zuo)業的(de)資(zi)(zi)源(yuan)使用(yong)原(yuan)理有一個(ge)清晰(xi)的(de)認識,并知道在(zai)Spark作(zuo)(zuo)(zuo)業運行(xing)(xing)過程中,有哪些資(zi)(zi)源(yuan)參(can)(can)數(shu)是(shi)可(ke)以(yi)(yi)設(she)置的(de),以(yi)(yi)及如(ru)(ru)何設(she)置合適的(de)參(can)(can)數(shu)值(zhi)。
Spark作業基本(ben)運(yun)行原理(li)

詳(xiang)細(xi)原理見(jian)上圖。
我(wo)們使用spark-submit提交一(yi)個Spark作業之后(hou),這個作業就(jiu)(jiu)會啟動(dong)一(yi)個對應(ying)的(de)Driver進程(cheng)。根據你使用的(de)部署模式(deploy-mode)不同,Driver進程(cheng)可(ke)能(neng)在(zai)本地(di)啟動(dong),也(ye)可(ke)能(neng)在(zai)集(ji)群(qun)(qun)中(zhong)某個工(gong)作節點(dian)上啟動(dong)。Driver進程(cheng)本身會根據我(wo)們設(she)置的(de)參數,占有(you)一(yi)定(ding)數量(liang)的(de)內(nei)存和CPU core。而(er)Driver進程(cheng)要做的(de)第(di)一(yi)件事情,就(jiu)(jiu)是(shi)(shi)向集(ji)群(qun)(qun)管(guan)(guan)理(li)器(qi)(可(ke)以是(shi)(shi)Spark Standalone集(ji)群(qun)(qun),也(ye)可(ke)以是(shi)(shi)其他的(de)資(zi)(zi)(zi)(zi)源(yuan)(yuan)管(guan)(guan)理(li)集(ji)群(qun)(qun),美(mei)團?大眾點(dian)評使用的(de)是(shi)(shi)YARN作為(wei)資(zi)(zi)(zi)(zi)源(yuan)(yuan)管(guan)(guan)理(li)集(ji)群(qun)(qun))申請運行Spark作業需要使用的(de)資(zi)(zi)(zi)(zi)源(yuan)(yuan),這里的(de)資(zi)(zi)(zi)(zi)源(yuan)(yuan)指的(de)就(jiu)(jiu)是(shi)(shi)Executor進程(cheng)。YARN集(ji)群(qun)(qun)管(guan)(guan)理(li)器(qi)會根據我(wo)們為(wei)Spark作業設(she)置的(de)資(zi)(zi)(zi)(zi)源(yuan)(yuan)參數,在(zai)各個工(gong)作節點(dian)上,啟動(dong)一(yi)定(ding)數量(liang)的(de)Executor進程(cheng),每個Executor進程(cheng)都(dou)占有(you)一(yi)定(ding)數量(liang)的(de)內(nei)存和CPU core。
在申請到了作業(ye)執(zhi)(zhi)行(xing)(xing)所(suo)需的(de)(de)(de)資源之后(hou),Driver進(jin)程就會開始調度和執(zhi)(zhi)行(xing)(xing)我們(men)(men)編寫(xie)的(de)(de)(de)作業(ye)代(dai)碼(ma)了。Driver進(jin)程會將(jiang)我們(men)(men)編寫(xie)的(de)(de)(de)Spark作業(ye)代(dai)碼(ma)分拆為(wei)多個(ge)(ge)(ge)stage,每(mei)個(ge)(ge)(ge)stage執(zhi)(zhi)行(xing)(xing)一(yi)(yi)部(bu)分代(dai)碼(ma)片(pian)段,并為(wei)每(mei)個(ge)(ge)(ge)stage創建一(yi)(yi)批task,然后(hou)將(jiang)這些(xie)task分配(pei)到各個(ge)(ge)(ge)Executor進(jin)程中(zhong)執(zhi)(zhi)行(xing)(xing)。task是(shi)(shi)最小的(de)(de)(de)計(ji)算(suan)(suan)單元,負(fu)責執(zhi)(zhi)行(xing)(xing)一(yi)(yi)模一(yi)(yi)樣(yang)的(de)(de)(de)計(ji)算(suan)(suan)邏輯(ji)(也(ye)就是(shi)(shi)我們(men)(men)自(zi)己編寫(xie)的(de)(de)(de)某(mou)個(ge)(ge)(ge)代(dai)碼(ma)片(pian)段),只是(shi)(shi)每(mei)個(ge)(ge)(ge)task處理的(de)(de)(de)數據(ju)(ju)不同而已。一(yi)(yi)個(ge)(ge)(ge)stage的(de)(de)(de)所(suo)有task都執(zhi)(zhi)行(xing)(xing)完(wan)畢之后(hou),會在各個(ge)(ge)(ge)節點(dian)本地的(de)(de)(de)磁盤文(wen)件中(zhong)寫(xie)入計(ji)算(suan)(suan)中(zhong)間結(jie)(jie)(jie)果,然后(hou)Driver就會調度運行(xing)(xing)下一(yi)(yi)個(ge)(ge)(ge)stage。下一(yi)(yi)個(ge)(ge)(ge)stage的(de)(de)(de)task的(de)(de)(de)輸入數據(ju)(ju)就是(shi)(shi)上一(yi)(yi)個(ge)(ge)(ge)stage輸出的(de)(de)(de)中(zhong)間結(jie)(jie)(jie)果。如此循(xun)環往復(fu),直到將(jiang)我們(men)(men)自(zi)己編寫(xie)的(de)(de)(de)代(dai)碼(ma)邏輯(ji)全(quan)部(bu)執(zhi)(zhi)行(xing)(xing)完(wan),并且計(ji)算(suan)(suan)完(wan)所(suo)有的(de)(de)(de)數據(ju)(ju),得到我們(men)(men)想要的(de)(de)(de)結(jie)(jie)(jie)果為(wei)止。
Spark是根據(ju)shuffle類算(suan)子(zi)來進(jin)行(xing)(xing)stage的(de)(de)劃(hua)分(fen)。如(ru)果(guo)我(wo)們(men)的(de)(de)代碼(ma)(ma)中執(zhi)(zhi)行(xing)(xing)了(le)某個(ge)(ge)shuffle類算(suan)子(zi)(比(bi)如(ru)reduceByKey、join等),那(nei)么就會在該算(suan)子(zi)處,劃(hua)分(fen)出一(yi)個(ge)(ge)stage界限來。可(ke)以(yi)大(da)致理解(jie)為,shuffle算(suan)子(zi)執(zhi)(zhi)行(xing)(xing)之前的(de)(de)代碼(ma)(ma)會被(bei)劃(hua)分(fen)為一(yi)個(ge)(ge)stage,shuffle算(suan)子(zi)執(zhi)(zhi)行(xing)(xing)以(yi)及之后(hou)的(de)(de)代碼(ma)(ma)會被(bei)劃(hua)分(fen)為下(xia)一(yi)個(ge)(ge)stage。因(yin)此一(yi)個(ge)(ge)stage剛開始執(zhi)(zhi)行(xing)(xing)的(de)(de)時候,它的(de)(de)每個(ge)(ge)task可(ke)能都(dou)會從上一(yi)個(ge)(ge)stage的(de)(de)task所在的(de)(de)節點,去通過網(wang)絡傳(chuan)輸拉取(qu)需要自己(ji)處理的(de)(de)所有(you)key,然后(hou)對拉取(qu)到的(de)(de)所有(you)相同的(de)(de)key使用我(wo)們(men)自己(ji)編寫的(de)(de)算(suan)子(zi)函數執(zhi)(zhi)行(xing)(xing)聚合(he)操作(zuo)(比(bi)如(ru)reduceByKey()算(suan)子(zi)接收(shou)的(de)(de)函數)。這個(ge)(ge)過程就是shuffle。
當我(wo)們在代碼中執(zhi)行(xing)了cache/persist等持(chi)(chi)久(jiu)化操作(zuo)時,根據(ju)我(wo)們選擇的(de)持(chi)(chi)久(jiu)化級(ji)別的(de)不同,每個task計算出(chu)來的(de)數據(ju)也會保存到Executor進程的(de)內存或者(zhe)所(suo)在節點(dian)的(de)磁盤文件中。
因此Executor的(de)內(nei)存主(zhu)要分為三(san)塊:第(di)一塊是(shi)讓(rang)(rang)(rang)task執行(xing)我們自己編寫的(de)代碼(ma)時使(shi)(shi)用,默(mo)(mo)認是(shi)占Executor總(zong)(zong)內(nei)存的(de)20%;第(di)二(er)塊是(shi)讓(rang)(rang)(rang)task通過shuffle過程拉取了(le)上一個stage的(de)task的(de)輸出(chu)后,進行(xing)聚合(he)等(deng)操作時使(shi)(shi)用,默(mo)(mo)認也是(shi)占Executor總(zong)(zong)內(nei)存的(de)20%;第(di)三(san)塊是(shi)讓(rang)(rang)(rang)RDD持久化時使(shi)(shi)用,默(mo)(mo)認占Executor總(zong)(zong)內(nei)存的(de)60%。
task的(de)(de)(de)執行速(su)度是(shi)跟每(mei)個(ge)Executor進(jin)程(cheng)的(de)(de)(de)CPU core數(shu)量有直接關系的(de)(de)(de)。一個(ge)CPU core同一時(shi)間只能(neng)執行一個(ge)線(xian)程(cheng)。而(er)每(mei)個(ge)Executor進(jin)程(cheng)上分配到(dao)的(de)(de)(de)多(duo)個(ge)task,都是(shi)以(yi)每(mei)個(ge)task一條(tiao)線(xian)程(cheng)的(de)(de)(de)方式,多(duo)線(xian)程(cheng)并發運行的(de)(de)(de)。如(ru)果CPU core數(shu)量比較充足(zu),而(er)且(qie)分配到(dao)的(de)(de)(de)task數(shu)量比較合理,那(nei)么(me)通常來(lai)說,可以(yi)比較快速(su)和高效地執行完這些(xie)task線(xian)程(cheng)。
以上就是Spark作業的(de)基本運行原(yuan)理(li)的(de)說明,大家可以結合上圖(tu)來理(li)解(jie)。理(li)解(jie)作業基本原(yuan)理(li),是我們進行資源參數(shu)調優(you)的(de)基本前提。
資(zi)源參數調優
了(le)解完了(le)Spark作(zuo)(zuo)業(ye)(ye)運行(xing)的(de)(de)基本原理之后,對資源(yuan)(yuan)相關的(de)(de)參(can)(can)數(shu)(shu)就(jiu)容(rong)易理解了(le)。所(suo)謂的(de)(de)Spark資源(yuan)(yuan)參(can)(can)數(shu)(shu)調優,其實主(zhu)要就(jiu)是對Spark運行(xing)過程中各(ge)個使用(yong)資源(yuan)(yuan)的(de)(de)地方,通過調節各(ge)種(zhong)參(can)(can)數(shu)(shu),來優化資源(yuan)(yuan)使用(yong)的(de)(de)效率,從而提升Spark作(zuo)(zuo)業(ye)(ye)的(de)(de)執行(xing)性(xing)能。以(yi)下參(can)(can)數(shu)(shu)就(jiu)是Spark中主(zhu)要的(de)(de)資源(yuan)(yuan)參(can)(can)數(shu)(shu),每(mei)個參(can)(can)數(shu)(shu)都對應(ying)著作(zuo)(zuo)業(ye)(ye)運行(xing)原理中的(de)(de)某個部分,我們同時也給出了(le)一個調優的(de)(de)參(can)(can)考值。
num-executors
- 參(can)數說明(ming):該參(can)數用于設置Spark作業(ye)總共要(yao)用多少個(ge)Executor進程(cheng)(cheng)來執行。Driver在向YARN集群(qun)管(guan)理(li)器申請資源(yuan)時(shi),YARN集群(qun)管(guan)理(li)器會盡可能按照你(ni)(ni)的設置來在集群(qun)的各個(ge)工(gong)作節點上,啟動相應數量的Executor進程(cheng)(cheng)。這個(ge)參(can)數非(fei)常(chang)之(zhi)重要(yao),如(ru)果不設置的話,默(mo)認只(zhi)會給你(ni)(ni)啟動少量的Executor進程(cheng)(cheng),此時(shi)你(ni)(ni)的Spark作業(ye)的運行速度是非(fei)常(chang)慢的。
- 參數調優建(jian)議:每個Spark作業(ye)的(de)運行(xing)一(yi)般設置50~100個左右的(de)Executor進(jin)程比較合適,設置太(tai)少(shao)或太(tai)多的(de)Executor進(jin)程都不好。設置的(de)太(tai)少(shao),無法(fa)充分利用(yong)集(ji)群資(zi)源;設置的(de)太(tai)多的(de)話,大部分隊列可能無法(fa)給(gei)予充分的(de)資(zi)源。
executor-memory
- 參(can)數說明:該參(can)數用于設置(zhi)每個Executor進程的(de)(de)內存。Executor內存的(de)(de)大(da)小,很(hen)多時(shi)候(hou)直接(jie)決定了Spark作業的(de)(de)性能,而且跟常見(jian)的(de)(de)JVM OOM異常,也(ye)有直接(jie)的(de)(de)關聯。
- 參數(shu)調優建議:每個(ge)Executor進程的(de)(de)內(nei)(nei)(nei)存設置4G~8G較為合適。但是這(zhe)(zhe)只是一個(ge)參考值,具(ju)體的(de)(de)設置還是得根據不同部門的(de)(de)資(zi)源(yuan)(yuan)隊(dui)列(lie)(lie)(lie)(lie)來定。可(ke)以(yi)看看自己(ji)團隊(dui)的(de)(de)資(zi)源(yuan)(yuan)隊(dui)列(lie)(lie)(lie)(lie)的(de)(de)最(zui)大內(nei)(nei)(nei)存限制是多少,num-executors乘以(yi)executor-memory,是不能(neng)超過隊(dui)列(lie)(lie)(lie)(lie)的(de)(de)最(zui)大內(nei)(nei)(nei)存量的(de)(de)。此(ci)外,如(ru)果你(ni)是跟團隊(dui)里其(qi)他人共享這(zhe)(zhe)個(ge)資(zi)源(yuan)(yuan)隊(dui)列(lie)(lie)(lie)(lie),那么申請(qing)的(de)(de)內(nei)(nei)(nei)存量最(zui)好(hao)不要超過資(zi)源(yuan)(yuan)隊(dui)列(lie)(lie)(lie)(lie)最(zui)大總內(nei)(nei)(nei)存的(de)(de)1/3~1/2,避免你(ni)自己(ji)的(de)(de)Spark作業占(zhan)用了隊(dui)列(lie)(lie)(lie)(lie)所有的(de)(de)資(zi)源(yuan)(yuan),導(dao)致別(bie)的(de)(de)同學(xue)的(de)(de)作業無(wu)法運行。
executor-cores
- 參數(shu)(shu)說明(ming):該參數(shu)(shu)用于設(she)置每(mei)(mei)個Executor進程(cheng)(cheng)(cheng)(cheng)的CPU core數(shu)(shu)量(liang)(liang)。這(zhe)個參數(shu)(shu)決(jue)定了每(mei)(mei)個Executor進程(cheng)(cheng)(cheng)(cheng)并(bing)行執(zhi)行task線(xian)程(cheng)(cheng)(cheng)(cheng)的能力。因為每(mei)(mei)個CPU core同(tong)一(yi)時間只能執(zhi)行一(yi)個task線(xian)程(cheng)(cheng)(cheng)(cheng),因此(ci)每(mei)(mei)個Executor進程(cheng)(cheng)(cheng)(cheng)的CPU core數(shu)(shu)量(liang)(liang)越多,越能夠(gou)快速地執(zhi)行完分配給自己的所(suo)有task線(xian)程(cheng)(cheng)(cheng)(cheng)。
- 參數(shu)調優建議:Executor的(de)CPU core數(shu)量(liang)設置為2~4個較為合適。同樣得根據不同部門的(de)資源隊(dui)列來(lai)定(ding),可以看(kan)(kan)看(kan)(kan)自己的(de)資源隊(dui)列的(de)最大CPU core限制是(shi)(shi)多少,再依據設置的(de)Executor數(shu)量(liang),來(lai)決定(ding)每(mei)個Executor進(jin)程可以分配(pei)到幾個CPU core。同樣建議,如(ru)果是(shi)(shi)跟他人共享這(zhe)個隊(dui)列,那(nei)么(me)num-executors * executor-cores不要超(chao)過(guo)隊(dui)列總(zong)CPU core的(de)1/3~1/2左右比較合適,也是(shi)(shi)避免影響其他同學(xue)的(de)作業運行(xing)。
driver-memory
- 參數(shu)說明:該參數(shu)用于設置Driver進程的內存。
- 參數調優建議:Driver的(de)(de)內存(cun)通常來說不設(she)置,或者設(she)置1G左(zuo)右應該(gai)就(jiu)夠了。唯一需要(yao)注意的(de)(de)一點是,如果(guo)需要(yao)使用collect算(suan)子(zi)將RDD的(de)(de)數據全部拉取到Driver上進行處理,那么必須確保Driver的(de)(de)內存(cun)足夠大,否(fou)則會出現OOM內存(cun)溢出的(de)(de)問題(ti)。
spark.default.parallelism
- 參數(shu)(shu)說明:該參數(shu)(shu)用于設(she)(she)置每個(ge)stage的(de)默認task數(shu)(shu)量。這個(ge)參數(shu)(shu)極為重要,如果不設(she)(she)置可能會直(zhi)接(jie)影響你的(de)Spark作業性能。
- 參(can)數(shu)(shu)(shu)調優(you)建(jian)(jian)議:Spark作業的(de)(de)(de)(de)默認task數(shu)(shu)(shu)量(liang)(liang)為(wei)(wei)500~1000個(ge)(ge)(ge)(ge)較(jiao)為(wei)(wei)合(he)(he)適。很多(duo)(duo)同學常犯(fan)的(de)(de)(de)(de)一(yi)(yi)個(ge)(ge)(ge)(ge)錯誤就(jiu)(jiu)(jiu)是(shi)(shi)不去設(she)置(zhi)這個(ge)(ge)(ge)(ge)參(can)數(shu)(shu)(shu),那么(me)此(ci)時(shi)就(jiu)(jiu)(jiu)會(hui)導(dao)致(zhi)(zhi)Spark自己根據底層HDFS的(de)(de)(de)(de)block數(shu)(shu)(shu)量(liang)(liang)來(lai)設(she)置(zhi)task的(de)(de)(de)(de)數(shu)(shu)(shu)量(liang)(liang),默認是(shi)(shi)一(yi)(yi)個(ge)(ge)(ge)(ge)HDFS block對應一(yi)(yi)個(ge)(ge)(ge)(ge)task。通(tong)常來(lai)說(shuo),Spark默認設(she)置(zhi)的(de)(de)(de)(de)數(shu)(shu)(shu)量(liang)(liang)是(shi)(shi)偏(pian)少(shao)的(de)(de)(de)(de)(比如就(jiu)(jiu)(jiu)幾十個(ge)(ge)(ge)(ge)task),如果task數(shu)(shu)(shu)量(liang)(liang)偏(pian)少(shao)的(de)(de)(de)(de)話,就(jiu)(jiu)(jiu)會(hui)導(dao)致(zhi)(zhi)你(ni)前面(mian)設(she)置(zhi)好(hao)的(de)(de)(de)(de)Executor的(de)(de)(de)(de)參(can)數(shu)(shu)(shu)都前功(gong)盡棄。試想一(yi)(yi)下,無論(lun)你(ni)的(de)(de)(de)(de)Executor進程(cheng)有(you)(you)多(duo)(duo)少(shao)個(ge)(ge)(ge)(ge),內存和CPU有(you)(you)多(duo)(duo)大,但是(shi)(shi)task只有(you)(you)1個(ge)(ge)(ge)(ge)或者10個(ge)(ge)(ge)(ge),那么(me)90%的(de)(de)(de)(de)Executor進程(cheng)可能(neng)根本就(jiu)(jiu)(jiu)沒有(you)(you)task執行(xing),也(ye)就(jiu)(jiu)(jiu)是(shi)(shi)白白浪費(fei)了資源!因此(ci)Spark官網建(jian)(jian)議的(de)(de)(de)(de)設(she)置(zhi)原則是(shi)(shi),設(she)置(zhi)該(gai)參(can)數(shu)(shu)(shu)為(wei)(wei)num-executors * executor-cores的(de)(de)(de)(de)2~3倍(bei)較(jiao)為(wei)(wei)合(he)(he)適,比如Executor的(de)(de)(de)(de)總CPU core數(shu)(shu)(shu)量(liang)(liang)為(wei)(wei)300個(ge)(ge)(ge)(ge),那么(me)設(she)置(zhi)1000個(ge)(ge)(ge)(ge)task是(shi)(shi)可以的(de)(de)(de)(de),此(ci)時(shi)可以充分地(di)利用Spark集(ji)群(qun)的(de)(de)(de)(de)資源。
spark.storage.memoryFraction
- 參數(shu)說明:該參數(shu)用于設置RDD持(chi)(chi)久(jiu)化(hua)數(shu)據(ju)在Executor內(nei)存(cun)(cun)(cun)中能占(zhan)的(de)比例,默認是0.6。也就(jiu)(jiu)是說,默認Executor 60%的(de)內(nei)存(cun)(cun)(cun),可以(yi)用來保存(cun)(cun)(cun)持(chi)(chi)久(jiu)化(hua)的(de)RDD數(shu)據(ju)。根(gen)據(ju)你選擇的(de)不(bu)同的(de)持(chi)(chi)久(jiu)化(hua)策略,如果(guo)內(nei)存(cun)(cun)(cun)不(bu)夠時,可能數(shu)據(ju)就(jiu)(jiu)不(bu)會持(chi)(chi)久(jiu)化(hua),或者數(shu)據(ju)會寫入磁盤。
- 參(can)數(shu)(shu)調優建議(yi):如(ru)果Spark作(zuo)(zuo)業中,有(you)較多(duo)的(de)(de)(de)RDD持久化操作(zuo)(zuo),該參(can)數(shu)(shu)的(de)(de)(de)值可(ke)以適當提高一些,保(bao)證(zheng)持久化的(de)(de)(de)數(shu)(shu)據能(neng)夠容納(na)在內存(cun)中。避免內存(cun)不夠緩存(cun)所有(you)的(de)(de)(de)數(shu)(shu)據,導(dao)致(zhi)(zhi)數(shu)(shu)據只(zhi)能(neng)寫入磁盤中,降低(di)了性能(neng)。但是如(ru)果Spark作(zuo)(zuo)業中的(de)(de)(de)shuffle類操作(zuo)(zuo)比(bi)較多(duo),而持久化操作(zuo)(zuo)比(bi)較少(shao),那么這個參(can)數(shu)(shu)的(de)(de)(de)值適當降低(di)一些比(bi)較合(he)適。此外,如(ru)果發現作(zuo)(zuo)業由(you)于頻繁的(de)(de)(de)gc導(dao)致(zhi)(zhi)運行緩慢(通過spark web ui可(ke)以觀(guan)察到作(zuo)(zuo)業的(de)(de)(de)gc耗(hao)時),意味著(zhu)task執行用戶代碼的(de)(de)(de)內存(cun)不夠用,那么同(tong)樣建議(yi)調低(di)這個參(can)數(shu)(shu)的(de)(de)(de)值。
spark.shuffle.memoryFraction
- 參(can)數(shu)說明:該(gai)參(can)數(shu)用于設置(zhi)shuffle過程(cheng)中(zhong)一(yi)個task拉(la)取(qu)到上個stage的(de)task的(de)輸出后,進行(xing)聚合(he)操作時能夠使用的(de)Executor內存的(de)比例(li),默認(ren)是(shi)0.2。也(ye)就是(shi)說,Executor默認(ren)只有20%的(de)內存用來進行(xing)該(gai)操作。shuffle操作在(zai)進行(xing)聚合(he)時,如果發現(xian)使用的(de)內存超出了這個20%的(de)限制,那么多余(yu)的(de)數(shu)據就會溢寫到磁盤文件中(zhong)去,此時就會極大(da)地降(jiang)低性(xing)能。
- 參數調優建議:如(ru)果Spark作業中的(de)(de)RDD持久化操(cao)作較(jiao)少,shuffle操(cao)作較(jiao)多時,建議降(jiang)低(di)持久化操(cao)作的(de)(de)內(nei)存占比,提高(gao)shuffle操(cao)作的(de)(de)內(nei)存占比比例(li),避(bi)免shuffle過程中數據過多時內(nei)存不(bu)夠用,必須溢寫到磁盤上,降(jiang)低(di)了性能。此外(wai),如(ru)果發現作業由于頻(pin)繁的(de)(de)gc導致運行緩慢,意味(wei)著task執(zhi)行用戶代碼的(de)(de)內(nei)存不(bu)夠用,那么同樣建議調低(di)這(zhe)個參數的(de)(de)值。
資源參數(shu)的調(diao)優(you),沒有(you)一個(ge)固定的值,需要同學(xue)們(men)根據自己(ji)的實際情況(包括Spark作業(ye)中的shuffle操(cao)作數(shu)量(liang)(liang)、RDD持久化操(cao)作數(shu)量(liang)(liang)以及spark web ui中顯示的作業(ye)gc情況),同時參考本篇文章中給出的原理(li)以及調(diao)優(you)建議(yi),合理(li)地(di)設置上述參數(shu)。
資源參數參考示(shi)例(li)
以下(xia)是一份spark-submit命(ming)令的示(shi)例(li),大(da)家可(ke)以參考(kao)一下(xia),并根據(ju)自(zi)己的實際情況(kuang)進行調節:
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
寫在最后的(de)話
根據實(shi)踐經(jing)驗來看,大部分Spark作業經(jing)過本(ben)次基礎篇(pian)所講解(jie)(jie)的(de)(de)開(kai)發調(diao)優(you)與資(zi)源調(diao)優(you)之后(hou),一(yi)般都能(neng)以較高(gao)(gao)的(de)(de)性能(neng)運(yun)行了,足以滿足我們的(de)(de)需求(qiu)。但是(shi)在不同的(de)(de)生(sheng)產環境和項目背景下,可能(neng)會遇到(dao)其他更(geng)加棘手的(de)(de)問題(比如各種數(shu)據傾(qing)斜),也可能(neng)會遇到(dao)更(geng)高(gao)(gao)的(de)(de)性能(neng)要求(qiu)。為了應對(dui)這(zhe)些挑(tiao)戰(zhan),需要使用更(geng)高(gao)(gao)級(ji)的(de)(de)技巧來處理這(zhe)類(lei)問題。在后(hou)續(xu)的(de)(de)《Spark性能(neng)優(you)化指(zhi)南——高(gao)(gao)級(ji)篇(pian)》中,我們會詳(xiang)細(xi)講解(jie)(jie)數(shu)據傾(qing)斜調(diao)優(you)以及Shuffle調(diao)優(you)。
參(can)考:
