ConsumerConfig.scala 儲存Consumer的配置

按照我的理解,0.10的Kafka沒有專門的SimpleConsumer,仍然是沿用0.8版本的。

1.從poll開始

消費的規(guī)則如下:

  • 一個partition只能被同一個ConsumersGroup的一個線程所消費.

  • 線程數小于partition數,某些線程會消費多個partition.

  • 線程數等于partition數,一個線程正好消費一個線程.

  • 當添加消費者線程時,會觸發(fā)rebalance,partition的分配發(fā)送變化.

  • 同一個partition的offset保證消費有序,不同的partition消費不保證順序.

Consumers編程的用法:

private final KafkaConsumer<Long, String> consumer; // 與Kafka進行通信的consumer...
consumer = new KafkaConsumer<Long, String>(props);
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Long, String> records = consumer.poll(512);
...

consumer,是一個純粹的單線程程序,后面所講的所有機制(包括coordinator,rebalance, heartbeat等),都是在這個單線程的poll函數里面完成的。也因此,在consumer的代碼內部,沒有鎖的出現。

1.1包括的組件

從KafkaConsumer的構造函數可以看出,KafkaConsumer有以下幾個核心部件:

  • Metadata: 存儲Topic/Partion與broker的映射關系

  • NetworkClient:網絡層 A network client for asynchronous request/response network i/o.

  • ConsumerNetworkClient: Higher level consumer access to the network layer //對NetworkClient的封裝,非線程安全

  • ConsumerCoordinator:只是client端的類,只是和服務端的GroupCoordinator通信的介質。(broker端的Coordinator 負責reblance、Offset提交、心跳)

  • SubscriptionState: consumer的Topic、Partition的offset狀態(tài)維護

  • Fetcher: manage the fetching process with the brokers. //獲取消息

后面會分組件講解Consumers的工作流程

1.2 Consumer消費者的工作過程:

  1. 在consumer啟動時或者coordinator節(jié)點故障轉移時,consumer發(fā)送ConsumerMetadataRequest給任意一個brokers。在ConsumerMetadataResponse中,它接收對應的Consumer Group所屬的Coordinator的位置信息。

  2. Consumer連接Coordinator節(jié)點,并發(fā)送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration錯誤碼,說明協調節(jié)點已經在初始化平衡。消費者就會停止抓取數據,提交offsets,發(fā)送JoinGroupRequest給協調節(jié)點。在JoinGroupResponse,它接收消費者應該擁有的topic-partitions列表以及當前Consumer Group的新的generation編號。這個時候Consumer Group管理已經完成,Consumer就可以開始fetch數據,并為它擁有的partitions提交offsets。

  3. 如果HeartbeatResponse沒有錯誤返回,Consumer會從它上次擁有的partitions列表繼續(xù)抓取數據,這個過程是不會被中斷的。


2 設計

2.0 MetaData

見Producer里面的分析。

補充一下,KafkaConsumer、KafkaProducer都是在構造函數中獲取metadata信息,通過調用metadata.update方法來獲取信息。

2.1 coordinator 為什么,做什么

1.去zookeeper依賴 -- 為什么

  • 在0.9以前的client api中,consumer是要依賴Zookeeper的。因為同一個consumer group中的所有consumer需要進行協同,這與后面要講的rebalance有關。(ConsumerConnector、KafkaStream、ConsumerIterator) -- package kafka.consumer

  • 0.9之后新的consumer不依賴與Zookeeper,一個consumerGroup內的consumer由Coordinator管理.(KafkaConsumer) -- package org.apache.kafka.clients.consumer

為什么?后面講

提問:為什么在一個group內部,1個parition只能被1個consumer擁有?

2.coordinator協議/partition分配問題

給定一個topic,有4個partition: p0, p1, p2, p3, 一個group有3個consumer: c0, c1, c2。

  • 那么,如果按RangeAssignor策略,分配結果是:
    c0: p0, c1: p1, c2: p2, p3

  • 如果按RoundRobinAssignor策略:
    c0: p1, p3, c1: p1, c2: p2

  • partition.assignment.strategy=RangeAssignor,默認值

(到底是哪種分配狀態(tài)呢)
那這整個分配過程是如何進行的呢?見下圖所示:
平面設計培訓,網頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

3步分配過程

1. 步驟1:對于每1個consumer group,Kafka集群為其從broker集群中選擇一個broker作為其coordinator。因此,第1步就是找到這個coordinator。(1個consumer group對應一個coordinattor)

GroupCoordinatorRequest: GCR,由ConsumerNetworkClient發(fā)送請求去尋找coordinator。

2. 步驟2:找到coordinator之后,發(fā)送JoinGroup請求
consumer在這里會被劃分leader、follower(無責任的說:選擇第一個consumer)

  • leader作用:perform the leader synchronization and send back the assignment for the group(負責發(fā)送partition分配的結果)

  • follower作用:send follower's sync group with an empty assignment

3. 步驟3:JoinGroup返回之后,發(fā)送SyncGroup,得到自己所分配到的partition
SyncGroupRequest

  • consumer leader發(fā)送 SyncGroupRequest給Coordinator,Coordinator回給它null

  • follower發(fā)送 null的 SyncGroupRequest 給Coordinator,Coordinator回給它partition分配的結果。

注意,在上面3步中,有一個關鍵點:

  • partition的分配策略和分配結果其實是由client決定的,而不是由coordinator決定的。什么意思呢?在第2步,所有consumer都往coordinator發(fā)送JoinGroup消息之后,coordinator會指定其中一個consumer作為leader,其他consumer作為follower。

  • 然后由這個leader進行partition分配。

  • 然后在第3步,leader通過SyncGroup消息,把分配結果發(fā)給coordinator,其他consumer也發(fā)送SyncGroup消息,獲得這個分配結果。

接下來就到Fetcher拉取數據了

2.2 Fetcher

四個步驟

  1. 步驟0:獲取consumer的offset

  2. 步驟1:生成FetchRequest,并放入發(fā)送隊列

  3. 步驟2:網絡poll

  4. 步驟3:獲取結果

1.獲取consumer的offset

當consumer初次啟動的時候,面臨的一個首要問題就是:從offset為多少的位置開始消費。

poll之前,給集群發(fā)送請求,讓集群告知客戶端,當前該TopicPartition的offset是多少。通過SubscriptionState來實現, 通過ConsumerCoordinator

if (!subscriptions.hasAllFetchPositions())            updateFetchPositions(this.subscriptions.missingFetchPositions());

核心是:向Coordinator發(fā)了一個OffsetFetchRequest,并且是同步調用,直到獲取到初始的offset,再開始接下來的poll.(也就是說Offset的信息如果存在Kafka里,是存在GroupCoordinator里面)

consumer的每個TopicPartition都有了初始的offset,接下來就可以進行不斷循環(huán)取消息了,這也就是Fetch的過程:

2.生成FetchRequest,并放入發(fā)送隊列 -- fetcher.initFetches(cluster)

核心就是生成FetchRequest: 假設一個consumer訂閱了3個topic: t0, t1, t2,為其分配的partition分別是: t0: p0; t1: p1, p2; t2: p2

即總共4個TopicPartition,即t0p0, t0p1, t1p1, t2p2。這4個TopicPartition可能分布在2臺機器n0, n1上面: n0: t0p0, t1p1 n1: t0p1, t2p2

則會分別針對每臺機器生成一個FetchRequest,即Map<Node, FetchRequest>。所以會有一個方法把所有屬于同一個Node的TopicPartition放在一起,生成一個FetchRequest。

3.網絡poll

調用ConsumerNetworkClient.poll發(fā)送網絡請求。向服務器發(fā) 送響應請求和獲取服務器的響應。(默認值:executeDelayedTasks=true)

4.獲取結果 -- fetcher.fetchedRecords()

獲取Broker返回的Response,里面包含了List<ConsumerRecord> records

2.3 offset確認機制

  • 是否自動消費確認:由參數auto.xxx.commit=true控制

  • 手動消費:用于自定義Consumers的消費控制

下面從自動消費確認來分析,Offset自動確認是由ConsumerCoordinatorAutoCommitTask來實現的。

其調用在ConsumerNetworkClient的 DelayedTaskQueue delayedTasks里面,然后被周期性的調用。 周期性的發(fā)送確認消息,類似HeartBeat,其實現機制也就是前面所講的DelayedQueue + DelayedTask.

確認一次:offset的提交

poll函數中的注釋:
// execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records

  • 可以這樣理解:第二次poll調用的時候,提交上一次poll的offset和心跳發(fā)送。

  • 先提交offset,再去拉取record。那么這次Offset其實是上一次poll的Record的offset。

  • 因此,當你把按照下面的邏輯寫程序的時候,可能會導致Consumer與Coordinator的心跳超時。

    while(true) {
    consumer.poll();do process message // 假如這個耗時過長,那么這個consumer就無法發(fā)送心跳給coordinator,導致它錯誤認為這個consumer失去聯系了,引起不必要的rebalance。槽糕的情況下,會丟重復消費數據。}

    因此,有必要把offset的提交單獨拿出來做一個線程。

到這里,就把整個Consumer的流程走完了。

2.4 rebalance機制-- 作為一種補充機制,談談原理就好

http://www.cnblogs.com/byrhuangqiang/p/6372600.html