Kafka文(wen)件存儲機(ji)制及offset存取
Kafka是什么
Kafka是(shi)最初由Linkedin公司開發,是(shi)一個分布(bu)式、分區的、多副本的、多訂(ding)閱者,基于zookeeper協調(diao)的分布(bu)式日志(zhi)系統(也可以當(dang)做MQ系統),常(chang)見可以用于web/nginx日志(zhi)、訪問日志(zhi),消息(xi)服(fu)務等等,Linkedin于2010年(nian)貢獻給了Apache基金(jin)會并成為(wei)頂級開源項目。
1.前言
一個(ge)商業(ye)化消(xiao)息隊(dui)列(lie)的性能好壞(huai),其文(wen)件存儲機制設計是衡量一個(ge)消(xiao)息隊(dui)列(lie)服務技術水(shui)平和最關鍵指標之一。
下面將從Kafka文件(jian)存儲機制和(he)物理結構角度,分析Kafka是如何(he)實現高效(xiao)文件(jian)存儲,及實際應用效(xiao)果。
2.Kafka文(wen)件存儲機(ji)制
Kafka部(bu)分名(ming)詞解釋如下:
- Broker:消息中間件處理結點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群。
- Topic:一類消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能夠同時負責多個topic的分發。
- Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。
- Segment:partition物理上由多個segment組成,下面2.2和2.3有詳細說明。
- offset:每個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中。partition中的每個消息都有一個連續的序列號叫做offset,用于partition唯一標識一條消息.
分析過程分為以下4個步驟:
- topic中partition存儲分布(bu)
- partiton中文件存儲(chu)方式
- partiton中(zhong)segment文件存儲(chu)結(jie)構
- 在partition中如何通過offset查找message
通過上述4過程詳細分析,我們就可以清楚認識到(dao)kafka文(wen)件存儲(chu)機(ji)制的(de)奧秘。
2.1 topic中partition存(cun)儲分布
假設實驗環境中Kafka集群只(zhi)有(you)一(yi)個broker,xxx/message-folder為數(shu)據文件存儲(chu)根(gen)目錄(lu),在(zai)Kafka broker中server.properties文件配置(參數(shu)log.dirs=xxx/message-folder),例如創建(jian)2個topic名稱分別為report_push、launch_info, partitions數(shu)量都為partitions=4
存儲路徑和(he)目錄規則為:
xxx/message-folder
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3
在Kafka文件(jian)存儲中,同一(yi)個topic下有多(duo)個不同partition,每個partition為一(yi)個目錄,partiton命名規則為topic名稱+有序(xu)序(xu)號(hao),第(di)一(yi)個partiton序(xu)號(hao)從0開始,序(xu)號(hao)最大值(zhi)為partitions數量減1。
如果是(shi)多broker分布情況(kuang),請參(can)考
2.2 partiton中文件存儲方式(shi)
下面(mian)示意圖形象說明(ming)了partition中文件存儲方式:
圖1
- 每個(ge)partion(目(mu)錄)相當于一個(ge)巨(ju)型(xing)文(wen)(wen)件被平均分配到多個(ge)大小(xiao)相等segment(段)數(shu)據文(wen)(wen)件中。但每個(ge)段segment file消息數(shu)量(liang)不(bu)一定相等,這種特性方便old segment file快速被刪(shan)除。
- 每個partiton只需要支持順(shun)序(xu)讀寫就行了,segment文件生命周期由(you)服務端(duan)配置(zhi)參(can)數決(jue)定。
這樣做的好處(chu)就是能快(kuai)速刪(shan)除無用(yong)文件(jian),有效(xiao)提高磁盤利用(yong)率。
2.3 partiton中segment文件(jian)存(cun)儲結構
讀者從2.2節了解(jie)到Kafka文件系統partition存(cun)儲方式,本節深入分(fen)析partion中segment file組成和物理(li)結(jie)構。
- segment file組成:由2大部分組成,分別為index file和data file,此2個文件(jian)一一對應,成對出(chu)現,后綴(zhui)".index"和“.log”分別表(biao)示為segment索引文件(jian)、數據文件(jian).
- segment文件命名規則:partion全局的第一個segment從0開始,后續每(mei)個segment文(wen)件(jian)名為上一個segment文(wen)件(jian)最后一條消息的offset值。數值最大為64位long大小,19位數字字符長度,沒有數字用0填充。
下(xia)面文件(jian)(jian)列(lie)表是筆者在Kafka broker上做的一個實驗,創建一個topicXXX包含1 partition,設置每個segment大(da)小為500MB,并啟動(dong)producer向Kafka broker寫入大(da)量數據,如下(xia)圖2所(suo)示segment文件(jian)(jian)列(lie)表形象說明了(le)上述2個規則:
圖2
以上述圖2中(zhong)一對segment file文件為例,說明segment中(zhong)index<—->data file對應(ying)關(guan)系物理結構如(ru)下(xia):
圖3
上述圖3中(zhong)(zhong)索引(yin)文(wen)件存儲大(da)量元數(shu)據,數(shu)據文(wen)件存儲大(da)量消息,索引(yin)文(wen)件中(zhong)(zhong)元數(shu)據指向對應數(shu)據文(wen)件中(zhong)(zhong)message的物理偏移地址。
其中以(yi)索(suo)引文(wen)件中元(yuan)數據3,497為(wei)例,依次在數據文(wen)件中表(biao)(biao)示(shi)第(di)3個(ge)message(在全局(ju)partiton表(biao)(biao)示(shi)第(di)368772個(ge)message)、以(yi)及(ji)該消息的(de)物理偏移(yi)地址為(wei)497。
從上述圖(tu)3了解(jie)到segment data file由許多message組成,下面詳細說明message物理結構(gou)如下:
圖4
參數(shu)說明:
| 關(guan)鍵(jian)字(zi) | 解釋說(shuo)明 |
|---|---|
| 8 byte offset | 在parition(分(fen)區)內(nei)的(de)(de)每條消息都有一(yi)個有序(xu)的(de)(de)id號,這個id號被稱為偏移(offset),它(ta)可以(yi)唯一(yi)確(que)定每條消息在parition(分(fen)區)內(nei)的(de)(de)位置。即offset表示partiion的(de)(de)第(di)多少message |
| 4 byte message size | message大小 |
| 4 byte CRC32 | 用crc32校驗message |
| 1 byte “magic" | 表示本(ben)次發布(bu)Kafka服務程序協議版本(ben)號 |
| 1 byte “attributes" | 表示為獨立版本、或(huo)標識壓縮類型、或(huo)編碼類型。 |
| 4 byte key length | 表示key的長(chang)度,當key為-1時,K byte key字段不填 |
| K byte key | 可選 |
| value bytes payload | 表示實(shi)際消息數(shu)據。 |
2.4 在partition中如何通過(guo)offset查找message
例如讀取(qu)offset=368776的(de)message,需要通(tong)過下(xia)面2個步驟查找。
-
第一步(bu)查找segment file
上述(shu)圖2為例(li),其中00000000000000000000.index表示最(zui)開始(shi)(shi)的(de)文(wen)件(jian),起(qi)始(shi)(shi)偏(pian)(pian)移量(liang)(offset)為0.第二個(ge)(ge)文(wen)件(jian)00000000000000368769.index的(de)消(xiao)息量(liang)起(qi)始(shi)(shi)偏(pian)(pian)移量(liang)為368770 = 368769 + 1.同樣,第三個(ge)(ge)文(wen)件(jian)00000000000000737337.index的(de)起(qi)始(shi)(shi)偏(pian)(pian)移量(liang)為737338=737337 + 1,其他后(hou)續文(wen)件(jian)依次類推,以(yi)起(qi)始(shi)(shi)偏(pian)(pian)移量(liang)命名并排序(xu)這些文(wen)件(jian),只要根據offset **二分查找**文(wen)件(jian)列表,就可以(yi)快(kuai)速定位到具體文(wen)件(jian)。
當offset=368776時定(ding)位到00000000000000368769.index|log -
第二步(bu)通過segment file查找message
通過第一步定(ding)位(wei)(wei)到(dao)segment file,當(dang)offset=368776時,依次(ci)定(ding)位(wei)(wei)到(dao)00000000000000368769.index的元(yuan)數據物(wu)理位(wei)(wei)置和00000000000000368769.log的物(wu)理偏(pian)移(yi)地址,然后再通過00000000000000368769.log順序查找(zhao)直(zhi)到(dao)offset=368776為止。
從上(shang)述圖3可知這(zhe)樣(yang)做的優(you)點,segment index file采(cai)取(qu)稀疏索引(yin)存(cun)(cun)儲(chu)方式,它(ta)減少索引(yin)文(wen)件(jian)大小,通過mmap可以(yi)直(zhi)接內存(cun)(cun)操(cao)作,稀疏索引(yin)為數據文(wen)件(jian)的每個(ge)對應(ying)message設(she)置一個(ge)元數據指針,它(ta)比稠密(mi)索引(yin)節省了(le)更多的存(cun)(cun)儲(chu)空間,但查找起(qi)來需要消耗更多的時間。
3 Kafka文件(jian)存儲機制–實際運行效果
實驗環境:
- Kafka集(ji)群:由(you)2臺虛擬機組成(cheng)
- cpu:4核
- 物(wu)理內(nei)存:8GB
- 網(wang)卡:千(qian)兆網(wang)卡
- jvm heap: 4GB
- 詳細Kafka服務端配置及其優化請參考:

圖5
從上述(shu)圖5可以看(kan)出,Kafka運行時很少有大量讀(du)磁盤的操(cao)作,主要是(shi)定期批(pi)量寫磁盤操(cao)作,因此操(cao)作磁盤很高效。這跟Kafka文件存(cun)儲(chu)中讀(du)寫message的設計是(shi)息息相關的。Kafka中讀(du)寫message有如下特點(dian):
寫message
- 消息從java堆轉入page cache(即物理內存)。
- 由(you)異步(bu)線程刷盤(pan),消息從page cache刷入(ru)磁(ci)盤(pan)。
讀message
- 消(xiao)息直接從page cache轉入(ru)socket發送出去。
- 當從page cache沒有找到相應數據時,此時會產生磁盤IO,從磁
盤(pan)Load消息到(dao)page cache,然后直接從socket發出去
4.offset存(cun)儲(chu)方式
- 1、在kafka 0.9版本之后,kafka為了降低zookeeper的io讀寫,減少(shao)network data transfer,也自己實現了在kafka server上存儲consumer,topic,partitions,offset信息將消費的 offset 遷入到了 Kafka 一個名為 __consumer_offsets 的Topic中。
- 2、將消費的 offset 存放在(zai) Zookeeper 集群中。
- 3、將offset存放至第三(san)方存儲,如Redis, 為了(le)嚴格實(shi)現不(bu)重復消費
下面分別(bie)說一下這三種存儲方式的實現(xian)
4.1 __consumer_offsets [kafka]
下面的(de)代碼案例實現了test這一topic的(de)數據連續消(xiao)費
from kafka import KafkaConsumer class KafkaStreamTest: ''' This class consume all external Kafka topics''' def __init__(self): self.appName = "kafkatest" self.kafkaHosts = "192.168.4.201:6667,192.168.4.231:6667" self.kafkaAutoOffsetReset = "largest" self._kafka_topic = "test" def start(self): reload(sys) sys.setdefaultencoding('utf-8') elogging.debug(self.appName, elogging.normalCID(), "receiver starting") consumer = KafkaConsumer('test', bootstrap_servers=['192.168.4.201:6667','192.168.4.231:6667'], enable_auto_commit=True, auto_offset_reset='earliest') #consumer = KafkaConsumer('test', bootstrap_servers=['192.168.4.201:6667', '192.168.4.231:6667'], auto_offset_reset='earliest') while True: # The definition of KafkaMessage: # KafkaMessage = namedtuple("KafkaMessage", # ["topic", "partition", "offset", "key", "value"]) kafkaMsg = consumer.next() # for debug print kafkaMsg.topic, kafkaMsg.partition, kafkaMsg.offset, kafkaMsg.key, kafkaMsg.value if __name__ =="__main__": test = KafkaStreamTest() test.start()
enable_auto_commit (bool) – If True , the consumer’s offset will be periodically committed in the background. Default: True設置為true,表示offset自動托管到kafka內部的一個特定名稱為__consumer_offsets的topic
auto_offset_reset:What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
只有當offset不存在的時候,才用latest或者earliest
其他詳(xiang)細內容(rong)請參看
//stackoverflow.com/questions/35432326/how-to-get-latest-offset-for-a-partition-for-a-kafka-topic
Kafka 如何讀取offset topic內容 (__consumer_offsets)
4.2 zookeeper
請參考
4.3 Redis[推(tui)薦]
import os import sys sys.path.append("..") sys.path.append(sys.argv[0][:sys.argv[0].rfind(os.path.join('com','ericsson'))]) import copy import traceback import redis from pyspark import SparkContext, SparkConf from pyspark.streaming import StreamingContext, DStream from pyspark.sql import SQLContext import simplejson as json from com.ericsson.analytics.fms.common.common import ELogForDistributedApp,getSqlContextInstance from pyspark.streaming.kafka import KafkaUtils,TopicAndPartition from com.ericsson.analytics.oamf.client.logging import elogging from com.ericsson.analytics.fms.common.common import HDFSOperation class KafkaStreamTest: ''' This class consume all external Kafka topics, store the data into Parquet and send the data to internal Kafka topics ''' def __init__(self): self.appName = "kafkatest" self.kafkaHosts = "192.168.4.201:6667,192.168.4.231:6667" self.kafkaAutoOffsetReset = "largest" self.kafka_offset_redis_db = 6 self._kafka_topic = "test" self.redisHost = "192.168.4.231" self.redisPort = 6379 self.spark_batch_duration = 20 def createStreamingContext(self, sc): ssc = StreamingContext(sc, self.spark_batch_duration) ds = self.getDStreamFromKafka(ssc) if ds is not None: elogging.info(self.appName, elogging.normalCID(), "Kafka succeeded to getting the data") return ssc, ds else: return None, None def getDStreamFromKafka(self, ssc): kafkaParams = {"metadata.broker.list": self.kafkaHosts} elogging.debug(self.appName, elogging.normalCID(), kafkaParams) sc = ssc.sparkContext dstream = None try: redisConn = self.getRedisConnection(self.kafka_offset_redis_db) if redisConn.exists(self.appName): elogging.debug(self.appName, elogging.normalCID(), "key " + self.appName + " exists in redis") fromOffset = {} offsetListStr = redisConn.get(self.appName) offsetList = for offset in offsetList: elogging.debug(self.appName, elogging.normalCID(), str(offset)) topicPartion = TopicAndPartition(offset["topic"], offset["partition"]) fromOffset[topicPartion] = offset["untilOffset"] dstream = KafkaUtils.createDirectStream(ssc, [self._kafka_topic], kafkaParams, fromOffset) else: kafkaParams = {"metadata.broker.list": self.kafkaHosts, "auto.offset.reset": self.kafkaAutoOffsetReset} elogging.debug(self.appName, elogging.normalCID(), "key " + self.appName + " doesn't exist in redis") dstream = KafkaUtils.createDirectStream(ssc, [self._kafka_topic], kafkaParams) except: traceInfo = traceback.format_exc() elogging.error(self.appName, elogging.faultCID(), "failed to create DStream : " + traceInfo) return dstream def getRedisConnection(self, redisDB): try: pool = redis.ConnectionPool(host=self.redisHost, port=self.redisPort, db=redisDB) redisConn = redis.Redis(connection_pool=pool) except: traceInfo = traceback.format_exc() elogging.error(self.appName, elogging.faultCID(), "failed to create DStream : " + traceInfo) return None return redisConn def getOffSetRangesFromRDD(self, rdd): try: offsetRanges = rdd.offsetRanges() except: traceInfo = traceback.format_exc() elogging.error(self.appName, elogging.faultCID(), "failed to call rdd.offsetRanges() function : " + traceInfo) return None offsetList = [] for offset in offsetRanges: offsetList.append({"topic": offset.topic, "partition": offset.partition, "fromOffset": offset.fromOffset, "untilOffset": offset.untilOffset}) elogging.info(self.appName, elogging.normalCID(), "getOffSetRangesFromRDD, offsetList: " + str(offsetList)) return offsetList def saveOffSetRangesToRedis(self, offsetList): redisConn = self.getRedisConnection(self.kafka_offset_redis_db) if redisConn is not None: redisConn.set(self.appName, offsetList) elogging.info(self.appName, elogging.normalCID(), "saveOffSetRangesToRedis, offsetList : " + str(offsetList)) def handleMessages(self, runTime, rdd): elogging.debug(self.appName, elogging.normalCID(), "========= %s =========" % str(runTime)) offsetList = self.getOffSetRangesFromRDD(rdd) if offsetList is not None: self.saveOffSetRangesToRedis(offsetList) rddFilter = rdd.map(lambda p: p[1]) counts = rddFilter.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a + b) sqlContext = getSqlContextInstance(rddFilter.context) if counts is not None: df = sqlContext.createDataFrame(counts) df.show() def start(self): reload(sys) sys.setdefaultencoding('utf-8') sc = SparkContext(appName=self.appName) eloggingConfig = None try: eloggingConfig = HDFSOperation.getConfigFromHDFS(ELogForDistributedApp.LOGHDFSPATH, sc) elogging.initLogFromDict(eloggingConfig) except StandardError, se: pass elogging.debug(self.appName, elogging.normalCID(), "receiver starting") configInfoStr = 'kafkaHosts:' + str(self.kafkaHosts) + ', kafkaAutoOffsetReset:' + str(self.kafkaAutoOffsetReset) + \ ', kafka_offset_redis_db:' + str(self.kafka_offset_redis_db) + ', spark_batch_duration:' + str(self.spark_batch_duration) + \ ', redisHost:' + str(self.redisHost) + ', redisPort:' + str(self.redisPort) elogging.info(self.appName, elogging.normalCID(), configInfoStr) ssc, newDS = self.createStreamingContext(sc) if newDS is not None: newDS.foreachRDD(self.handleMessages) ssc.start() elogging.debug(self.appName, elogging.normalCID(), "StreamingContext start") ssc.awaitTermination() elogging.debug(self.appName, elogging.normalCID(), "receiver end") else: traceInfo = traceback.format_exc() elogging.error(self.appName, elogging.faultCID(), "Failed to create DStream " + traceInfo) if __name__ =="__main__": test = KafkaStreamTest() test.start()
5.總結(jie)
Kafka高(gao)效文件存儲(chu)設計特點
- Kafka把topic中(zhong)一個parition大(da)文件(jian)(jian)分(fen)成(cheng)多個小文件(jian)(jian)段,通過多個小文件(jian)(jian)段,就(jiu)容易定期清除或刪除已經消費完文件(jian)(jian),減少磁盤占用(yong)。
- 通過索引信息可以(yi)快速定位message和確定response的最大大小。
- 通過(guo)index元數據全部映射到memory,可(ke)以避免(mian)segment file的IO磁(ci)盤操作(zuo)。
- 通過(guo)索(suo)引文件(jian)稀疏(shu)存(cun)儲,可以大(da)幅降低index文件(jian)元數據(ju)占用空(kong)間(jian)大(da)小。
