中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

Kafka 如何讀取offset topic內(nei)容 (__consumer_offsets)

  眾所周知,由于Zookeeper并(bing)不適(shi)合大(da)批(pi)量的(de)頻繁寫(xie)入操作,新版Kafka已(yi)推薦(jian)將consumer的(de)位移(yi)信息(xi)保存在Kafka內部的(de)topic中(zhong),即__consumer_offsets topic,并(bing)且默認(ren)提供了kafka_consumer_groups.sh腳本供用戶(hu)查(cha)看consumer信息(xi)。

  不(bu)過依(yi)然(ran)有(you)很多用戶希望了解__consumer_offsets topic內部到(dao)底(di)保(bao)存了什么信(xin)息,特別是想查(cha)詢某些consumer group的位移是如何在該topic中保(bao)存的。針對(dui)這些問題,本(ben)文將結合一個實例探討如何使用kafka-simple-consumer-shell腳本(ben)來(lai)查(cha)詢該內部topic。

1. 創建topic “test”

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3

2. 使用(yong)kafka-console-producer.sh腳本生產消息

  由于默認沒有指定key,所以根據round-robin方式,消息分布到不同的分區上。 (本例中生產了64條消息)

3. 驗證(zheng)消息生產成功(gong)

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test --time -1

結果輸(shu)出表明(ming)64條消息全部生產成功!

test:2:21

test:1:21

test:0:22

4. 創建一個console consumer group

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test --from-beginning --new-consumer

5. 獲取該consumer group的group id(后面需(xu)要根據該id查詢它的位移信(xin)息)

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --list --new-consumer

輸出: console-consumer-46965  (記住這個id!)

6. 查詢__consumer_offsets topic所(suo)有內(nei)容(rong)

注意:運行下面命令前先要在consumer.properties中設置exclude.internal.topics=false

0.11.0.0之(zhi)前版本

bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

0.11.0.0之后(hou)版本(含)

bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

 

默認(ren)情(qing)況下__consumer_offsets有50個分區,如果你的(de)系統中consumer group也(ye)很多(duo)(duo)的(de)話,那(nei)么這個命令的(de)輸出結果會很多(duo)(duo)。

7. 計(ji)算指定consumer group在__consumer_offsets topic中分區信息

這(zhe)時候就用到了第5步獲(huo)取(qu)的(de)group.id(本例(li)中是(shi)console-consumer-46965)。Kafka會使用下面公式計(ji)算(suan)該group位移保存在__consumer_offsets的(de)哪個分區上:

Math.abs(groupID.hashCode()) % numPartitions

所以在本例中,對應的分(fen)區=Math.abs("console-consumer-46965".hashCode()) % 50 = 11,即__consumer_offsets的分(fen)區11保存了(le)這(zhe)個consumer group的位移信息,下面讓我們驗證一下。

8. 獲取指定(ding)consumer group的位移信息 

0.11.0.0版本之前

bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

0.11.0.0版本以后(含)

bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

下面是輸出結(jie)果:

...
[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092279434,ExpirationTime 1479178679434]
[console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
[console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
[console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
[console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
 ...

  上圖(tu)可(ke)見,該consumer group果(guo)然保存在分區11上,且位(wei)(wei)(wei)(wei)移(yi)(yi)信息都是對的(這里(li)(li)的位(wei)(wei)(wei)(wei)移(yi)(yi)信息是已消費的位(wei)(wei)(wei)(wei)移(yi)(yi),嚴格來(lai)說(shuo)不是第3步中的位(wei)(wei)(wei)(wei)移(yi)(yi)。由于(yu)我的consumer已經(jing)消費完(wan)了(le)所有的消息,所以這里(li)(li)的位(wei)(wei)(wei)(wei)移(yi)(yi)與第3步中的位(wei)(wei)(wei)(wei)移(yi)(yi)相同)。另外,可以看到__consumer_offsets topic的每一日志(zhi)項(xiang)的格(ge)式都是(shi):[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

 

  okay,寫(xie)到此你(ni)應該已經知道如何查(cha)(cha)詢__consumer_offsets topic的內(nei)容了吧。希望本文對你(ni)有所幫助。(Kafka當(dang)然(ran)還提供了Java APIs用于查(cha)(cha)詢,具體使(shi)用方法不(bu)在這里贅述了,有興(xing)趣的可以看。)

posted @ 2016-11-14 11:51  huxihx  閱讀(87289)  評論(41)    收藏  舉報