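1. Overview

  With the rise of big data, Kafka has been adopted by more and more companies. Kafka is now at version 0.10.x, and its features offer practical solutions to real business problems. Data routed out of Kafka can go to offline storage (HDFS), offline computation (a Hive warehouse), real-time stream processing (Storm, Spark), large-scale data lookup (HBase), or near-real-time search (ElasticSearch). This post covers routing data from Kafka into ElasticSearch.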

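2. Content

  ElasticSearch comes with its own suite, known as ELK: ElasticSearch, Logstash and Kibana. ElasticSearch handles storage, Logstash collects data from its sources, and Kibana visualizes the data, so the division of labor is clear. To route message data out of Kafka, we could consume it directly with a Logstash plugin, but that requires writing complex filter conditions and special mapping rules. For example, system-reserved fields such as `_uid` need extra conversion. This post takes a different approach and uses the Kafka consumer API together with the ES storage API. We write a Kafka consumer that reads the relevant business data, creates the corresponding index, and stores the consumed data in ES through the ES storage API. In short, the flow is: Kafka, then the consumer, then the ES storage API, then ES.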


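  In this flow, the consumed data is stored through the storage interface that ES provides. We can plan how long the stored data is kept and clean it up with a scheduled job. Finally, Kibana can visualize the data in ES, and the data can be shared through business-facing query interfaces.

3. Implementation

  Below are the implementation details. Only the core parts of the code are shown.

3.1 Defining the ES format

  Consumption is organized as a plugin. To describe the data flow from Kafka to ES, we only need to define the plugin format, as follows: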


{
    "job": {
        "content": {
            "reader": {
                "name": "kafka",
                "parameter": {
                    "topic": "kafka_es_client_error",
                    "groupid": "es2",
                    "bootstrapServers": "k1:9094,k2:9094,k3:9094"
                },
                "threads": 6
            },
            "writer": {
                "name": "es",
                "parameter": {
                    "host": [
                        "es1:9300",
                        "es2:9300",
                        "es3:9300"
                    ],
                    "index": "client_error_%s",
                    "type": "client_error"
                }
            }
        }
    }
}


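  The configuration separates the read source (reader) from the write target (writer), and each side is configured with its own properties. Note that each entry in the writer's host list is a single host:port pair, because the storage code splits every entry on ":".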
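  The code in section 3.2 reads this file with dotted keys such as `job.content.reader.parameter.topic`. The `Configuration` class that resolves them is not shown in this post, so the following is only a minimal sketch of how such a dotted lookup could work over nested maps. The class `DottedPathLookup` and its methods are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: resolves dotted keys against nested maps.
// This is NOT the Configuration class used in the post, only an illustration.
final class DottedPathLookup {

    /** Walks the nested maps one path segment at a time; returns null if any segment is missing. */
    @SuppressWarnings("unchecked")
    static Object get(Map<String, Object> root, String path) {
        Object current = root;
        for (String part : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<String, Object>) current).get(part);
        }
        return current;
    }

    /** Builds a small in-memory copy of the reader part of the configuration above. */
    static Map<String, Object> sample() {
        Map<String, Object> parameter = new HashMap<>();
        parameter.put("topic", "kafka_es_client_error");
        parameter.put("groupid", "es2");
        Map<String, Object> reader = new HashMap<>();
        reader.put("parameter", parameter);
        reader.put("threads", 6);
        Map<String, Object> content = new HashMap<>();
        content.put("reader", reader);
        Map<String, Object> job = new HashMap<>();
        job.put("content", content);
        Map<String, Object> root = new HashMap<>();
        root.put("job", job);
        return root;
    }

    public static void main(String[] args) {
        System.out.println(get(sample(), "job.content.reader.parameter.topic"));
        System.out.println(get(sample(), "job.content.reader.threads"));
    }
}
```

  A missing segment yields null rather than an exception, which keeps the lookup easy to combine with default values.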

3.2 Data storage

  We create one index per day to store the data, which makes business queries easier. The implementation is as follows:


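import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

// Configuration, SystemConfigUtils and CalendarUtils are project classes
// whose source (and package) is not shown in this post.
public class EsProducer {
    private final static Logger LOG = LoggerFactory.getLogger(EsProducer.class);
    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executorService;
    private Configuration conf = null;
    // Limits the "Index : ..." log line to the first few records.
    private static final AtomicInteger counter = new AtomicInteger(0);

    public EsProducer() {
        String root = System.getProperty("user.dir") + "/conf/";
        String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path");
        conf = Configuration.from(new File(root + path));
        Properties props = new Properties();
        props.put("bootstrap.servers", conf.getString("job.content.reader.parameter.bootstrapServers"));
        props.put("group.id", conf.getString("job.content.reader.parameter.groupid"));
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(conf.getString("job.content.reader.parameter.topic")));
    }

    public void execute() {
        executorService = Executors.newFixedThreadPool(conf.getInt("job.content.reader.threads"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            // poll() does not return null, so skip empty batches instead of null-checking.
            if (!records.isEmpty()) {
                executorService.submit(new KafkaConsumerThread(records, consumer));
            }
        }
    }

    public void shutdown() {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (executorService != null) {
                executorService.shutdown();
                // Wait inside the null check so a never-started pool cannot cause an NPE.
                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                    LOG.error("Shutdown kafka consumer thread timeout.");
                }
            }
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }

    class KafkaConsumerThread implements Runnable {
        private ConsumerRecords<String, String> records;

        public KafkaConsumerThread(ConsumerRecords<String, String> records, KafkaConsumer<String, String> consumer) {
            this.records = records;
        }

        @Override
        public void run() {
            // Keep the pattern (e.g. "client_error_%s") untouched and format a fresh
            // index name per record; overwriting it would pin every later record
            // to the first record's day.
            String indexPattern = conf.getString("job.content.writer.parameter.index");
            String type = conf.getString("job.content.writer.parameter.type");
            // Documents grouped by target index, so each index gets one bulk request.
            Map<String, List<Map<String, Object>>> batches = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    JSONObject json = JSON.parseObject(record.value());
                    String index = String.format(indexPattern, CalendarUtils.timeSpan2EsDay(json.getLongValue("_tm") * 1000L));

                    if (counter.get() < 10) {
                        LOG.info("Index : " + index);
                        counter.incrementAndGet();
                    }

                    Map<String, Object> map = new HashMap<>();
                    for (String key : json.keySet()) {
                        // "_uid" is reserved by ES, so store it under "uid" instead.
                        if ("_uid".equals(key)) {
                            map.put("uid", json.get(key));
                        } else {
                            map.put(key, json.get(key));
                        }
                    }
                    // Add the document once per record (not once per key).
                    batches.computeIfAbsent(index, k -> new ArrayList<>()).add(map);
                }
            }

            for (Map.Entry<String, List<Map<String, Object>>> batch : batches.entrySet()) {
                EsUtils.write2Es(batch.getKey(), type, batch.getValue());
            }
        }

    }

}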
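
  The per-day index naming in the consumer relies on `CalendarUtils.timeSpan2EsDay`, whose source is not shown in this post. As a rough sketch of the idea, the following fills the `%s` placeholder of the index pattern with the day of a record's `_tm` timestamp (in seconds). The class `DailyIndexNames`, the `yyyyMMdd` day format and the explicit time zone parameter are assumptions made for illustration; the real helper may format the day differently.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-in for the day formatting done by CalendarUtils.timeSpan2EsDay.
final class DailyIndexNames {

    // Assumed day format; the helper used in the post may use another one.
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    /** Fills the %s placeholder of the pattern with the day of the given epoch-second timestamp. */
    static String indexFor(String pattern, long epochSeconds, ZoneId zone) {
        String day = DAY.format(Instant.ofEpochSecond(epochSeconds).atZone(zone));
        return String.format(pattern, day);
    }

    public static void main(String[] args) {
        // 1496275200 is 2017-06-01 00:00:00 UTC
        System.out.println(indexFor("client_error_%s", 1496275200L, ZoneOffset.UTC)); // prints client_error_20170601
    }
}
```

  Because the pattern is passed in unchanged on every call, records from different days end up in different indices.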


  That takes care of consuming the data source. Next comes storage in ES, implemented as follows:

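import java.io.File;
import java.net.InetAddress;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

// Configuration and SystemConfigUtils are project classes whose source is not shown in this post.
public class EsUtils {

    private static TransportClient client = null;

    static {
        // Settings.EMPTY uses the client defaults, including the default cluster name.
        client = new PreBuiltTransportClient(Settings.EMPTY);
        String root = System.getProperty("user.dir") + "/conf/";
        String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path");
        Configuration conf = Configuration.from(new File(root + path));
        List<Object> hosts = conf.getList("job.content.writer.parameter.host");
        for (Object object : hosts) {
            try {
                // Each entry is expected to be a single "host:port" pair.
                String[] hostAndPort = object.toString().split(":");
                client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostAndPort[0]), Integer.parseInt(hostAndPort[1])));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void write2Es(String index, String type, List<Map<String, Object>> dataSets) {
        // An empty bulk request fails validation, so there is nothing to send.
        if (dataSets == null || dataSets.isEmpty()) {
            return;
        }

        BulkRequestBuilder bulkRequest = client.prepareBulk();
        for (Map<String, Object> dataSet : dataSets) {
            bulkRequest.add(client.prepareIndex(index, type).setSource(dataSet));
        }

        BulkResponse response = bulkRequest.execute().actionGet();
        // A bulk call can succeed as a whole while individual documents fail.
        if (response.hasFailures()) {
            System.err.println(response.buildFailureMessage());
        }
    }

    public static void close() {
        if (client != null) {
            client.close();
        }
    }
}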

  Here we use BulkRequestBuilder to write in batches, which reduces the number of individual write requests sent to ES.
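
  When a single poll returns many records, it can help to cap how many documents go into one bulk request. The following is a minimal sketch of splitting a list into fixed-size batches before handing each batch to a bulk write. The class `BulkBatches` and the batch size are hypothetical; the post itself does not cap the batch size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits documents into fixed-size batches for bulk writes.
final class BulkBatches {

    /** Returns consecutive sublists of at most batchSize elements, preserving order. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Five documents with a batch size of two give batches of 2, 2 and 1.
        for (List<Integer> batch : partition(Arrays.asList(1, 2, 3, 4, 5), 2)) {
            System.out.println(batch);
        }
    }
}
```

  Each resulting batch could then be passed to a bulk write such as `EsUtils.write2Es(index, type, batch)`.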

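4. Scheduling

  Data stored in ES does not always need to be kept long term. For example, if we only need one month of data for near-real-time queries, anything older than one month has to be cleaned up. A simple script invoked from Crontab is enough for this. The script is as follows: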


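#!/bin/bash
# <Usage>: ./delete_es_by_day.sh kafka_error_client logsdate 30 </Usage>
# Arguments: index name, day column, days to keep, optional date format.

index_name=$1
daycolumn=$2
savedays=$3
format_day=$4

if [ ! -n "$savedays" ]; then
  echo "Oops. The arguments are not right, please try again."
  exit 1
fi

if [ ! -n "$format_day" ]; then
   format_day='%Y%m%d'
fi

# Despite its name, this holds the day that lies ${savedays} days in the past.
sevendayago=`date -d "-${savedays} day " +${format_day}`

# The request body is passed through a here-document so the JSON double quotes
# survive while the shell variables are still expanded.
# Note: DELETE <index>/_query with a "filtered" query is the ES 1.x form;
# ES 5.x replaced it with POST <index>/_delete_by_query and a bool/range query.
curl -XDELETE "es1:9200/${index_name}/_query?pretty" -d @- <<EOF
{
        "query": {
                "filtered": {
                        "filter": {
                                "bool": {
                                        "must": {
                                                "range": {
                                                        "${daycolumn}": {
                                                                "from": null,
                                                                "to": ${sevendayago},
                                                                "include_lower": true,
                                                                "include_upper": true
                                                        }
                                                }
                                        }
                                }
                        }
                }
        }
}
EOF

echo "Finished."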


  Then schedule the script in Crontab so that it runs periodically.
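
  The cutoff day computed in the script with `date -d "-${savedays} day"` can also be derived in Java, for example if the cleanup were triggered from the consumer application instead of Crontab. The following is only a sketch of that date arithmetic. The class `RetentionCutoff` is a hypothetical name, and `yyyyMMdd` is assumed as the Java counterpart of the script's default `%Y%m%d` format.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper mirroring the script's cutoff-day calculation.
final class RetentionCutoff {

    /** Returns the day that lies saveDays days before today, formatted with the given pattern. */
    static String cutoffDay(LocalDate today, int saveDays, String pattern) {
        return today.minusDays(saveDays).format(DateTimeFormatter.ofPattern(pattern));
    }

    public static void main(String[] args) {
        // Keeping 30 days of data on 2017-06-30 gives a cutoff of 2017-05-31.
        System.out.println(cutoffDay(LocalDate.of(2017, 6, 30), 30, "yyyyMMdd")); // prints 20170531
    }
}
```

  Passing the current day in as a parameter, rather than calling `LocalDate.now()` inside the method, keeps the calculation easy to test.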

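5. Summary

  When writing data into ES, note that some fields are reserved by ES, such as `_uid`. These fields have to be renamed before writing. Otherwise they conflict with the reserved names, an exception is raised, and the write fails.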
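
  The renaming step can be isolated in a small function. The following sketch copies a document and stores `_uid` under `uid`, which is the one rename this post applies. The class `ReservedFields` and its rename table are hypothetical; other reserved names would have to be added to the table as needed.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: renames fields that collide with names reserved by ES.
final class ReservedFields {

    // Only the rename used in this post; extend the table for other reserved fields.
    private static final Map<String, String> RENAMES = new HashMap<>();
    static {
        RENAMES.put("_uid", "uid");
    }

    /** Returns a copy of the document in which reserved field names are replaced. */
    static Map<String, Object> sanitize(Map<String, Object> source) {
        Map<String, Object> doc = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            String key = RENAMES.getOrDefault(entry.getKey(), entry.getKey());
            doc.put(key, entry.getValue());
        }
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("_uid", 42);
        raw.put("msg", "client error");
        System.out.println(sanitize(raw)); // prints {uid=42, msg=client error}
    }
}
```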

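6. Closing remarks

  That is all for this post. If you run into problems while studying this topic, feel free to join the group for discussion or send me an email. I will do my best to answer.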

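Contact: 
Email: smartloli.org@gmail.com 
Twitter: https://twitter.com/smartloli 
QQ group (Hadoop - Community 1): 424769183 
Note: when requesting to join the group, please state your reason (name + company/school) so that the administrators can review it. Thank you! 

Love life, enjoy programming!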


Author: 哥不是小蘿莉

Source: http://www.cnblogs.com/smartloli/

Please credit the source when reposting. Thank you!

http://www.cnblogs.com/smartloli/p/6978645.html