1.概述
在《Kafka 消息監(jiān)控 - Kafka Eagle》一文中,簡(jiǎn)單的介紹了 Kafka Eagle這款監(jiān)控工具的作用,截圖預(yù)覽,以及使用詳情。今天筆者通過(guò)其源碼來(lái)解讀實(shí)現(xiàn)細(xì)節(jié)。目前該項(xiàng)目已托管于 Github 之上,作者編寫(xiě)了使用手冊(cè),告知使用者如何安裝,部署,啟動(dòng)該系統(tǒng)。但對(duì)于實(shí)現(xiàn)的細(xì)節(jié)并未在參考手冊(cè)中詳細(xì)指出。這里,筆者通過(guò)本篇博文,來(lái)詳細(xì)解讀其實(shí)現(xiàn)細(xì)節(jié)。相關(guān)資料文獻(xiàn)地址如下所示:
2.內(nèi)容
截止到版本 Kafka Eagle v1.1.1 支持監(jiān)控0.8.2.x(存放消費(fèi)信息于Zookeeper)以及 0.10.x(存放消費(fèi)信息于Kafka的topic中)。對(duì)于前者,從Zookeeper中獲取消息信息,難度不大,編寫(xiě)Zookeeper客戶(hù)端實(shí)現(xiàn)代碼即可,該版本在Zookeeper下的存儲(chǔ)結(jié)構(gòu)樹(shù)如下圖所示:
對(duì)于實(shí)現(xiàn)細(xì)節(jié),可使用ZkUtils工具類(lèi)來(lái)獲取相關(guān)數(shù)據(jù),以獲取消費(fèi)信息為例,代碼如下所示:
/** Obtaining kafka consumer information from zookeeper. */ public Map<String, List<String>> getConsumers(String clusterAlias) { ZkClient zkc = zkPool.getZkClient(clusterAlias); Map<String, List<String>> consumers = new HashMap<String, List<String>>(); try { Seq<String> subConsumerPaths = ZkUtils.getChildren(zkc, CONSUMERS_PATH); List<String> groups = JavaConversions.seqAsJavaList(subConsumerPaths); for (String group : groups) { String path = CONSUMERS_PATH + "/" + group + "/owners"; if (ZkUtils.pathExists(zkc, path)) { Seq<String> owners = ZkUtils.getChildren(zkc, path); List<String> ownersSerialize = JavaConversions.seqAsJavaList(owners); consumers.put(group, ownersSerialize); } else { LOG.error("Consumer Path[" + path + "] is not exist."); } } } catch (Exception ex) { LOG.error(ex.getMessage()); } finally { if (zkc != null) { zkPool.release(clusterAlias, zkc); zkc = null; } } return consumers; }
其他監(jiān)控信息可以按照Z(yǔ)ookeeper中結(jié)構(gòu)樹(shù)路徑獲取。如下圖所示:
然而,對(duì)于新版本,官方默認(rèn)將消費(fèi)信息遷移到Kafka的topic中,這樣原來(lái)的接口只能獲取topic,broker等信息,對(duì)于消費(fèi)的信息,我們需要從kafka中一個(gè)叫__consumer_offsets的topic中獲取。為了兼容0.8.2.x版本的Kafka,這里在Kafka Eagle中另外啟動(dòng)一個(gè)RpcServer來(lái)貢獻(xiàn)__consumer_offsets中的消費(fèi)信息。消費(fèi)__consumer_offsets這個(gè)topic時(shí),需要指定該內(nèi)部topic不暴露給consumer,將 exclude.internal.topics 設(shè)置為 false 即可。這樣我們通過(guò)一個(gè) kafka.eagle.offset.storage 開(kāi)關(guān)來(lái)控制系統(tǒng)獲取監(jiān)控元數(shù)據(jù)的走向。獲取流程如下圖所示:
3.消費(fèi) Owner
當(dāng)消費(fèi)的信息存放于Zookeeper中,我們可以直接從consumer模塊下直接獲取對(duì)應(yīng)的Owner,但是在Kafka的Topic中,我們需要編碼來(lái)間接的獲取。這里,我們需要知道 Kafka 的Owner的組成規(guī)則,其規(guī)則由 Group+ConusmerHostAddress+Timespan+UUID+PartitionId組成,實(shí)現(xiàn)細(xì)節(jié)可參考源碼,界面展示如下圖所示:
4.Kafka SQL
關(guān)于Kafka SQL,旨在使用SQL來(lái)快速可視化Topic的相關(guān)信息,目前 Kafka SQL 實(shí)現(xiàn)的功能包含有展示某一個(gè)Topic的Partition,Offset,以及其對(duì)應(yīng)的消息記錄,若不加limit條件限制,默認(rèn)展示該Topic下最新的5000條記錄,詳細(xì)實(shí)現(xiàn)細(xì)節(jié),可參看源碼,預(yù)覽截圖如下所示:
查詢(xún)結(jié)果,如下圖所示:
5.多集群
Kafka Eagle 目前是支持多集群監(jiān)控,所謂多集群,是指多個(gè)Zookeeper集群下的Kafka集群,通過(guò)切換Session來(lái)管理不同的Zookeeper集群下的Kafka集群,細(xì)節(jié)參看源碼。管理界面如下圖所示:
6.總結(jié)
Kafka Eagle總體實(shí)現(xiàn)思路基本如上所述。針對(duì),Kafka 0.10.x版本,Kafka Eagle監(jiān)控部分模塊不展示的問(wèn)題,這里在啟動(dòng) Kafka Eagle之前,默認(rèn)啟動(dòng)一個(gè)系統(tǒng)consumer來(lái)消費(fèi)kafka.eagle該group下的__system.topic__,保證__consumer_offsets是有數(shù)據(jù)可供獲取的。
7.結(jié)束語(yǔ)
這篇博客就和大家分享到這里,如果大家在研究學(xué)習(xí)的過(guò)程當(dāng)中有什么問(wèn)題,可以加群進(jìn)行討論或發(fā)送郵件給我,我會(huì)盡我所能為您解答,與君共勉!