1.概述
目前,隨著大數據的浪潮,Kafka 被越來越多的企業(yè)所認可,如今的Kafka已發(fā)展到0.10.x,其優(yōu)秀的特性也帶給我們解決實際業(yè)務的方案。對于數據分流來說,既可以分流到離線存儲平臺(HDFS),離線計算平臺(Hive倉庫),也可以分流實時流水計算(Storm,Spark)等,同樣也可以分流到海量數據查詢(HBase),或是及時查詢(ElasticSearch)。而今天筆者給大家分享的就是Kafka 分流數據到 ElasticSearch。
2.內容
我們知道,ElasticSearch是有其自己的套件的,簡稱ELK,即ElasticSearch,Logstash以及Kibana。ElasticSearch負責存儲,Logstash負責收集數據來源,Kibana負責可視化數據,分工明確。想要分流Kafka中的消息數據,可以使用Logstash的插件直接消費,但是需要我們編寫復雜的過濾條件,和特殊的映射處理,比如系統(tǒng)保留的`_uid`字段等需要我們額外的轉化。今天我們使用另外一種方式來處理數據,使用Kafka的消費API和ES的存儲API來處理分流數據。通過編寫Kafka消費者,消費對應的業(yè)務數據,將消費的數據通過ES存儲API,通過創(chuàng)建對應的索引的,存儲到ES中。其流程如下圖所示:
上圖可知,消費收集的數據,通過ES提供的存儲接口進行存儲。存儲的數據,這里我們可以規(guī)劃,做定時調度。最后,我們可以通過Kibana來可視化ES中的數據,對外提供業(yè)務調用接口,進行數據共享。
3.實現
下面,我們開始進行實現細節(jié)處理,這里給大家提供實現的核心代碼部分,實現代碼如下所示:
3.1 定義ES格式
我們以插件的形式進行消費,從Kafka到ES的數據流向,只需要定義插件格式,如下所示:
{ "job": { "content": { "reader": { "name": "kafka", "parameter": {