中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

Spark踩坑記——Spark Streaming+Kafka

前言

在WeTest輿情項目中,需要對每天千萬級的游戲評論信息進行詞頻統計,在生產者一端,我們將數據按照每天的拉取時間存入了Kafka當中,而在消費者一端,我們利用了spark streaming從kafka中不斷拉取數據進行詞頻統計。本文首先對spark streaming嵌入kafka的方式進行歸納總結,之后簡單闡述Spark streaming+kafka在輿情項目中的應用,最后將自己在Spark Streaming+kafka的實際優化中的一些經驗進行歸納總結。(如有任何紕漏歡迎補充來踩,我會第一時間改正v

Spark streaming接收Kafka數據

用spark streaming流(liu)式處(chu)理kafka中的(de)數(shu)(shu)據(ju),第一(yi)步當然是先把數(shu)(shu)據(ju)接收(shou)過來,轉換為spark streaming中的(de)數(shu)(shu)據(ju)結構Dstream。接收(shou)數(shu)(shu)據(ju)的(de)方式有兩種:1.利用Receiver接收(shou)數(shu)(shu)據(ju),2.直接從kafka讀取(qu)數(shu)(shu)據(ju)。

基于Receiver的方式

這種方式利用接收器(Receiver)來接收kafka中的數據,其最基本是使用Kafka高階用戶API接口。對于所有的接收器,從kafka接收來的數據會存儲在spark的executor中,之后spark streaming提交的job會處理這些數據。如下圖:
Receiver圖形解釋
在使用時,我們需要添加相(xiang)應的依賴包:

<dependency><!-- Spark Streaming Kafka -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.3</version>
</dependency>

而對于Scala的基本(ben)使用方式(shi)如下:

import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext, 
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

還有幾個需要注意的點(dian):

  • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加線程來處理由單一Receiver消費的主題。但是這并沒有增加Spark在處理數據上的并行度。
  • 對于不同的Group和topic我們可以使用多個Receiver創建不同的Dstream來并行接收數據,之后可以利用union來統一成一個Dstream。
  • 如果我們啟用了Write Ahead Logs復制到文件系統如HDFS,那么storage level需要設置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

直接讀取方式

在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式沒有receiver這一層,其會周期性的獲取Kafka中每個topic的每個partition中的最新offsets,之后根據設定的maxRatePerPartition來處理每個batch。其形式如下圖:

這種方法相較于(yu)Receiver方式的(de)優(you)勢(shi)在于(yu):

  • 簡化的并行:在Receiver的方式中我們提到創建多個Receiver之后利用union來合并成一個Dstream的方式提高數據傳輸并行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對應的并行讀取Kafka數據,這種映射關系也更利于理解和優化。
  • 高效:在Receiver的方式中,為了達到0數據丟失需要將數據存入Write Ahead Log中,這樣在Kafka和日志中就保存了兩份數據,浪費!而第二種方式不存在這個問題,只要我們Kafka的數據保留時間足夠長,我們都能夠從Kafka進行數據恢復。
  • 精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統的從Kafka中讀取數據的方式,但由于Spark Streaming消費的數據和Zookeeper中記錄的offset不同步,這種方式偶爾會造成數據重復消費。而第二種方式,直接使用了簡單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進行記錄,消除了這種不一致性。

以(yi)上(shang)主要是(shi)對官方(fang)文檔[1]的(de)一個簡單翻譯(yi),詳細內(nei)容大(da)家可以(yi)直接(jie)看下(xia)官方(fang)文檔這里不再贅述。

不同于Receiver的方式,是從Zookeeper中讀取offset值,那么自然zookeeper就保存了當前消費的offset值,那么如果重新啟動開始消費就會接著上一次offset值繼續消費。而在Direct的方式中,我們是直接從kafka來讀數據,那么offset需要自己記錄,可以利用checkpoint、數據庫或文件記錄或者回寫到zookeeper中進行記錄。這里我們給出利用Kafka底層API接口,將offset及時同步到zookeeper中的通用類,我將其放在了github上:

示例中(zhong)KafkaManager是(shi)一個通(tong)用(yong)(yong)類(lei),而(er)KafkaCluster是(shi)kafka源(yuan)碼中(zhong)的(de)(de)(de)一個類(lei),由于包名權限的(de)(de)(de)原因我把它單(dan)獨提出(chu)來,ComsumerMain簡單(dan)展示了通(tong)用(yong)(yong)類(lei)的(de)(de)(de)使(shi)用(yong)(yong)方(fang)法,在每次創建KafkaStream時,都會先從(cong)zooker中(zhong)查看上次的(de)(de)(de)消(xiao)費(fei)記錄offsets,而(er)每個batch處(chu)理完成后,會同步offsets到zookeeper中(zhong)。

Spark向kafka中寫入數據

上文闡述了Spark如何從Kafka中流式的讀取數據,下面我整理向Kafka中寫數據。與讀數據不同,Spark并沒有提供統一的接口用于寫入Kafka,所以我們需要使用底層Kafka接口進行包裝。
最直(zhi)接的(de)做法(fa)我們可以想(xiang)到如下這(zhe)種方式:

input.foreachRDD(rdd =>
  // 不能在這里創建KafkaProducer
  rdd.foreachPartition(partition =>
    partition.foreach{
      case x:String=>{
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String,String](props)
        val message=new ProducerRecord[String, String]("output",null,x)
        producer.send(message)
      }
    }
  )
) 

但是(shi)這(zhe)種方式缺(que)點很明(ming)顯,對于每個partition的(de)每條記錄(lu),我們(men)都需要(yao)創建KafkaProducer,然(ran)后利用producer進(jin)行輸出操作,注(zhu)意(yi)這(zhe)里我們(men)并(bing)不(bu)能將KafkaProducer的(de)新(xin)建任務(wu)放在foreachPartition外邊(bian),因為KafkaProducer是(shi)不(bu)可序(xu)列化的(de)(not serializable)。顯然(ran)這(zhe)種做法是(shi)不(bu)靈活且低(di)效的(de),因為每條記錄(lu)都需要(yao)建立(li)一次連接。如何解決呢?

  1. 首先,我們需要將KafkaProducer利用lazy val的方式進行包裝如下:
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()
  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}

object KafkaSink {
  import scala.collection.JavaConversions._
  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }
  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
  1. 之后我們利用廣播變量的形式,將KafkaProducer廣播到每一個executor,如下:
// 廣播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", Conf.brokers)
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  log.warn("kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}

這樣我們就(jiu)能在每個(ge)executor中愉(yu)快(kuai)的(de)將數(shu)據輸入到kafka當中:

//輸出到kafka
segmentedStream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreach(record => {
      kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
      // do something else
    })
  }
})

Spark streaming+Kafka應用

WeTest輿情(qing)監控對于每天(tian)爬(pa)取的(de)(de)千萬級游戲玩(wan)家評(ping)論(lun)信息都(dou)要實時的(de)(de)進行(xing)詞頻統(tong)計,對于爬(pa)取到(dao)的(de)(de)游戲玩(wan)家評(ping)論(lun)數據,我(wo)們(men)會生產到(dao)Kafka中(zhong),而另一端的(de)(de)消費者我(wo)們(men)采用了Spark Streaming來進行(xing)流式處(chu)理(li)(li),首先利用上文我(wo)們(men)闡述的(de)(de)Direct方式從Kafka拉取batch,之后經過分(fen)詞、統(tong)計等相關處(chu)理(li)(li),回寫到(dao)DB上(至于Spark中(zhong)DB的(de)(de)回寫方式可參考我(wo)之前總(zong)結(jie)的(de)(de)博文:),由此高效實時的(de)(de)完成每天(tian)大(da)量數據的(de)(de)詞頻統(tong)計任(ren)務(wu)。

Spark streaming+Kafka調優

Spark streaming+Kafka的使用中,當數(shu)據量較小,很多時(shi)候默(mo)認配置(zhi)和(he)使用便能夠滿足情況,但(dan)是當數(shu)據量大的時(shi)候,就需要進行一(yi)定的調整(zheng)和(he)優化,而這種調整(zheng)和(he)優化本身也是不同的場景需要不同的配置(zhi)。

合理的批處理時間(batchDuration)

幾乎所有的Spark Streaming調優文檔都會提及批處理時間的調整,在StreamingContext初始化的時候,有一個參數便是批處理時間的設定。如果這個值設置的過短,即個batchDuration所產生的Job并不能在這期間完成處理,那么就會造成數據不斷堆積,最終導致Spark Streaming發生阻塞。而且,一般對于batchDuration的設置不會小于500ms,因為過小會導致SparkStreaming頻繁的提交作業,對整個streaming造成額外的負擔。在平時的應用中,根據不同的應用場景和硬件配置,我設在1~10s之間,我們可以根據SparkStreaming的可視化監控界面,觀察Total Delay來進行batchDuration的調整,如下圖:

合理的Kafka拉取量(maxRatePerPartition重要)

對于Spark Streaming消費kafka中數據的應用場景,這個配置是非常關鍵的,配置參數為:spark.streaming.kafka.maxRatePerPartition。這個參數默認是沒有上線的,即kafka當中有多少數據它就會直接全部拉出。而根據生產者寫入Kafka的速率以及消費者本身處理數據的速度,同時這個參數需要結合上面的batchDuration,使得每個partition拉取在每個batchDuration期間拉取的數據能夠順利的處理完畢,做到盡可能高的吞吐量,而這個參數的調整可以參考可視化監控界面中的Input Rate和Processing Time,如下圖:

緩存反復使用的Dstream(RDD)

Spark中的RDD和SparkStreaming中的Dstream,如果被反復的使用,最好利用cache(),將該數據流緩存起來,防止過度的調度資源造成的網絡開銷。可以參考觀察Scheduling Delay參數,如下圖:

設置合理的GC

長期使用Java的(de)(de)小伙(huo)伴都知道,JVM中(zhong)的(de)(de)垃圾回(hui)(hui)收機(ji)制(zhi),可以讓我們(men)不過多的(de)(de)關注與內存(cun)的(de)(de)分(fen)配(pei)回(hui)(hui)收,更加專注于業務邏(luo)輯,JVM都會為我們(men)搞定。對(dui)JVM有些(xie)了解的(de)(de)小伙(huo)伴應(ying)該知道,在(zai)Java虛擬機(ji)中(zhong),將內存(cun)分(fen)為了初生代(eden generation)、年(nian)(nian)輕代(young generation)、老(lao)年(nian)(nian)代(old generation)以及永久代(permanent generation),其中(zhong)每次GC都是(shi)需要(yao)耗費一定時(shi)間的(de)(de),尤其是(shi)老(lao)年(nian)(nian)代的(de)(de)GC回(hui)(hui)收,需要(yao)對(dui)內存(cun)碎片進(jin)行整理,通常采用標記-清(qing)楚的(de)(de)做法。同(tong)樣的(de)(de)在(zai)Spark程序中(zhong),JVM GC的(de)(de)頻率和時(shi)間也(ye)是(shi)影響整個Spark效率的(de)(de)關鍵(jian)因素。在(zai)通常的(de)(de)使用中(zhong)建議:

--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

設置合理的CPU資源數

CPU的(de)(de)(de)(de)core數(shu)量(liang),每個executor可(ke)以占用(yong)(yong)一(yi)個或(huo)多(duo)個core,可(ke)以通(tong)過觀(guan)察CPU的(de)(de)(de)(de)使用(yong)(yong)率(lv)變化來(lai)了解計算資源的(de)(de)(de)(de)使用(yong)(yong)情(qing)況,例如(ru),很常見的(de)(de)(de)(de)一(yi)種浪費(fei)是(shi)一(yi)個executor占用(yong)(yong)了多(duo)個core,但是(shi)總的(de)(de)(de)(de)CPU使用(yong)(yong)率(lv)卻不(bu)高(因為一(yi)個executor并不(bu)總能充分利用(yong)(yong)多(duo)核的(de)(de)(de)(de)能力),這個時候(hou)可(ke)以考慮讓么個executor占用(yong)(yong)更(geng)少的(de)(de)(de)(de)core,同時worker下面(mian)增(zeng)(zeng)加(jia)(jia)更(geng)多(duo)的(de)(de)(de)(de)executor,或(huo)者一(yi)臺host上面(mian)增(zeng)(zeng)加(jia)(jia)更(geng)多(duo)的(de)(de)(de)(de)worker來(lai)增(zeng)(zeng)加(jia)(jia)并行執行的(de)(de)(de)(de)executor的(de)(de)(de)(de)數(shu)量(liang),從(cong)而增(zeng)(zeng)加(jia)(jia)CPU利用(yong)(yong)率(lv)。但是(shi)增(zeng)(zeng)加(jia)(jia)executor的(de)(de)(de)(de)時候(hou)需要考慮好內(nei)存消耗,因為一(yi)臺機器的(de)(de)(de)(de)內(nei)存分配給越多(duo)的(de)(de)(de)(de)executor,每個executor的(de)(de)(de)(de)內(nei)存就越小,以致出現(xian)過多(duo)的(de)(de)(de)(de)數(shu)據(ju)spill over甚至out of memory的(de)(de)(de)(de)情(qing)況。

設置合理的parallelism

partition和parallelism,partition指的就是數據分片的數量,每一次task只能處理一個partition的數據,這個值太小了會導致每片數據量太大,導致內存壓力,或者諸多executor的計算能力無法利用充分;但是如果太大了則會導致分片太多,執行效率降低。在執行action類型操作的時候(比如各種reduce操作),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操作的時候,默認返回數據的paritition數量(而在進行map類操作的時候,partition數量通常取自parent RDD中較大的一個,而且也不會涉及shuffle,因此這個parallelism的參數沒有影響)。所以說,這兩個概念密切相關,都是涉及到數據分片的,作用方式其實是統一的。通過spark.default.parallelism可以設置默認的分片數量,而很多RDD的操作都可以指定一個partition參數來顯式控制具體的分片數量。
在SparkStreaming+kafka的(de)使用(yong)中(zhong),我們采用(yong)了Direct連接方式,前文闡述過Spark中(zhong)的(de)partition和(he)Kafka中(zhong)的(de)Partition是一(yi)一(yi)對應(ying)的(de),我們一(yi)般默認設置為Kafka中(zhong)Partition的(de)數量(liang)。

使用高性能的算子

這里參考了美團(tuan)技(ji)術團(tuan)隊的(de)博文,并沒有做過具體的(de)性能測試(shi),其(qi)建議如下:

  • 使用reduceByKey/aggregateByKey替代groupByKey
  • 使用mapPartitions替代普通map
  • 使用foreachPartitions替代foreach
  • 使用filter之后進行coalesce操作
  • 使用repartitionAndSortWithinPartitions替代repartition與sort類操作

使用Kryo優化序列化性能

這個優化原則我本身也沒有經過測試,但是好多優化文檔有提到,這里也記錄下來。
在Spark中,主要有三個地方涉及(ji)到了序列化:

  • 在算子函數中使用到外部變量時,該變量會被序列化后進行網絡傳輸(見“原則七:廣播大變量”中的講解)。
  • 將自定義的類型作為RDD的泛型類型時(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable接口。
  • 使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的字節數組。

對(dui)于這三種出現序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)的(de)地方,我(wo)們都可以(yi)通過(guo)使(shi)用(yong)Kryo序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)類(lei)(lei)庫,來(lai)優化(hua)序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)和反序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)的(de)性(xing)能。Spark默(mo)認使(shi)用(yong)的(de)是(shi)Java的(de)序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)機制,也就是(shi)ObjectOutputStream/ObjectInputStream API來(lai)進(jin)行(xing)序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)和反序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)。但是(shi)Spark同時(shi)支持(chi)使(shi)用(yong)Kryo序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)庫,Kryo序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)類(lei)(lei)庫的(de)性(xing)能比(bi)Java序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)類(lei)(lei)庫的(de)性(xing)能要(yao)高很多(duo)。官方介紹,Kryo序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)機制比(bi)Java序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)機制,性(xing)能高10倍(bei)左右。Spark之所(suo)以(yi)默(mo)認沒有使(shi)用(yong)Kryo作(zuo)為序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)類(lei)(lei)庫,是(shi)因為Kryo要(yao)求最好要(yao)注冊所(suo)有需要(yao)進(jin)行(xing)序(xu)(xu)列(lie)(lie)(lie)(lie)(lie)(lie)化(hua)的(de)自定義類(lei)(lei)型,因此對(dui)于開發者來(lai)說,這種方式比(bi)較麻煩。

以下是使用Kryo的代碼示例(li),我(wo)們只要(yao)設置序(xu)列(lie)化類(lei),再注冊要(yao)序(xu)列(lie)化的自定義(yi)類(lei)型即可(比如算子(zi)函(han)數中(zhong)使用到的外部變量類(lei)型、作為(wei)RDD泛(fan)型類(lei)型的自定義(yi)類(lei)型等):

// 創建SparkConf對象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設置序列化器為KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊要序列化的自定義類型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

結果

經過種種調試優化,我們最終要達到的目的是,Spark Streaming能夠實時的拉取Kafka當中的數據,并且能夠保持穩定,如下圖所示:

當然不同的(de)應用場景會(hui)有(you)不同的(de)圖(tu)形,這是本文詞頻(pin)統計優化穩(wen)定(ding)(ding)后的(de)監控圖(tu),我們可以看到(dao)Processing Time這一(yi)柱(zhu)形圖(tu)中有(you)一(yi)Stable的(de)虛線,而大多數Batch都能夠在這一(yi)虛線下處(chu)理完畢,說明(ming)整體Spark Streaming是運行穩(wen)定(ding)(ding)的(de)。

參考文獻

posted @ 2017-01-03 21:35  xlturing  閱讀(91302)  評論(13)    收藏  舉報