中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

Spark踩(cai)坑記(ji)——從RDD看(kan)集群調度

前言

在Spark的(de)使(shi)用中,性能的(de)調優(you)配置過程中,查(cha)閱(yue)了很多資料,本文的(de)思路是從spark最細節(jie)的(de)本質,即核心的(de)數據結構RDD出(chu)發(fa),到整(zheng)個(ge)Spark集群宏(hong)觀(guan)的(de)調度過程做一個(ge)整(zheng)理(li)歸(gui)納,從微觀(guan)到宏(hong)觀(guan)兩方面總結,方便自(zi)己在調優(you)過程中找尋問題,理(li)清思路,也加深自(zi)己對于分布式程序開(kai)發(fa)的(de)理(li)解。(有任何問題和紕漏還請各位大牛指出(chu)啦,我會第一時間改正)

RDD詳談

在Spark開山之作"Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing"的這篇paper中(以下簡稱RDD Paper),Matei等提出了RDD這種數據結構,文中開頭對RDD定義是:
A distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner.
也(ye)就是說RDD設計的核(he)心(xin)點為:

  • 內存計算
  • 適合于計算機集群
  • 有容錯方式

文中提到了對于RDD設計的最大挑戰便是在提供有效的容錯機制(fault tolerance efficiently),之前存在的基于內存存儲的集群抽象,例如分布式共享內存、鍵值存儲、數據庫等,更多是細粒度的(fine-grained)更新一個可變狀態表,而其容錯方式通常為在機器間進行數據復制或者日志更新,而這些方式很明顯會造成機器負載加大以及大量的網絡傳輸開銷。
而RDD則使用了粗粒度的(coarse-grained)轉換,即對于很多相同的數據項使用同一種操作(如map/filter/join),這種方式能夠通過記錄RDD之間的轉換從而刻畫RDD的繼承關系(lineage),而不是真實的數據,最終構成一個DAG(有向無環圖),而如果發生RDD丟失,RDD會有充足的信息來得知怎么從其他RDDs重新計算得到。
這也(ye)是(shi)(shi)RDD設計的(de)核(he)心理(li)念,接下(xia)來圍繞這一(yi)理(li)念我們來剖析,看RDD是(shi)(shi)怎么實現這種(zhong)高效的(de)容錯機制(zhi)的(de)。

RDD存儲結構

RDD實現的數據結構核心是(shi)一個五元組,如下(xia)表:

屬性 說明
分區列表-partitions 每個分區為RDD的一部分數據
依賴列表-dependencies table存儲其父RDD即依賴RDD
計算函數-compute 利用父分區計算RDD各分區的值
分區器-partitioner 指明RDD的分區方式(hash/range)
分區位置列表-preferredLocations 指明分區優先存放的結點位置

其中每個屬性的代(dai)碼如下:

// RDD中的依賴關系由一個Seq數據集來記錄,這里使用Seq的原因是經常取第一個元素或者遍歷
private var dependencies_: Seq[Dependency[_]] = null

// 分區列表定義在一個數組中,這里使用Array的原因是隨時使用下標來訪問分區內容
// @transient分區列表不需要被序列化
@transient private var partitions_: Array[Partition] = null

// 接口定義,具體由子類實現,對輸入的RDD分區進行計算
def compute(split: Partition, context: TaskContext): Iterator[T]

// 分區器
// 可選,子類可以重寫以指定新的分區方式,Spark支持Hash和Range兩種分區方式
@transient val partitioner: Option[Partitioner] = None

// 可選,子類可以指定分區的位置,如HadoopRDD可以重寫此方法,讓分區盡可能與數據在相同的節點上
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

在RDD Paper中,作者提到在抽象RDD時,一個很重要的點便是如何使得RDD能夠記錄RDD之間的繼承依賴關系(lineage),這種繼承關系來自豐富的轉移(Transformation)操作。所以作者提出了一種基于圖的表示方式來實現這個目標,這也正是上面RDD五種屬性的核心作用。
這五種屬性(xing)從(cong)spark誕生到新的(de)版本(ben)迭代(dai),一直在使用,沒(mei)有(you)增加也沒(mei)有(you)減少,所以(yi)可以(yi)說(shuo)Spark的(de)核心(xin)就是RDD,而RDD的(de)核心(xin)就是這五種屬性(xing)。

RDD的操作

Spark踩坑記——初試中對(dui)RDD的操作(zuo)也進(jin)行了簡單說明,在(zai)Spark中,對(dui)RDD的操作(zuo)可以分為(wei)Transformation和Action兩(liang)種(zhong),我們分別進(jin)行整(zheng)理說明:

Transformation

對于Transformation操作是(shi)指由一個RDD生成新(xin)RDD的(de)過程,其代表(biao)了是(shi)計算的(de)中間過程,其并不會觸發真實(shi)的(de)計算。

  • map(f:T=>U) : RDD[T]=>RDD[U]
    返回一個新的(de)分布式數據集,由每個原元素經過func函數轉(zhuan)換(huan)后組成

  • filter(f:T=>Bool) : RDD[T]=>RDD[T]
    返回一個(ge)新(xin)的(de)數據集(ji),由經過(guo)func函(han)數后返回值為true的(de)原元素(su)組成

  • flatMap(f:T=>Seq[U]) : RDD[T]=>RDD[U])
    類似于(yu)map,但是(shi)每一個(ge)輸(shu)入(ru)元素,會被(bei)映射為(wei)0到多個(ge)輸(shu)出(chu)元素(因此,func函數的返回值是(shi)一個(ge)Seq,而不(bu)是(shi)單一元素)

  • sample(withReplacement: Boolean, fraction: Double, seed: Long) : RDD[T]=>RDD[T]
    sample將RDD這個集合內的元素進行采樣,獲取所有元素的子集。用戶可以設定是否有放回的抽樣、百分比、隨機種子,進而決定采樣方式。
    withReplacement=true, 表示有放回的抽樣;
    withReplacement=false, 表示無放回的抽樣。
    如下圖:

    每個(ge)(ge)方框是一個(ge)(ge)RDD分區。通過sample函數(shu),采(cai)樣50%的數(shu)據(ju)(ju)。V1、V2、U1、U2、U3、U4采(cai)樣出數(shu)據(ju)(ju)V1和U1、U4,形成新的RDD。

  • groupByKey([numTasks]) : RDD[(K,V)]=>RDD[(K,Seq[V])]
    在一個由(K,V)對組成(cheng)的數(shu)據(ju)集上調用(yong),返(fan)回一個(K,Seq[V])對的數(shu)據(ju)集。注意:

    1. 默認情況下,使用與父RDD的partition數量對應的并行任務進行分組,也可以傳入numTask可選參數,根據數據量設置不同數目的Task。
    2. 另外如果相同key的value求和或者求平均,那么使用reduceByKey性能更好
  • reduceByKey(f:(V,V)=>V, [numTasks]) : RDD[(K, V)]=>RDD[(K, V)]
    在一個(ge)(K,V)對(dui)的(de)(de)數(shu)據(ju)集上(shang)使(shi)用(yong),返回一個(ge)(K,V)對(dui)的(de)(de)數(shu)據(ju)集,key相同的(de)(de)值,都被使(shi)用(yong)指定的(de)(de)reduce函數(shu)聚合到一起。和groupbykey類似,任(ren)務的(de)(de)個(ge)數(shu)是可(ke)以(yi)通過第二個(ge)可(ke)選參數(shu)來配置的(de)(de)。

  • union(otherDataset) : (RDD[T],RDD[T])=>RDD[T]
    返(fan)回(hui)一個(ge)新(xin)的數據集(ji),由原數據集(ji)和參數聯合而成(cheng)

  • join(otherDataset, [numTasks]) : (RDD[(K,V)],RDD[(K,W)])=>RDD[(K,(V,W))]
    返回key值相同的所有匹配對,如下圖:

    join操(cao)作會將兩個RDD中相同key值的(de)合并成key,pair(value1, value2)的(de)形式。

  • cogroup() : (RDD[(K,V)],RDD[(K,W)])=>RDD[(K,(Seq[V],Seq[W]))]
    cogroup函數將兩(liang)(liang)個(ge)RDD進行協同(tong)劃分。對(dui)在兩(liang)(liang)個(ge)RDD中(zhong)的(de)Key-Value類型的(de)元素(su),每個(ge)RDD相同(tong)Key的(de)元素(su)分別聚合為一(yi)個(ge)集(ji)(ji)合,并且(qie)返(fan)回(hui)兩(liang)(liang)個(ge)RDD中(zhong)對(dui)應Key的(de)元素(su)集(ji)(ji)合的(de)迭(die)代(dai)器(qi)(K, (Iterable[V], Iterable[w]))。其中(zhong),Key和Value,Value是兩(liang)(liang)個(ge)RDD下相同(tong)Key的(de)兩(liang)(liang)個(ge)數據集(ji)(ji)合的(de)迭(die)代(dai)器(qi)所構成的(de)元組(zu)。

  • cartesian(otherDataset) : (RDD[T],RDD[U])=>RDD[(T,U)]
    笛卡爾積。但在數據集T和U上調(diao)用時(shi),返回(hui)一個(T,U)對的數據集,所有元素交互進行笛卡爾積。

  • sortByKey([ascending], [numTasks]) : RDD[(K,V)]=>RDD[(K,V)]
    根據key值(zhi)進行排序,如果(guo)ascending設置為true則按照(zhao)升序排序

  • repartition(numPartitions) :
    對(dui)RDD中的(de)所有數據(ju)進(jin)行shuffle操作(zuo),建立更多或者更少的(de)分區使(shi)得更加平(ping)衡。往(wang)往(wang)需要通過網(wang)絡進(jin)行數據(ju)傳輸

Action

不同于(yu)Transformation操(cao)作,Action代表(biao)一(yi)次計算的(de)結束,不再產生新的(de)RDD,將結果(guo)返(fan)回到Driver程序。所(suo)以Transformation只是建立計算關系,而Action才是實際的(de)執(zhi)行(xing)者(zhe)。每個Action都會調用SparkContext的(de)runJob方法向集群正式提交請求(qiu),所(suo)以每個Action對應一(yi)個Job。

  • count() : RDD[T]=>Long
    返回數據(ju)集(ji)的元(yuan)素個數

  • countByKey() : RDD[T]=>Map[T, Long]
    對(dui)(K,V)類型的(de)(de)RDD有(you)效,返回一(yi)(yi)個(K,Int)對(dui)的(de)(de)Map,表示每一(yi)(yi)個key對(dui)應的(de)(de)元(yuan)素個數

  • collect() : RDD[T]=>Seq[T]
    在Driver中,以數(shu)(shu)組的形式,返回(hui)數(shu)(shu)據(ju)集(ji)(ji)的所有元素(su)。這(zhe)通(tong)常會在使用filter或者其(qi)它操作并返回(hui)一個足(zu)夠小的數(shu)(shu)據(ju)子集(ji)(ji)后再使用會比(bi)較(jiao)有用。

  • reduce(f:(T,T)=>T) : RDD[T]=>T
    通過函數func(接受兩個參數,返回一(yi)個參數)聚(ju)集數據(ju)集中的所有元素。這個功能必須可交(jiao)換且可關聯的,從而可以正確的被并(bing)行(xing)執(zhi)行(xing)。

  • saveAsTextFile(path:String)
    將(jiang)(jiang)數據(ju)集的元素(su),以textfile的形式,保存到本地文件(jian)系統(tong),HDFS或者任何(he)其它hadoop支持的文件(jian)系統(tong)。對(dui)于每個元素(su),Spark將(jiang)(jiang)會(hui)調(diao)用toString方法(fa),將(jiang)(jiang)它轉換(huan)為(wei)文件(jian)中的文本行

  • saveAsSequenceFile(path:String)
    將(jiang)數據集(ji)的(de)(de)元(yuan)素,以(yi)Hadoop sequencefile的(de)(de)格式,保存到指定的(de)(de)目錄下,本(ben)地系統,HDFS或者(zhe)任何其(qi)它hadoop支持的(de)(de)文件系統。這個只限于由key-value對組成,并實現了Hadoop的(de)(de)Writable接口,或者(zhe)隱式的(de)(de)可以(yi)轉(zhuan)換(huan)為Writable的(de)(de)RDD。(Spark包括了基(ji)本(ben)類型的(de)(de)轉(zhuan)換(huan),例如Int,Double,String,等(deng)(deng)等(deng)(deng))

  • saveAsObjectFile(path:String)
    利用Java的Serialization接(jie)口進行持久化操(cao)作,之后可以使用SparkContext.objectFile()重新(xin)load回內存(cun)

  • take(n)
    返回一(yi)個(ge)(ge)由數據(ju)集的(de)(de)(de)前(qian)(qian)n個(ge)(ge)元素(su)組(zu)成的(de)(de)(de)數組(zu)。注(zhu)意,這(zhe)個(ge)(ge)操(cao)作目前(qian)(qian)并非并行執行,而是由驅動程序計算(suan)所有的(de)(de)(de)元素(su)

  • takeSample(withReplacement, num, [seed])
    返(fan)回一個數(shu)組,在(zai)數(shu)據(ju)集中隨(sui)機(ji)采(cai)樣num個元(yuan)素組成(cheng),可以選擇(ze)是否用(yong)隨(sui)機(ji)數(shu)替換不足的部(bu)分,Seed用(yong)于指定的隨(sui)機(ji)數(shu)生成(cheng)器種子

  • takeOrdered(n, [ordering])
    返(fan)回前n個元素(su)(su),可以使(shi)用元素(su)(su)的自然順序,也可以使(shi)用用戶(hu)自定(ding)義(yi)comparator

  • first()
    返回數據集(ji)的(de)第一個元素(類似于take(1))

  • foreach(func)
    在數據集的每一個元素上,運行函數func進行更新。這通常用于邊緣效果,例如更新一個累加器,或者和外部存儲系統進行交互,例如HBase。關于foreach我在Spark踩坑記——數據庫(Hbase+Mysql)中對sparkstreaming的(de)foreach操(cao)作有詳細(xi)整理(li)

RDD依賴方式

RDD 的容錯機制是通過記錄更新來實現的,且記錄的是粗粒度的轉換操作。在外部,我們將記錄的信息稱為血統(Lineage)關系,而到了源碼級別,Apache Spark 記錄的則是 RDD 之間的依賴(Dependency)關系。在一次轉換操作中,創建得到的新 RDD 稱為子 RDD,提供數據的 RDD 稱為父 RDD,父 RDD 可能會存在多個,我們把子 RDD 與父 RDD 之間的關系稱為依賴關系,或者可以說是子 RDD 依賴于父 RDD。
依賴只保存父 RDD 信息,轉換操作的其他信息,如數據處理函數,會在創建 RDD 時候,保存在新的 RDD 內。依賴在 Apache Spark 源碼中的對應實現是 Dependency 抽象類。
Apache Spark 將(jiang)依(yi)賴(lai)進一步分為(wei)兩類,分別是窄(zhai)依(yi)賴(lai)(Narrow Dependency)和 Shuffle 依(yi)賴(lai)(Shuffle Dependency,在部分文獻中也(ye)被稱為(wei) Wide Dependency,即寬(kuan)依(yi)賴(lai))。

窄依賴(Narrow Dependency)

窄依賴中,父 RDD 中的一個分區最多只會被子 RDD 中的一個分區使用,換句話說,父 RDD 中,一個分區內的數據是不能被分割的,必須整個交付給子 RDD 中的一個分區。下圖展示了幾類常見的窄依賴及其對應的轉換操作。

Shuffle依賴(寬依賴 Shffle/Wide Dependency)

Shuffle 依賴中,父 RDD 中的分區可能會被多個子 RDD 分區使用。因為父 RDD 中一個分區內的數據會被分割,發送給子 RDD 的所有分區,因此 Shuffle 依賴也意味著父 RDD 與子 RDD 之間存在著 Shuffle 過程。下圖展示了幾類常見的Shuffle依賴及其對應的轉換操作。

需要說明的是,依賴關系時RDD到RDD之間的一種映射關系,是兩個RDD之間的依賴,那么如果在一次操作中涉及到多個父RDD,也有可能同時包含窄依賴和Shuffle依賴,如join操作:

集群部署

組件

說到Spark集群的部署,我們先來討論一下Spark中一些關鍵的組件,在我的博文Spark踩坑記——初試中,我對Master/Worker/Driver/Executor幾個關鍵概念做了闡述。首先,先上官方文檔中的一張圖:

官方文檔對其中的術語進行了總結,如下表:

從官方文(wen)檔摘抄了(le)這么多東(dong)東(dong),對Spark中基本的集群結(jie)構(gou),以(yi)及(ji)一個程序(xu)提交到Spark后的調度(du)情況我們有了(le)了(le)解。

部署方式

對于集(ji)群的部署(shu)方式(shi),Spark提供了多種集(ji)群部署(shu)方式(shi),如下:

  • Local模式:本地調試的一種模式,可以在一臺機器上完成程序的運行與調試
  • Standalone模式:即獨立模式,自帶完整的服務,可單獨部署到一個集群中,無需依賴任何其他資源管理系統。
  • Spark On YARN模式:將Spark搭建在Hadoop之上,由hadoop中的yarn負責資源調配,Spark負責計算任務;
  • Spark On Mesos模式:這是很多公司采用的模式,官方推薦這種模式(當然,原因之一是血緣關系)。正是由于Spark開發之初就考慮到支持Mesos,因此,目前而言,Spark運行在Mesos上會比運行在YARN上更加靈活,更加自然。目前在Spark On Mesos環境中,用戶可選擇兩種調度模式之一運行自己的應用程序。

集群部署舉例

由于在我平時的使用中,是直接采用的Standalone的部署方式,我這里將部署的框架做一個簡單的介紹,其他部署方式其實可以做一些參考來進行搭配部署:

假設(she)我(wo)們(men)(men)的(de)網(wang)段為10.214.55.x,其中1、2、3機器我(wo)們(men)(men)用(yong)作集(ji)群節(jie)點(dian),4和(he)5位(wei)master節(jie)點(dian),這里我(wo)們(men)(men)用(yong)到了zookeeper,關于(yu)zookeeper的(de)介(jie)紹大家可(ke)以(yi)在網(wang)上搜搜,我(wo)們(men)(men)這里加(jia)入zk的(de)目的(de)就(jiu)是master節(jie)點(dian)如果(guo)崩潰后進行(xing)一個主備切換,保證集(ji)群能夠繼續(xu)正(zheng)常(chang)運行(xing)。如果(guo)我(wo)們(men)(men)在1提(ti)交我(wo)們(men)(men)的(de)應用(yong),那么(me)2和(he)3就(jiu)將(jiang)作為我(wo)們(men)(men)的(de)worker節(jie)點(dian)參與運算。而關于(yu)配(pei)(pei)置文件中需(xu)要的(de)具體配(pei)(pei)置項可(ke)以(yi)參考官(guan)方文檔:

從RDD看集群任務調度

上文我們從微(wei)觀(guan)和(he)宏觀(guan)兩個角度對(dui)Spark進行(xing)了(le)總結,RDD以(yi)(yi)及RDD的(de)依賴,Spark集(ji)(ji)群以(yi)(yi)及部署,那么當我們在提交了(le)一個任務或者說Application到Spark集(ji)(ji)群時,它是怎么運作的(de)呢(ni)?

  • 首先我們通過maven或者sbt等,將我們的應用以及其依賴的jar包完整的打包,利用spark-submit命令將jar提交到spark;
  • 提交程序的這個Spark節點會作為Driver節點,并從Cluster Manager中獲取資源;
  • 程序會在worker節點中獲得executor用來執行我們的任務;
  • 在spark程序中每次RDD的action變換會產生一個新的job,每個job包含多個task;
  • 而RDD在進行Transformation時,會產生新的stage;
  • task會被送往各個executor運行;
  • 而最終的計算結果會回到driver節點進行匯總并輸出(如reduceByKey)。

針對這個過程,我們可以從微觀和宏觀兩個角度把控,將RDD的操作依賴關系,以及task在集群間的分配情況綜合起來看,如下圖:

Spark監控界面

在提交Spark任務時,我們可以在提交命令中加入一項參數--conf spark.ui.port=xxxx,其中"xxxx"為你需要的端口號,這樣在瀏覽器中我們就可以利用Spark提供的UI界面對Application的運行情況進行監控如下圖:

踩坑小記

在spark平時的(de)使(shi)用過程當(dang)中,由于(yu)程序(xu)在整個集群當(dang)中奔(ben)跑,經常會遇到很多莫名其妙(miao)的(de)錯誤(wu),有時候通過日(ri)志給(gei)定的(de)錯誤(wu)很難真(zhen)的(de)定位到真(zhen)正(zheng)的(de)原因,那叫一(yi)個憂傷(shang)阿T^T

Driver程序崩潰

出(chu)現這(zhe)類(lei)(lei)(lei)錯誤(wu),往(wang)(wang)往(wang)(wang)日志中(zhong)會(hui)提(ti)到(dao)(dao)JVM。在Spark中(zhong)大多數操作會(hui)分擔到(dao)(dao)各個結點(dian)(dian)的(de)worker進行計(ji)算,但是對于shuffle類(lei)(lei)(lei)操作,如我們經常會(hui)用的(de)reduceByKey或者collect等,都會(hui)使得spark將所有結點(dian)(dian)的(de)數據(ju)匯總到(dao)(dao)driver進行計(ji)算,這(zhe)樣就會(hui)導(dao)致(zhi)driver需要遠大于正常worker的(de)內存,所以(yi)遇(yu)到(dao)(dao)這(zhe)類(lei)(lei)(lei)問題,最先(xian)可以(yi)考(kao)慮的(de)便(bian)是增加driver結點(dian)(dian)的(de)內存,增加方式如下:

--driver-memory 15g

kafka編碼錯誤

在利用(yong)spark streaming的(de)python版本,消費(fei)kafka數據的(de)時候(hou),遇到(dao)類似(si)下面的(de)問題:

UnicodeDecodeError: 'utf8' codec can't decode byte 0x85 in position 87: invalid start byte

我們知道python2中的字符串形式有兩種即unicode形式和普通str形式,通過反復分析日志和查看kafka.py的源碼找到了問題所在。首先在中,找到createStream函數的如下說明:

圖中紅框內(nei)清楚的(de)說明了(le),在解析kafka傳來的(de)數(shu)據的(de)時候,默認使用了(le)utf8_decoder函數(shu),那(nei)這個東東是個什(shen)么玩(wan)意呢,找到kafka.py的(de)源碼,其(qi)定義如下:

# 默認解碼器
def utf8_decoder(s):
    """ Decode the unicode as UTF-8 """
    if s is None:
        return None
    return s.decode('utf-8')

class KafkaUtils(object):

    @staticmethod
    def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
        """
        Create an input stream that pulls messages from a Kafka Broker.
        :param ssc:  StreamingContext object
        :param zkQuorum:  Zookeeper quorum (hostname:port,hostname:port,..).
        :param groupId:  The group id for this consumer.
        :param topics:  Dict of (topic_name -> numPartitions) to consume.
                        Each partition is consumed in its own thread.
        :param kafkaParams: Additional params for Kafka
        :param storageLevel:  RDD storage level.
        :param keyDecoder:  A function used to decode key (default is utf8_decoder)
        :param valueDecoder:  A function used to decode value (default is utf8_decoder)
        :return: A DStream object
        """
        if kafkaParams is None:
            kafkaParams = dict()
        kafkaParams.update({
            "zookeeper.connect": zkQuorum,
            "group.id": groupId,
            "zookeeper.connection.timeout.ms": "10000",
        })
        if not isinstance(topics, dict):
            raise TypeError("topics should be dict")
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
        helper = KafkaUtils._get_helper(ssc._sc)
        jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
        stream = DStream(jstream, ssc, ser)
        return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
...

我們看到(dao)默(mo)認(ren)的(de)解(jie)碼器(qi)直接調用了(le)s.decode,那么當kafka傳來的(de)數據(ju)中有非utf8編碼的(de)字符時,整個stage就會掛掉,所以修改(gai)如下:

def my_uft8_decoder(s):
    if s is None:
        return None
    try:
        return s.decode('utf-8', 'replace')
    except Exception, e:
        print e;
        return None

# 創建stream時傳入
kafkaStream = KafkaUtils.createStream(ssc, \
     conf.kafka_quorum, conf.kafka_consumer_group, {conf.kafka_topic:conf.spark_streaming_topic_parallelism}, {
        "auto.commit.interval.ms":"50000",
        "auto.offset.reset":"smallest",
        },
        StorageLevel.MEMORY_AND_DISK_SER,
        valueDecoder=my_uft8_decoder
)

如果采用createDirectStream來創建context與(yu)此類似,不再(zai)贅述。所以在pyspark的kafka消費中遇到解碼(ma)問題可以關注一下(xia)這里。

總結

挺長的(de)(de)一(yi)(yi)篇(pian)整理(li),前后拖了很久。本篇(pian)博文(wen)我(wo)的(de)(de)構思主要(yao)就是,當我(wo)們提交了一(yi)(yi)個應用到(dao)Spark時(shi),我(wo)們需(xu)要(yao)大致(zhi)了解(jie)Spark做(zuo)了什么,這(zhe)里我(wo)并沒有(you)分(fen)(fen)析源碼(因為我(wo)木有(you)看哈(ha)哈(ha))。從最微觀(guan)的(de)(de)RDD的(de)(de)操作,到(dao)宏觀(guan)的(de)(de)整個集群的(de)(de)調度(du)運算,這(zhe)樣(yang)從RDD看集群調度(du)就有(you)了一(yi)(yi)個整體的(de)(de)認識,當遇到(dao)問題(ti)的(de)(de)時(shi)候就更容易排查(cha),遇到(dao)性能拼瓶(ping)頸也容易查(cha)找(zhao)。OK,這(zhe)就是這(zhe)篇(pian)博文(wen)的(de)(de)全部整理(li)哈(ha),其中(zhong)末尾部分(fen)(fen)闡述了在實際(ji)項目中(zhong)遇到(dao)的(de)(de)一(yi)(yi)些(xie)問題(ti)和坑,如(ru)果有(you)相似的(de)(de)問題(ti)的(de)(de)朋友(you)可以參考(kao)下。

做個小廣告,項目是WeTest輿情,企鵝風訊,感興趣的歡迎大家來踩踩:

參考文獻:

  1. 《Spark最佳實踐》陳歡 林世飛(鵝廠大神的作品v
  2. Zaharia M, Chowdhury M, Das T, et al. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing[C]//Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation. USENIX Association, 2012: 2-2.
posted @ 2017-05-27 18:19  xlturing  閱讀(4196)  評論(0)    收藏  舉報