SparkStreaming基本(ben)架(jia)構及使用
1、簡介
Spark Streaming處理的數據流(liu)圖(tu):

Spark Streaming在內部的處理機制是,接收實時流的數據,并根(gen)據一定的(de)時(shi)間(jian)間(jian)隔拆分成一批批的(de)數據,然后通過Spark Engine處理這些批數據,最終得到處理后的一批批結果數據。
對應(ying)的批數據,在Spark內(nei)核對應(ying)一個RDD實例(li),因此,對應流數據的DStream可以看成是一組RDDs,即RDD的一個序列。通俗點理解的話,在流數據分成一批一批后,通過一個先進先出的隊列,然后 Spark Engine從該隊列中依次取出一個個批數據,把批數據封裝成一個RDD,然后進行處理,這是一個典型的生產者消費者模型。
1.2 術語定義
l離散流(discretized stream)或DStream:Spark Streaming對內部持續的實時數據流的抽象描述,即我們處理的一個實時數據流,在Spark Streaming中對應于一個DStream 實例。
l批數據(batch data):這是化整為零的第一步,將實時流數據以時間(jian)片(pian)為單位進行分批,將流處理轉化為時間片數據的批處理。隨著持續時間的推移,這些處理結果就形成了對應的結果數據流了。
l時間片或批處理時間間隔( batch interval):人為地對流數據進行定量的標準,以時間片作為我們拆分流數據的依據。一個(ge)時間片的數據對應(ying)一個(ge)RDD實(shi)例。
l窗口長度(window length):一個窗口覆蓋的流數據的時間長度。必須是批處理時間間隔的倍數,
l滑動時間間隔:前一個窗口到后一個窗口所經過的時間長度。必須是批處理時間間隔的倍數
lInput DStream :一個input DStream是一個特殊的DStream,將Spark Streaming連接到一個外部數據源來讀取數據。
2、運(yun)行原理
2.1 Streaming架構
SparkStreaming是一個對實時數據流進行(xing)高通(tong)量(liang)、容錯處理的流式處理系統(tong),可以(yi)對多種數據源(如Kdfka、Flume、Twitter、Zero和TCP 套接字(zi))進行(xing)類似(si)Map、Reduce和Join等復(fu)雜操作,并將結果保存到外(wai)部文件系統(tong)、數據庫(ku)或應用到實時儀表盤。
l計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理作業。這里的批處理引擎是Spark Core,也就是把Spark Streaming的輸入(ru)數據(ju)按照batch size(如1秒)分成一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),然后(hou)將Spark Streaming中對DStream的Transformation操作變(bian)為(wei)針對Spark中(zhong)對RDD的Transformation操作,將RDD經過操作變成中間結果保存在內存中。整個流式計算根據業務的需求可以對中間的結果進行疊加或者存儲到外部設備。下圖顯示了Spark Streaming的整個流程。

圖Spark Streaming構架
2.2 編程模型
DStream(Discretized Stream)作為Spark Streaming的(de)(de)(de)基礎抽(chou)象(xiang),它代表(biao)(biao)持(chi)續性的(de)(de)(de)數(shu)據(ju)流。這(zhe)些數(shu)據(ju)流既可以通(tong)過外部(bu)輸入源賴獲(huo)取,也可以通(tong)過現有的(de)(de)(de)Dstream的(de)(de)(de)transformation操作來獲(huo)得。在內部(bu)實(shi)現上(shang),DStream由一組時間(jian)序(xu)列上(shang)連(lian)續的(de)(de)(de)RDD來表(biao)(biao)示。每個RDD都包含了(le)自己特定(ding)時間(jian)間(jian)隔內的(de)(de)(de)數(shu)據(ju)流。如圖7-3所示。

圖7-3 DStream中(zhong)在時間軸(zhou)下生成離散的RDD序(xu)列

對(dui)DStream中數據的(de)各種操(cao)(cao)作也是(shi)映(ying)射到內部的(de)RDD上來進行的(de),如圖7-4所示(shi),對(dui)Dtream的(de)操(cao)(cao)作可以通(tong)過RDD的(de)transformation生成新的(de)DStream。這里(li)的(de)執(zhi)行引擎(qing)是(shi)Spark。
2.2.1 如何使用Spark Streaming
"""
Counts words in UTF8 encoded, '\n' delimited text directly received from Kafka in every 2 seconds.
Usage: direct_kafka_wordcount.py <broker_list> <topic>
To run this on your local machine, you need to setup Kafka and create a producer first, see
http://kafka.apache.org/documentation.html#quickstart
and then run the example
`$ bin/spark-submit --jars \
external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \
examples/src/main/python/streaming/direct_kafka_wordcount.py \
localhost:9092 test`
"""
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
#這里kafka產生(sheng)的是一個map, key是null, value是實際發(fa)送(song)的數據,所以(yi)取x[1]
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
1.創建StreamingContext對象 同Spark初始化需要創建SparkContext對象一樣,使用Spark Streaming就需要創建StreamingContext對象。創建StreamingContext對象所需的參數與SparkContext基本一致,包括指明Master,設定名稱。Spark Streaming需要指定處理數據的時間間隔,如上例所示的2s,那么Spark Streaming會以2s為時間窗口進行數據處理。此參數需要根據用戶的需求和集群的處理能力進行適當的設置;
2.創建InputDStream Spark Streaming需要指明數據源。如socketTextStream,Spark Streaming以socket連接作為數據源讀取數據。當然Spark Streaming支持多種不同的數據源,包括Kafka、 Flume、HDFS/S3、Kinesis和Twitter等數據源;
3.操作DStream 對于從數據源得到的DStream,用戶可以在其基礎上進行各種操作,如上例所示的操作就是一個典型的WordCount執行流程:對于當前時間窗口內從數據源得到的數據首先進行分割,然后利用Map和ReduceByKey方法進行計算,當然最后還有使用print()方法輸出結果;
4.啟動Spark Streaming 之前所作的所有步驟只是創建了執行流程,程序沒有真正連接上數據源,也沒有對數據進行任何操作,只是設定好了所有的執行計劃,當ssc.start()啟動后程序才真正進行所有預期的操作。
至(zhi)此對于Spark Streaming的(de)如(ru)何(he)使(shi)用有了一個大概的(de)印(yin)象,在后(hou)面的(de)章節我們(men)會通過源代碼深入探究一下Spark Streaming的(de)執行流(liu)程
2.2.3 DStream的(de)操(cao)作
與RDD類似,DStream也提(ti)供(gong)了自(zi)己的(de)一系(xi)列(lie)操(cao)作(zuo)方法,這些(xie)操(cao)作(zuo)可(ke)以(yi)分成三類:普(pu)通的(de)轉(zhuan)換(huan)操(cao)作(zuo)、窗口轉(zhuan)換(huan)操(cao)作(zuo)和輸(shu)出(chu)操(cao)作(zuo)。
2.2.3.1 普通的轉換操作
普(pu)通(tong)的轉換操(cao)作如(ru)下(xia)表所(suo)示(shi):
|
轉換 |
描述 |
|
map(func) |
源 DStream的(de)每(mei)個元素(su)通過函(han)數func返回一個新的(de)DStream。 |
|
flatMap(func) |
類似與map操作,不同的是每(mei)個輸入元素可以被映(ying)射出0或者(zhe)更(geng)多的輸出元素。 |
|
filter(func) |
在(zai)源DSTREAM上(shang)選(xuan)擇Func函數返回(hui)僅為true的(de)元素,最(zui)終返回(hui)一(yi)個新的(de)DSTREAM 。 |
|
repartition(numPartitions) |
通過輸(shu)入的(de)(de)參數numPartitions的(de)(de)值來改變DStream的(de)(de)分區大小。 |
|
union(otherStream) |
返回一個包(bao)含源DStream與其他(ta) DStream的元素合并后的新(xin)DSTREAM。 |
|
count() |
對源DStream內(nei)部的所(suo)含有的RDD的元(yuan)素數量(liang)進行計數,返回一個內(nei)部的RDD只包含一個元(yuan)素的DStreaam。 |
|
reduce(func) |
使(shi)用函數func(有(you)兩個參數并返回(hui)(hui)一(yi)個結果)將源DStream 中(zhong)每個RDD的(de)元素進(jin)行聚 合操作(zuo),返回(hui)(hui)一(yi)個內部所包含(han)的(de)RDD只(zhi)有(you)一(yi)個元素的(de)新DStream。 |
|
countByValue() |
計(ji)算(suan)DStream中每個RDD內的(de)元(yuan)(yuan)素出現的(de)頻次并返回(hui)新的(de)DStream[(K,Long)],其中K是RDD中元(yuan)(yuan)素的(de)類(lei)型,Long是元(yuan)(yuan)素出現的(de)頻次。 |
|
reduceByKey(func, [numTasks]) |
當一個(ge)類型為(K,V)鍵(jian)值對的(de)DStream被(bei)調用(yong)的(de)時候,返回類型為類型為(K,V)鍵(jian)值對的(de)新 DStream,其(qi)中每個(ge)鍵(jian)的(de)值V都是使(shi)用(yong)聚合(he)函數func匯總。注意:默認情況(kuang)下,使(shi)用(yong) Spark的(de)默認并行度提交任務(本地模(mo)式(shi)下并行度為2,集群模(mo)式(shi)下位8),可以通(tong)過(guo)配置numTasks設置不同的(de)并行任務數。 |
|
join(otherStream, [numTasks]) |
當被調用類型分(fen)別為(wei)(K,V)和(K,W)鍵值(zhi)對(dui)的(de)(de)2個(ge)(ge)DStream時(shi),返回類型為(wei)(K,(V,W))鍵值(zhi)對(dui)的(de)(de)一(yi)個(ge)(ge)新 DSTREAM。 |
|
cogroup(otherStream, [numTasks]) |
當被(bei)調(diao)用(yong)的兩個DStream分別含有(K, V) 和(he)(K, W)鍵(jian)值對時,返回一個(K, Seq[V], Seq[W])類型的新的DStream。 |
|
transform(func) |
通過(guo)對源DStream的每RDD應用(yong)RDD-to-RDD函數返(fan)回一個(ge)新的DStream,這可以用(yong)來在DStream做任意RDD操作。 |
|
updateStateByKey(func) |
返(fan)回一(yi)個新(xin)狀態(tai)的(de)DStream,其中每個鍵(jian)的(de)狀態(tai)是根據鍵(jian)的(de)前一(yi)個狀態(tai)和鍵(jian)的(de)新(xin)值(zhi)應用給(gei)定函(han)數func后(hou)的(de)更新(xin)。這個方法可(ke)以(yi)被用來維持每個鍵(jian)的(de)任(ren)何狀態(tai)數據。 |
在(zai)上(shang)面(mian)列出(chu)的(de)(de)這些操(cao)作中,transform()方法(fa)和updateStateByKey()方法(fa)值得(de)我(wo)們深入的(de)(de)探(tan)討一下(xia):
l transform(func)操作
該transform操(cao)作(zuo)(轉換操(cao)作(zuo))連同(tong)其(qi)其(qi)類似的(de)(de)(de) transformWith操(cao)作(zuo)允許(xu)DStream 上應(ying)用任意RDD-to-RDD函數(shu)。它可以被應(ying)用于未在(zai)DStream API 中(zhong)(zhong)暴露(lu)任何的(de)(de)(de)RDD操(cao)作(zuo)。例如,在(zai)每(mei)批次的(de)(de)(de)數(shu)據流與另一(yi)數(shu)據集的(de)(de)(de)連接(jie)功能不直接(jie)暴露(lu)在(zai)DStream API 中(zhong)(zhong),但可以輕松地使用transform操(cao)作(zuo)來做到這一(yi)點,這使得DStream的(de)(de)(de)功能非(fei)常強大。
l updateStateByKey操作
該 updateStateByKey 操作可以讓(rang)你(ni)保(bao)持任意狀態,同(tong)時不斷(duan)有新的(de)信息(xi)進(jin)行(xing)更新。要使用(yong)此功能,必須進(jin)行(xing)兩(liang)個步驟 :
(1) 定義狀態(tai) - 狀態(tai)可以是(shi)任意的數(shu)據類型。
(2) 定義狀態(tai)更(geng)新函數(shu) - 用一個函數(shu)指定如何使用先前的(de)狀態(tai)和從輸(shu)入(ru)流中獲取的(de)新值 更(geng)新狀態(tai)。
讓我們(men)用一個(ge)例子來說明,假設你(ni)要進行文本數據(ju)流中(zhong)單詞計數。在(zai)這里(li),正在(zai)運行的計數是狀態(tai)而且它是一個(ge)整數。我們(men)定義了(le)更新(xin)功能如下:
詳細案例(li)參考(kao):
此函(han)數應用于(yu)含有鍵(jian)值對(dui)的(de)(de)DStream中(zhong)(如(ru)前面的(de)(de)示例中(zhong),在DStream中(zhong)含有(word,1)鍵(jian)值對(dui))。它會(hui)針對(dui)里(li)面的(de)(de)每個元素(su)(如(ru)wordCount中(zhong)的(de)(de)word)調用一下更新函(han)數,newValues是最新的(de)(de)值,runningCount是之(zhi)前的(de)(de)值。
2.2.3.2 窗口轉換操作
Spark Streaming 還提供了窗(chuang)口(kou)的計算(suan),它(ta)允(yun)許你通過滑動窗(chuang)口(kou)對數(shu)據進行轉換(huan),窗(chuang)口(kou)轉換(huan)操作(zuo)如下:
|
轉換 |
描述 |
|
window(windowLength, slideInterval) |
返回一個基(ji)于源DStream的窗口批次計算后(hou)得到新的DStream。 |
|
countByWindow(windowLength,slideInterval) |
返回基(ji)于滑動窗口的(de)(de)DStream中(zhong)的(de)(de)元素的(de)(de)數量。 |
|
reduceByWindow(func, windowLength,slideInterval) |
基于滑(hua)動(dong)窗口對源DStream中的元(yuan)素進行聚合(he)操作,得到一個新的DStream。 |
|
reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks]) |
基(ji)于滑動窗口對(K,V)鍵值(zhi)對類(lei)型的DStream中(zhong)的值(zhi)按K使用聚合函數func進行聚合操作(zuo),得到(dao)一(yi)個新的DStream。 |
|
reduceByKeyAndWindow(func, invFunc,windowLength,slideInterval, [numTasks]) |
一個更高效(xiao)的(de)(de)(de)(de)(de)reduceByKkeyAndWindow()的(de)(de)(de)(de)(de)實現(xian)版本,先對滑動窗口中新的(de)(de)(de)(de)(de)時間(jian)(jian)間(jian)(jian)隔內數據增量聚(ju)合并移(yi)去(qu)最早的(de)(de)(de)(de)(de)與(yu)新增數據量的(de)(de)(de)(de)(de)時間(jian)(jian)間(jian)(jian)隔內的(de)(de)(de)(de)(de)數據統(tong)(tong)計(ji)(ji)(ji)量。例如(ru),計(ji)(ji)(ji)算t+4秒(miao)這個時刻過去(qu)5秒(miao)窗口的(de)(de)(de)(de)(de)WordCount,那么(me)我們(men)可以將t+3時刻過去(qu)5秒(miao)的(de)(de)(de)(de)(de)統(tong)(tong)計(ji)(ji)(ji)量加(jia)上[t+3,t+4]的(de)(de)(de)(de)(de)統(tong)(tong)計(ji)(ji)(ji)量,在減去(qu)[t-2,t-1]的(de)(de)(de)(de)(de)統(tong)(tong)計(ji)(ji)(ji)量,這種方(fang)法(fa)可以復用(yong)中間(jian)(jian)三秒(miao)的(de)(de)(de)(de)(de)統(tong)(tong)計(ji)(ji)(ji)量,提(ti)高統(tong)(tong)計(ji)(ji)(ji)的(de)(de)(de)(de)(de)效(xiao)率。 |
|
countByValueAndWindow(windowLength,slideInterval, [numTasks]) |
基于(yu)滑動窗口計(ji)算源DStream中每(mei)個(ge)RDD內每(mei)個(ge)元素出現的(de)(de)頻次(ci)并返回(hui)DStream[(K,Long)],其中K是RDD中元素的(de)(de)類型,Long是元素頻次(ci)。與countByValue一(yi)樣(yang),reduce任務(wu)的(de)(de)數量可以通過一(yi)個(ge)可選參數進(jin)行配置。
|
2.2.3.3 輸出操作
Spark Streaming允許DStream的數(shu)(shu)據被輸出到外(wai)部系統,如數(shu)(shu)據庫或文件系統。由于輸出操作實(shi)際上使transformation操作后的數(shu)(shu)據可(ke)以通過外(wai)部系統被使用,同(tong)時輸出操作觸(chu)發所有DStream的transformation操作的實(shi)際執行(類似于RDD操作)。以下表列出了目(mu)前主要的輸出操作:
|
轉換 |
描述 |
|
print() |
在Driver中打印出DStream中數據的前10個(ge)元素。 |
|
saveAsTextFiles(prefix, [suffix]) |
將DStream中的(de)內(nei)(nei)容以文本(ben)的(de)形式保存為文本(ben)文件(jian),其中每次批處(chu)理間(jian)隔內(nei)(nei)產(chan)生的(de)文件(jian)以prefix-TIME_IN_MS[.suffix]的(de)方式命(ming)名。 |
|
saveAsObjectFiles(prefix, [suffix]) |
將DStream中的內容按(an)對象序列(lie)化并且以(yi)SequenceFile的格式保(bao)存。其(qi)中每次批處(chu)理間隔內產生的文件以(yi)prefix-TIME_IN_MS[.suffix]的方式命名。 |
|
saveAsHadoopFiles(prefix, [suffix]) |
將DStream中(zhong)的(de)內容以文(wen)本的(de)形式保存為Hadoop文(wen)件,其中(zhong)每(mei)次(ci)批處理間(jian)隔內產生的(de)文(wen)件以prefix-TIME_IN_MS[.suffix]的(de)方式命名。 |
|
foreachRDD(func) |
最基(ji)本的輸(shu)出(chu)操(cao)作,將func函數應用(yong)于DStream中的RDD上,這個操(cao)作會(hui)輸(shu)出(chu)數據(ju)到外部系(xi)統,比如保存RDD到文件(jian)或者(zhe)網絡數據(ju)庫等。需要(yao)注(zhu)意的是func函數是在運行該streaming應用(yong)的Driver進程里執行的。 |
dstream.foreachRDD是一個非常強(qiang)大的輸出(chu)操作(zuo),它允(yun)將(jiang)許數(shu)據(ju)輸出(chu)到外部(bu)系(xi)統。詳細案例請參(can)考:
3、spark整合kafka
用spark streaming流式(shi)處理kafka中(zhong)的數(shu)據,第一步當(dang)然是先把數(shu)據接收過來,轉換為spark streaming中(zhong)的數(shu)據結構Dstream。接收數(shu)據的方式(shi)有兩(liang)種:1.利(li)用Receiver接收數(shu)據,2.直(zhi)接從kafka讀取數(shu)據。
基于Receiver的方式
這種方式利用接收器(Receiver)來接收kafka中的數據,其最基本是使用Kafka高階用戶API接口。對于所有的接收器,從kafka接收來的數據會存儲在spark的executor中,之后spark streaming提交的job會處理這些數據。如下圖:
還有幾個(ge)需要注意的點:
- 在Receiver的方式中,ssc中的partition和kafka中的partition并不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加線程來處理由單一Receiver消費的主題。但是這并沒有增加Spark在處理數據上的并行度。
- 對于不同的Group和topic我們可以使用多個Receiver創建不同的Dstream來并行接收數據,之后可以利用union來統一成一個Dstream。
- 如果我們啟用了Write Ahead Logs復制到文件系統如HDFS,那么storage level需要設置成 StorageLevel.MEMORY_AND_DISK_SER,也就是
KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
構造函數為KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )
對于所有的receivers接收到的數據將會保存在spark executors中,然后通過Spark Streaming啟動job來處理這些數據,默認會丟失,可啟用WAL日志,該日志存儲在HDFS上
直接讀取方式
在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方(fang)式沒有receiver這一層,其(qi)會周期性的獲取Kafka中(zhong)每個topic的每個partition中(zhong)的最新offsets,之后根據設定的maxRatePerPartition偏移量范圍來處理每個batch。其形式如下圖:
這(zhe)種方(fang)法相較于(yu)Receiver方(fang)式的優勢(shi)在(zai)于(yu):
- 簡化的并行:在Receiver的方式中我們提到創建多個Receiver之后利用union來合并成一個Dstream的方式提高數據傳輸并行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對應的并行讀取Kafka數據,會創建和kafka分區一樣的rdd個數。
- 高效:在Receiver的方式中,為了達到0數據丟失需要將數據存入Write Ahead Log中,這樣在Kafka和日志中就保存了兩份數據,第一次是被kafka復制,另一次是寫到wal中,浪費!而第二種方式不存在這個問題,只要我們Kafka的數據保留時間足夠長,我們都能夠從Kafka進行數據恢復。
- 精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統的從Kafka中讀取數據的方式,但由于Spark Streaming消費的數據和Zookeeper中記錄的offset不同步,這種方式偶爾會造成數據重復消費。而第二種方式,直接使用了簡單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進行記錄,消除了zk和ssc偏移量不一致的問題。缺點是無法使用基于zookeeper的kafka監控工具。
以上主要是(shi)對官(guan)方(fang)文(wen)檔[1]的一個簡單翻譯,詳(xiang)細(xi)內容大家可(ke)以直接看下(xia)官(guan)方(fang)文(wen)檔這(zhe)里不再(zai)贅述。
不(bu)同于Receiver的方式,是(shi)從(cong)Zookeeper中讀取offset值,那么自然zookeeper就保存(cun)了當前消(xiao)費的offset值,那么如果重新(xin)啟動開始消(xiao)費就會接著(zhu)上(shang)一次offset值繼續(xu)消(xiao)費。
而在Direct的方式中,我們是直接從kafka來讀數據,那么offset需要自己記錄,可以利用checkpoint、數據庫或文件記錄或者回寫到zookeeper中進行記錄。這里我們給出利用Kafka底層API接口,將offset及時同步到zookeeper中的通用類,我將其放在了github上:
示例中KafkaManager是一個通用類,而KafkaCluster是kafka源碼中的一個類,由于包名權限的原因我把它單獨提出來,ComsumerMain簡單展示了通用類的使用方法,在每次創建KafkaStream時,都會先從zooker中查看上次的消費記錄offsets,而每個batch處理完成后,會同步offsets到zookeeper中。
refer://blog.csdn.net/zhong_han_jun/article/details/50814038
參考:
Spark入門實戰系列--7.Spark Streaming(上)--實時流計算Spark Streaming原理介紹
Spark踩坑記——Spark Streaming+Kafka
