Kafka 如何讀取offset topic內(nei)容 (__consumer_offsets)
眾所周知,由于Zookeeper并(bing)不適(shi)合大(da)批(pi)量的(de)頻繁寫(xie)入操作,新版Kafka已(yi)推薦(jian)將consumer的(de)位移(yi)信息(xi)保存在Kafka內部的(de)topic中(zhong),即__consumer_offsets topic,并(bing)且默認(ren)提供了kafka_consumer_groups.sh腳本供用戶(hu)查(cha)看consumer信息(xi)。
不(bu)過依(yi)然(ran)有(you)很多用戶希望了解__consumer_offsets topic內部到(dao)底(di)保(bao)存了什么信(xin)息,特別是想查(cha)詢某些consumer group的位移是如何在該topic中保(bao)存的。針對(dui)這些問題,本(ben)文將結合一個實例探討如何使用kafka-simple-consumer-shell腳本(ben)來(lai)查(cha)詢該內部topic。
1. 創建topic “test”
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3
2. 使用(yong)kafka-console-producer.sh腳本生產消息
由于默認沒有指定key,所以根據round-robin方式,消息分布到不同的分區上。 (本例中生產了64條消息)
3. 驗證(zheng)消息生產成功(gong)
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test --time -1
結果輸(shu)出表明(ming)64條消息全部生產成功!
test:2:21
test:1:21
test:0:22
4. 創建一個console consumer group
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test --from-beginning --new-consumer
5. 獲取該consumer group的group id(后面需(xu)要根據該id查詢它的位移信(xin)息)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --list --new-consumer
輸出: console-consumer-46965 (記住這個id!)
6. 查詢__consumer_offsets topic所(suo)有內(nei)容(rong)
注意:運行下面命令前先要在consumer.properties中設置exclude.internal.topics=false
0.11.0.0之(zhi)前版本
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
0.11.0.0之后(hou)版本(含)
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
默認(ren)情(qing)況下__consumer_offsets有50個分區,如果你的(de)系統中consumer group也(ye)很多(duo)(duo)的(de)話,那(nei)么這個命令的(de)輸出結果會很多(duo)(duo)。
7. 計(ji)算指定consumer group在__consumer_offsets topic中分區信息
這(zhe)時候就用到了第5步獲(huo)取(qu)的(de)group.id(本例(li)中是(shi)console-consumer-46965)。Kafka會使用下面公式計(ji)算(suan)該group位移保存在__consumer_offsets的(de)哪個分區上:
Math.abs(groupID.hashCode()) % numPartitions
所以在本例中,對應的分(fen)區=Math.abs("console-consumer-46965".hashCode()) % 50 = 11,即__consumer_offsets的分(fen)區11保存了(le)這(zhe)個consumer group的位移信息,下面讓我們驗證一下。
8. 獲取指定(ding)consumer group的位移信息
0.11.0.0版本之前
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
0.11.0.0版本以后(含)
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
下面是輸出結(jie)果:
... [console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092279434,ExpirationTime 1479178679434] [console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246] [console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246] [console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246] [console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436] [console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436] [console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436] ...
上圖(tu)可(ke)見,該consumer group果(guo)然保存在分區11上,且位(wei)(wei)(wei)(wei)移(yi)(yi)信息都是對的(這里(li)(li)的位(wei)(wei)(wei)(wei)移(yi)(yi)信息是已消費的位(wei)(wei)(wei)(wei)移(yi)(yi),嚴格來(lai)說(shuo)不是第3步中的位(wei)(wei)(wei)(wei)移(yi)(yi)。由于(yu)我的consumer已經(jing)消費完(wan)了(le)所有的消息,所以這里(li)(li)的位(wei)(wei)(wei)(wei)移(yi)(yi)與第3步中的位(wei)(wei)(wei)(wei)移(yi)(yi)相同)。另外,可以看到__consumer_offsets topic的每一日志(zhi)項(xiang)的格(ge)式都是(shi):[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
okay,寫(xie)到此你(ni)應該已經知道如何查(cha)(cha)詢__consumer_offsets topic的內(nei)容了吧。希望本文對你(ni)有所幫助。(Kafka當(dang)然(ran)還提供了Java APIs用于查(cha)(cha)詢,具體使(shi)用方法不(bu)在這里贅述了,有興(xing)趣的可以看。)