1.概述

  在《Kafka 消息監(jiān)控 - Kafka Eagle》一文中,簡(jiǎn)單的介紹了 Kafka Eagle這款監(jiān)控工具的作用,截圖預(yù)覽,以及使用詳情。今天筆者通過(guò)其源碼來(lái)解讀實(shí)現(xiàn)細(xì)節(jié)。目前該項(xiàng)目已托管于 Github 之上,作者編寫(xiě)了使用手冊(cè),告知使用者如何安裝,部署,啟動(dòng)該系統(tǒng)。但對(duì)于實(shí)現(xiàn)的細(xì)節(jié)并未在參考手冊(cè)中詳細(xì)指出。這里,筆者通過(guò)本篇博文,來(lái)詳細(xì)解讀其實(shí)現(xiàn)細(xì)節(jié)。相關(guān)資料文獻(xiàn)地址如下所示:

2.內(nèi)容

  截止到版本 Kafka Eagle v1.1.1 支持監(jiān)控0.8.2.x(存放消費(fèi)信息于Zookeeper)以及 0.10.x(存放消費(fèi)信息于Kafka的topic中)。對(duì)于前者,從Zookeeper中獲取消息信息,難度不大,編寫(xiě)Zookeeper客戶(hù)端實(shí)現(xiàn)代碼即可,該版本在Zookeeper下的存儲(chǔ)結(jié)構(gòu)樹(shù)如下圖所示:

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

對(duì)于實(shí)現(xiàn)細(xì)節(jié),可使用ZkUtils工具類(lèi)來(lái)獲取相關(guān)數(shù)據(jù),以獲取消費(fèi)信息為例,代碼如下所示:

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

/** Obtaining kafka consumer information from zookeeper. */
    public Map<String, List<String>> getConsumers(String clusterAlias) {
        ZkClient zkc = zkPool.getZkClient(clusterAlias);
        Map<String, List<String>> consumers = new HashMap<String, List<String>>();        try {
            Seq<String> subConsumerPaths = ZkUtils.getChildren(zkc, CONSUMERS_PATH);
            List<String> groups = JavaConversions.seqAsJavaList(subConsumerPaths);            for (String group : groups) {
                String path = CONSUMERS_PATH + "/" + group + "/owners";                if (ZkUtils.pathExists(zkc, path)) {
                    Seq<String> owners = ZkUtils.getChildren(zkc, path);
                    List<String> ownersSerialize = JavaConversions.seqAsJavaList(owners);
                    consumers.put(group, ownersSerialize);
                } else {
                    LOG.error("Consumer Path[" + path + "] is not exist.");
                }
            }
        } catch (Exception ex) {
            LOG.error(ex.getMessage());
        } finally {            if (zkc != null) {
                zkPool.release(clusterAlias, zkc);
                zkc = null;
            }
        }        return consumers;
    }

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

其他監(jiān)控信息可以按照Z(yǔ)ookeeper中結(jié)構(gòu)樹(shù)路徑獲取。如下圖所示:

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

然而,對(duì)于新版本,官方默認(rèn)將消費(fèi)信息遷移到Kafka的topic中,這樣原來(lái)的接口只能獲取topic,broker等信息,對(duì)于消費(fèi)的信息,我們需要從kafka中一個(gè)叫__consumer_offsets的topic中獲取。為了兼容0.8.2.x版本的Kafka,這里在Kafka Eagle中另外啟動(dòng)一個(gè)RpcServer來(lái)貢獻(xiàn)__consumer_offsets中的消費(fèi)信息。消費(fèi)__consumer_offsets這個(gè)topic時(shí),需要指定該內(nèi)部topic不暴露給consumer,將 exclude.internal.topics 設(shè)置為 false 即可。這樣我們通過(guò)一個(gè) kafka.eagle.offset.storage 開(kāi)關(guān)來(lái)控制系統(tǒng)獲取監(jiān)控元數(shù)據(jù)的走向。獲取流程如下圖所示:

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

3.消費(fèi) Owner

  當(dāng)消費(fèi)的信息存放于Zookeeper中,我們可以直接從consumer模塊下直接獲取對(duì)應(yīng)的Owner,但是在Kafka的Topic中,我們需要編碼來(lái)間接的獲取。這里,我們需要知道 Kafka 的Owner的組成規(guī)則,其規(guī)則由 Group+ConusmerHostAddress+Timespan+UUID+PartitionId組成,實(shí)現(xiàn)細(xì)節(jié)可參考源碼,界面展示如下圖所示:

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

4.Kafka SQL

  關(guān)于Kafka SQL,旨在使用SQL來(lái)快速可視化Topic的相關(guān)信息,目前 Kafka SQL 實(shí)現(xiàn)的功能包含有展示某一個(gè)Topic的Partition,Offset,以及其對(duì)應(yīng)的消息記錄,若不加limit條件限制,默認(rèn)展示該Topic下最新的5000條記錄,詳細(xì)實(shí)現(xiàn)細(xì)節(jié),可參看源碼,預(yù)覽截圖如下所示:

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

查詢(xún)結(jié)果,如下圖所示:

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

5.多集群

  Kafka Eagle 目前是支持多集群監(jiān)控,所謂多集群,是指多個(gè)Zookeeper集群下的Kafka集群,通過(guò)切換Session來(lái)管理不同的Zookeeper集群下的Kafka集群,細(xì)節(jié)參看源碼。管理界面如下圖所示:

萬(wàn)碼學(xué)堂,電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),Java培訓(xùn),JavaEE開(kāi)發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

6.總結(jié)

  Kafka Eagle總體實(shí)現(xiàn)思路基本如上所述。針對(duì),Kafka 0.10.x版本,Kafka Eagle監(jiān)控部分模塊不展示的問(wèn)題,這里在啟動(dòng) Kafka Eagle之前,默認(rèn)啟動(dòng)一個(gè)系統(tǒng)consumer來(lái)消費(fèi)kafka.eagle該group下的__system.topic__,保證__consumer_offsets是有數(shù)據(jù)可供獲取的。

7.結(jié)束語(yǔ)

  這篇博客就和大家分享到這里,如果大家在研究學(xué)習(xí)的過(guò)程當(dāng)中有什么問(wèn)題,可以加群進(jìn)行討論或發(fā)送郵件給我,我會(huì)盡我所能為您解答,與君共勉!