1.概述

  目前,Kafka 官網(wǎng)最新版[0.10.1.1],已默認(rèn)將消費(fèi)的 offset 遷入到了 Kafka 一個(gè)名為 __consumer_offsets 的Topic中。其實(shí),早在 0.8.2.2 版本,已支持存入消費(fèi)的 offset 到Topic中,只是那時(shí)候默認(rèn)是將消費(fèi)的 offset 存放在 Zookeeper 集群中。那現(xiàn)在,官方默認(rèn)將消費(fèi)的offset存儲(chǔ)在 Kafka 的Topic中,同時(shí),也保留了存儲(chǔ)在 Zookeeper 的接口,通過 offsets.storage 屬性來進(jìn)行設(shè)置。

2.內(nèi)容

  其實(shí),官方這樣推薦,也是有其道理的。之前版本,Kafka其實(shí)存在一個(gè)比較大的隱患,就是利用 Zookeeper 來存儲(chǔ)記錄每個(gè)消費(fèi)者/組的消費(fèi)進(jìn)度。雖然,在使用過程當(dāng)中,JVM幫助我們完成了自一些優(yōu)化,但是消費(fèi)者需要頻繁的去與 Zookeeper 進(jìn)行交互,而利用ZKClient的API操作Zookeeper頻繁的Write其本身就是一個(gè)比較低效的Action,對(duì)于后期水平擴(kuò)展也是一個(gè)比較頭疼的問題。如果期間 Zookeeper 集群發(fā)生變化,那 Kafka 集群的吞吐量也跟著受影響。

  在此之后,官方其實(shí)很早就提出了遷移到 Kafka 的概念,只是,之前是一直默認(rèn)存儲(chǔ)在 Zookeeper集群中,需要手動(dòng)的設(shè)置,如果,對(duì) Kafka 的使用不是很熟悉的話,一般我們就接受了默認(rèn)的存儲(chǔ)(即:存在 ZK 中)。在新版 Kafka 以及之后的版本,Kafka 消費(fèi)的offset都會(huì)默認(rèn)存放在 Kafka 集群中的一個(gè)叫 __consumer_offsets 的topic中。

  當(dāng)然,其實(shí)她實(shí)現(xiàn)的原理也讓我們很熟悉,利用 Kafka 自身的 Topic,以消費(fèi)的Group,Topic,以及Partition做為組合 Key。所有的消費(fèi)offset都提交寫入到上述的Topic中。因?yàn)檫@部分消息是非常重要,以至于是不能容忍丟數(shù)據(jù)的,所以消息的 acking 級(jí)別設(shè)置為了 -1,生產(chǎn)者等到所有的 ISR 都收到消息后才會(huì)得到 ack(數(shù)據(jù)安全性極好,當(dāng)然,其速度會(huì)有所影響)。所以 Kafka 又在內(nèi)存中維護(hù)了一個(gè)關(guān)于 Group,Topic 和 Partition 的三元組來維護(hù)最新的 offset 信息,消費(fèi)者獲取最新的offset的時(shí)候會(huì)直接從內(nèi)存中獲取。

3.實(shí)現(xiàn)

  那我們?nèi)绾螌?shí)現(xiàn)獲取這部分消費(fèi)的 offset,我們可以在內(nèi)存中定義一個(gè)Map集合,來維護(hù)消費(fèi)中所捕捉到 offset,如下所示:

protected static Map<GroupTopicPartition, OffsetAndMetadata> offsetMap = new ConcurrentHashMap<>();

  然后,我們通過一個(gè)監(jiān)聽線程來更新內(nèi)存中的Map,代碼如下所示:

萬碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

private static synchronized void startOffsetListener(ConsumerConnector consumerConnector) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(consumerOffsetTopic, new Integer(1));
        KafkaStream<byte[], byte[]> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(consumerOffsetTopic).get(0);

        ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator();        while (true) {
            MessageAndMetadata<byte[], byte[]> offsetMsg = it.next();            if (ByteBuffer.wrap(offsetMsg.key()).getShort() < 2) {                try {
                    GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key()));                    if (offsetMsg.message() == null) {                        continue;
                    }
                    OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message()));
                    offsetMap.put(commitKey, commitValue);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

萬碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

  在拿到這部分更新后的offset數(shù)據(jù),我們可以通過 RPC 將這部分?jǐn)?shù)據(jù)共享出去,讓客戶端獲取這部分?jǐn)?shù)據(jù)并可視化。RPC 接口如下所示:

萬碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

namespace java org.smartloli.kafka.eagle.ipc

service KafkaOffsetServer{
    string query(1:string group,2:string topic,3:i32 partition),
    string getOffset(),
    string sql(1:string sql),
    string getConsumer(),
    string getActiverConsumer()
}

萬碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

  這里,如果我們不想寫接口來操作 offset,可以通過 SQL 來操作消費(fèi)的 offset 數(shù)組,使用方式如下所示:

  • 引入依賴JAR

<dependency>
    <groupId>org.smartloli</groupId>
    <artifactId>jsql-client</artifactId>
    <version>1.0.0</version></dependency>
  • 使用接口

JSqlUtils.query(tabSchema, tableName, dataSets, sql);

  tabSchema:表結(jié)構(gòu);tableName:表名;dataSets:數(shù)據(jù)集;sql:操作的SQL語句。

4.預(yù)覽

  消費(fèi)者預(yù)覽如下圖所示:

萬碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

  正在消費(fèi)的關(guān)系圖如下所示:

萬碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

  消費(fèi)詳細(xì) offset 如下所示:

萬碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

  消費(fèi)和生產(chǎn)的速率圖,如下所示:

萬碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

5.總結(jié)

  這里,說明一下,當(dāng) offset 存入到 Kafka 的topic中后,消費(fèi)線程ID信息并沒有記錄,不過,我們通過閱讀Kafka消費(fèi)線程ID的組成規(guī)則后,可以手動(dòng)生成,其消費(fèi)線程ID由:Group+ConsumerLocalAddress+Timespan+UUID(8bit)+PartitionId,由于消費(fèi)者在其他節(jié)點(diǎn),我們暫時(shí)無法確定ConsumerLocalAddress。最后,歡迎大家使用 Kafka 集群監(jiān)控 ——[ Kafka Eagle ],[ 操作手冊(cè) ]。

6.結(jié)束語

  這篇博客就和大家分享到這里,如果大家在研究學(xué)習(xí)的過程當(dāng)中有什么問題,可以加群進(jìn)行討論或發(fā)送郵件給我,我會(huì)盡我所能為您解答,與君共勉!