Kafka Offset Storage
1.概述
目前,Kafka 官(guan)網(wang)最新版[],已默(mo)認將消費(fei)的(de)(de) offset 遷入到了 Kafka 一個名(ming)為(wei) __consumer_offsets 的(de)(de)Topic中。其實,早在(zai)(zai)(zai) 0.8.2.2 版本,已支持存(cun)入消費(fei)的(de)(de) offset 到Topic中,只是(shi)那(nei)時(shi)候(hou)默(mo)認是(shi)將消費(fei)的(de)(de) offset 存(cun)放在(zai)(zai)(zai) Zookeeper 集群(qun)中。那(nei)現在(zai)(zai)(zai),官(guan)方默(mo)認將消費(fei)的(de)(de)offset存(cun)儲(chu)在(zai)(zai)(zai) Kafka 的(de)(de)Topic中,同時(shi),也保(bao)留(liu)了存(cun)儲(chu)在(zai)(zai)(zai) Zookeeper 的(de)(de)接口,通(tong)過 offsets.storage 屬性來(lai)進(jin)行設置。
2.內容
其(qi)實,官方這樣推薦,也(ye)是有其(qi)道理的(de)(de)(de)。之前版(ban)本(ben),Kafka其(qi)實存在(zai)一(yi)個比較(jiao)大的(de)(de)(de)隱患,就是利(li)用(yong) Zookeeper 來存儲記錄每個消費者/組(zu)的(de)(de)(de)消費進(jin)度(du)。雖然,在(zai)使用(yong)過程(cheng)當中,JVM幫(bang)助我們完(wan)成(cheng)了自一(yi)些優化(hua)(hua),但是消費者需(xu)要頻(pin)繁的(de)(de)(de)去與 Zookeeper 進(jin)行交互,而利(li)用(yong)ZKClient的(de)(de)(de)API操作Zookeeper頻(pin)繁的(de)(de)(de)Write其(qi)本(ben)身就是一(yi)個比較(jiao)低效的(de)(de)(de)Action,對于后期水平擴展也(ye)是一(yi)個比較(jiao)頭疼的(de)(de)(de)問題。如果(guo)期間 Zookeeper 集群(qun)發(fa)生變(bian)化(hua)(hua),那(nei) Kafka 集群(qun)的(de)(de)(de)吞吐(tu)量也(ye)跟著受影(ying)響(xiang)。
在此之后,官(guan)方(fang)其實很早就(jiu)提(ti)出了遷移到 Kafka 的概念,只是(shi),之前是(shi)一(yi)(yi)直默(mo)認(ren)(ren)存儲(chu)在 Zookeeper集(ji)群中,需要手(shou)動的設(she)置,如果,對 Kafka 的使用不是(shi)很熟(shu)悉的話,一(yi)(yi)般我們就(jiu)接受了默(mo)認(ren)(ren)的存儲(chu)(即:存在 ZK 中)。在新版(ban) Kafka 以(yi)及之后的版(ban)本,Kafka 消費(fei)的offset都(dou)會默(mo)認(ren)(ren)存放在 Kafka 集(ji)群中的一(yi)(yi)個叫 __consumer_offsets 的topic中。
當(dang)然(ran),其實(shi)她實(shi)現的(de)原(yuan)理(li)也(ye)讓我們很熟悉,利(li)用 Kafka 自身的(de) Topic,以消(xiao)費(fei)的(de)Group,Topic,以及Partition做為(wei)組合 Key。所(suo)有的(de)消(xiao)費(fei)offset都(dou)提交寫入到(dao)上述的(de)Topic中(zhong)。因為(wei)這部(bu)分(fen)消(xiao)息(xi)(xi)是非常重要,以至于(yu)是不能容忍丟數(shu)據的(de),所(suo)以消(xiao)息(xi)(xi)的(de) acking 級(ji)別設置(zhi)為(wei)了(le) -1,生產者等到(dao)所(suo)有的(de) ISR 都(dou)收(shou)到(dao)消(xiao)息(xi)(xi)后才會(hui)得到(dao) ack(數(shu)據安(an)全(quan)性(xing)極好,當(dang)然(ran),其速度會(hui)有所(suo)影響)。所(suo)以 Kafka 又在內存中(zhong)維護(hu)了(le)一(yi)個(ge)關于(yu) Group,Topic 和 Partition 的(de)三元組來維護(hu)最新的(de) offset 信息(xi)(xi),消(xiao)費(fei)者獲取最新的(de)offset的(de)時候會(hui)直接(jie)從內存中(zhong)獲取。
3.實現
那我(wo)們如何實現獲取這部分消費的 offset,我(wo)們可以(yi)在內存中(zhong)定義一個Map集合,來維(wei)護消費中(zhong)所捕捉(zhuo)到(dao) offset,如下所示(shi):
protected static Map<GroupTopicPartition, OffsetAndMetadata> offsetMap = new ConcurrentHashMap<>();
然后,我們通過(guo)一個監聽線程(cheng)來更新內存(cun)中的Map,代碼如下所(suo)示:
private static synchronized void startOffsetListener(ConsumerConnector consumerConnector) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(consumerOffsetTopic, new Integer(1)); KafkaStream<byte[], byte[]> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(consumerOffsetTopic).get(0); ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator(); while (true) { MessageAndMetadata<byte[], byte[]> offsetMsg = it.next(); if (ByteBuffer.wrap(offsetMsg.key()).getShort() < 2) { try { GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key())); if (offsetMsg.message() == null) { continue; } OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message())); offsetMap.put(commitKey, commitValue); } catch (Exception e) { e.printStackTrace(); } } } }
在拿(na)到這部分(fen)更新后的offset數據,我們(men)可(ke)以通(tong)過(guo) RPC 將這部分(fen)數據共享(xiang)出(chu)去(qu),讓客戶端(duan)獲取這部分(fen)數據并可(ke)視化。RPC 接(jie)口如下所示:
namespace java org.smartloli.kafka.eagle.ipc service KafkaOffsetServer{ string query(1:string group,2:string topic,3:i32 partition), string getOffset(), string sql(1:string sql), string getConsumer(), string getActiverConsumer() }
這里(li),如果我們不想寫接(jie)口來操(cao)(cao)作 offset,可以(yi)通過(guo) SQL 來操(cao)(cao)作消費的(de) offset 數組,使用(yong)方式如下所示:
- 引入依賴JAR
<dependency> <groupId>org.smartloli</groupId> <artifactId>jsql-client</artifactId> <version>1.0.0</version> </dependency>
- 使用接口
JSqlUtils.query(tabSchema, tableName, dataSets, sql);
tabSchema:表結構;tableName:表名(ming);dataSets:數據(ju)集;sql:操作的SQL語句。
4.預覽
消費者(zhe)預覽如下圖所示(shi):

正在消費(fei)的關系圖如下(xia)所示:

消(xiao)費詳細 offset 如(ru)下所示:

消費和生(sheng)產的速(su)率圖,如下所示:

5.總結
這里,說明一下,當 offset 存入到 Kafka 的topic中后,消費(fei)(fei)線程(cheng)ID信息并(bing)沒有記錄,不(bu)過,我們通過閱讀Kafka消費(fei)(fei)線程(cheng)ID的組成規(gui)則后,可以手動生成,其(qi)消費(fei)(fei)線程(cheng)ID由(you):Group+ConsumerLocalAddress+Timespan+UUID(8bit)+PartitionId,由(you)于消費(fei)(fei)者(zhe)在(zai)其(qi)他(ta)節點,我們暫時無法確定ConsumerLocalAddress。最后,歡迎大家使用 Kafka 集群監控 ——[ ],[ ]。
6.結束語
這篇博(bo)客(ke)就和大(da)家(jia)分享到這里,如(ru)果大(da)家(jia)在(zai)研(yan)究學(xue)習(xi)的過程當中有什么問題(ti),可以(yi)加(jia)群進行討論(lun)或(huo)發送郵(you)件給我(wo),我(wo)會盡(jin)我(wo)所能為(wei)您(nin)解答(da),與君共勉(mian)!
郵箱:smartloli.org@gmail.com
QQ群(Hive與AI實戰【新群】):935396818
QQ群(Hadoop - 交流社區1):424769183
QQ群(Kafka并不難學):825943084
溫馨提示:請大家加群的時候寫上加群理由(姓名+公司/學校),方便管理員審核,謝謝!

