1.概述
目前,隨著大數(shù)據(jù)的浪潮,Kafka 被越來(lái)越多的企業(yè)所認(rèn)可,如今的Kafka已發(fā)展到0.10.x,其優(yōu)秀的特性也帶給我們解決實(shí)際業(yè)務(wù)的方案。對(duì)于數(shù)據(jù)分流來(lái)說(shuō),既可以分流到離線存儲(chǔ)平臺(tái)(HDFS),離線計(jì)算平臺(tái)(Hive倉(cāng)庫(kù)),也可以分流實(shí)時(shí)流水計(jì)算(Storm,Spark)等,同樣也可以分流到海量數(shù)據(jù)查詢(xún)(HBase),或是及時(shí)查詢(xún)(ElasticSearch)。而今天筆者給大家分享的就是Kafka 分流數(shù)據(jù)到 ElasticSearch。
2.內(nèi)容
我們知道,ElasticSearch是有其自己的套件的,簡(jiǎn)稱(chēng)ELK,即ElasticSearch,Logstash以及Kibana。ElasticSearch負(fù)責(zé)存儲(chǔ),Logstash負(fù)責(zé)收集數(shù)據(jù)來(lái)源,Kibana負(fù)責(zé)可視化數(shù)據(jù),分工明確。想要分流Kafka中的消息數(shù)據(jù),可以使用Logstash的插件直接消費(fèi),但是需要我們編寫(xiě)復(fù)雜的過(guò)濾條件,和特殊的映射處理,比如系統(tǒng)保留的`_uid`字段等需要我們額外的轉(zhuǎn)化。今天我們使用另外一種方式來(lái)處理數(shù)據(jù),使用Kafka的消費(fèi)API和ES的存儲(chǔ)API來(lái)處理分流數(shù)據(jù)。通過(guò)編寫(xiě)Kafka消費(fèi)者,消費(fèi)對(duì)應(yīng)的業(yè)務(wù)數(shù)據(jù),將消費(fèi)的數(shù)據(jù)通過(guò)ES存儲(chǔ)API,通過(guò)創(chuàng)建對(duì)應(yīng)的索引的,存儲(chǔ)到ES中。其流程如下圖所示:
上圖可知,消費(fèi)收集的數(shù)據(jù),通過(guò)ES提供的存儲(chǔ)接口進(jìn)行存儲(chǔ)。存儲(chǔ)的數(shù)據(jù),這里我們可以規(guī)劃,做定時(shí)調(diào)度。最后,我們可以通過(guò)Kibana來(lái)可視化ES中的數(shù)據(jù),對(duì)外提供業(yè)務(wù)調(diào)用接口,進(jìn)行數(shù)據(jù)共享。
3.實(shí)現(xiàn)
下面,我們開(kāi)始進(jìn)行實(shí)現(xiàn)細(xì)節(jié)處理,這里給大家提供實(shí)現(xiàn)的核心代碼部分,實(shí)現(xiàn)代碼如下所示:
3.1 定義ES格式
我們以插件的形式進(jìn)行消費(fèi),從Kafka到ES的數(shù)據(jù)流向,只需要定義插件格式,如下所示:
{ "job": { "content": { "reader": { "name": "kafka", "parameter": {