中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

Kafka Offset Storage

1.概述

  目前,Kafka 官(guan)網(wang)最新版[],已默(mo)認將消費(fei)的(de)(de) offset 遷入到了 Kafka 一個名(ming)為(wei) __consumer_offsets 的(de)(de)Topic中。其實,早在(zai)(zai)(zai) 0.8.2.2 版本,已支持存(cun)入消費(fei)的(de)(de) offset 到Topic中,只是(shi)那(nei)時(shi)候(hou)默(mo)認是(shi)將消費(fei)的(de)(de) offset 存(cun)放在(zai)(zai)(zai) Zookeeper 集群(qun)中。那(nei)現在(zai)(zai)(zai),官(guan)方默(mo)認將消費(fei)的(de)(de)offset存(cun)儲(chu)在(zai)(zai)(zai) Kafka 的(de)(de)Topic中,同時(shi),也保(bao)留(liu)了存(cun)儲(chu)在(zai)(zai)(zai) Zookeeper 的(de)(de)接口,通(tong)過 offsets.storage 屬性來(lai)進(jin)行設置。

2.內容

  其(qi)實,官方這樣推薦,也(ye)是有其(qi)道理的(de)(de)(de)。之前版(ban)本(ben),Kafka其(qi)實存在(zai)一(yi)個比較(jiao)大的(de)(de)(de)隱患,就是利(li)用(yong) Zookeeper 來存儲記錄每個消費者/組(zu)的(de)(de)(de)消費進(jin)度(du)。雖然,在(zai)使用(yong)過程(cheng)當中,JVM幫(bang)助我們完(wan)成(cheng)了自一(yi)些優化(hua)(hua),但是消費者需(xu)要頻(pin)繁的(de)(de)(de)去與 Zookeeper 進(jin)行交互,而利(li)用(yong)ZKClient的(de)(de)(de)API操作Zookeeper頻(pin)繁的(de)(de)(de)Write其(qi)本(ben)身就是一(yi)個比較(jiao)低效的(de)(de)(de)Action,對于后期水平擴展也(ye)是一(yi)個比較(jiao)頭疼的(de)(de)(de)問題。如果(guo)期間 Zookeeper 集群(qun)發(fa)生變(bian)化(hua)(hua),那(nei) Kafka 集群(qun)的(de)(de)(de)吞吐(tu)量也(ye)跟著受影(ying)響(xiang)。

  在此之后,官(guan)方(fang)其實很早就(jiu)提(ti)出了遷移到 Kafka 的概念,只是(shi),之前是(shi)一(yi)(yi)直默(mo)認(ren)(ren)存儲(chu)在 Zookeeper集(ji)群中,需要手(shou)動的設(she)置,如果,對 Kafka 的使用不是(shi)很熟(shu)悉的話,一(yi)(yi)般我們就(jiu)接受了默(mo)認(ren)(ren)的存儲(chu)(即:存在 ZK 中)。在新版(ban) Kafka 以(yi)及之后的版(ban)本,Kafka 消費(fei)的offset都(dou)會默(mo)認(ren)(ren)存放在 Kafka 集(ji)群中的一(yi)(yi)個叫 __consumer_offsets 的topic中。

  當(dang)然(ran),其實(shi)她實(shi)現的(de)原(yuan)理(li)也(ye)讓我們很熟悉,利(li)用 Kafka 自身的(de) Topic,以消(xiao)費(fei)的(de)Group,Topic,以及Partition做為(wei)組合 Key。所(suo)有的(de)消(xiao)費(fei)offset都(dou)提交寫入到(dao)上述的(de)Topic中(zhong)。因為(wei)這部(bu)分(fen)消(xiao)息(xi)(xi)是非常重要,以至于(yu)是不能容忍丟數(shu)據的(de),所(suo)以消(xiao)息(xi)(xi)的(de) acking 級(ji)別設置(zhi)為(wei)了(le) -1,生產者等到(dao)所(suo)有的(de) ISR 都(dou)收(shou)到(dao)消(xiao)息(xi)(xi)后才會(hui)得到(dao) ack(數(shu)據安(an)全(quan)性(xing)極好,當(dang)然(ran),其速度會(hui)有所(suo)影響)。所(suo)以 Kafka 又在內存中(zhong)維護(hu)了(le)一(yi)個(ge)關于(yu) Group,Topic 和 Partition 的(de)三元組來維護(hu)最新的(de) offset 信息(xi)(xi),消(xiao)費(fei)者獲取最新的(de)offset的(de)時候會(hui)直接(jie)從內存中(zhong)獲取。

3.實現

  那我(wo)們如何實現獲取這部分消費的 offset,我(wo)們可以(yi)在內存中(zhong)定義一個Map集合,來維(wei)護消費中(zhong)所捕捉(zhuo)到(dao) offset,如下所示(shi):

protected static Map<GroupTopicPartition, OffsetAndMetadata> offsetMap = new ConcurrentHashMap<>();

  然后,我們通過(guo)一個監聽線程(cheng)來更新內存(cun)中的Map,代碼如下所(suo)示:

private static synchronized void startOffsetListener(ConsumerConnector consumerConnector) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(consumerOffsetTopic, new Integer(1));
        KafkaStream<byte[], byte[]> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(consumerOffsetTopic).get(0);

        ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator();
        while (true) {
            MessageAndMetadata<byte[], byte[]> offsetMsg = it.next();
            if (ByteBuffer.wrap(offsetMsg.key()).getShort() < 2) {
                try {
                    GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key()));
                    if (offsetMsg.message() == null) {
                        continue;
                    }
                    OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message()));
                    offsetMap.put(commitKey, commitValue);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

  在拿(na)到這部分(fen)更新后的offset數據,我們(men)可(ke)以通(tong)過(guo) RPC 將這部分(fen)數據共享(xiang)出(chu)去(qu),讓客戶端(duan)獲取這部分(fen)數據并可(ke)視化。RPC 接(jie)口如下所示:

namespace java org.smartloli.kafka.eagle.ipc

service KafkaOffsetServer{
    string query(1:string group,2:string topic,3:i32 partition),
    string getOffset(),
    string sql(1:string sql),
    string getConsumer(),
    string getActiverConsumer()
}

  這里(li),如果我們不想寫接(jie)口來操(cao)(cao)作 offset,可以(yi)通過(guo) SQL 來操(cao)(cao)作消費的(de) offset 數組,使用(yong)方式如下所示:

  • 引入依賴JAR
<dependency>
    <groupId>org.smartloli</groupId>
    <artifactId>jsql-client</artifactId>
    <version>1.0.0</version>
</dependency>
  • 使用接口
JSqlUtils.query(tabSchema, tableName, dataSets, sql);

  tabSchema:表結構;tableName:表名(ming);dataSets:數據(ju)集;sql:操作的SQL語句。

4.預覽

  消費者(zhe)預覽如下圖所示(shi):

  正在消費(fei)的關系圖如下(xia)所示:

  消(xiao)費詳細 offset 如(ru)下所示:

  消費和生(sheng)產的速(su)率圖,如下所示:

5.總結

  這里,說明一下,當 offset 存入到 Kafka 的topic中后,消費(fei)(fei)線程(cheng)ID信息并(bing)沒有記錄,不(bu)過,我們通過閱讀Kafka消費(fei)(fei)線程(cheng)ID的組成規(gui)則后,可以手動生成,其(qi)消費(fei)(fei)線程(cheng)ID由(you):Group+ConsumerLocalAddress+Timespan+UUID(8bit)+PartitionId,由(you)于消費(fei)(fei)者(zhe)在(zai)其(qi)他(ta)節點,我們暫時無法確定ConsumerLocalAddress。最后,歡迎大家使用 Kafka 集群監控 ——[  ],[ ]。

6.結束語

  這篇博(bo)客(ke)就和大(da)家(jia)分享到這里,如(ru)果大(da)家(jia)在(zai)研(yan)究學(xue)習(xi)的過程當中有什么問題(ti),可以(yi)加(jia)群進行討論(lun)或(huo)發送郵(you)件給我(wo),我(wo)會盡(jin)我(wo)所能為(wei)您(nin)解答(da),與君共勉(mian)!

posted @ 2017-01-09 20:26  哥不是小蘿莉  閱讀(13747)  評論(0)    收藏  舉報