滑動窗口在監(jiān)控和統(tǒng)計應(yīng)用的場景比較廣泛,比如每隔一段時間(10s)統(tǒng)計最近30s的請求量或者異常次數(shù),根據(jù)請求或者異常次數(shù)采取相應(yīng)措施。在storm1.0版本之前,沒有提供關(guān)于滑動窗口的實現(xiàn),需要開發(fā)者自己實現(xiàn)滑動窗口的功能(storm1.0以前實現(xiàn)滑動窗口的實現(xiàn)原理可以自行百度)。

原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6481588.html

微信:intsmaze

移動開發(fā)培訓,Android培訓,安卓培訓,手機開發(fā)培訓,手機維修培訓,手機軟件培訓

這里主要演示在storm1.0以后如何通過繼承storm1.0提供的類來快速開發(fā)出窗口滑動的功能。窗口可以從時間或數(shù)量上來劃分,由如下兩個因素決定:窗口的長度,可以是時間間隔或Tuple數(shù)量;滑動間隔(sliding Interval),可以是時間間隔或Tuple數(shù)量。比如:每兩秒統(tǒng)計最近6秒的請求數(shù)量;每接收2個Tuple就統(tǒng)計最近接收的6個Tuple的平均值......。

storm1.0支持的時間和數(shù)量的排列組合有如下:


withWindow(Count windowLength, Count slidingInterval)

  每收到slidingInterval條數(shù)據(jù)統(tǒng)計最近的windowLength條數(shù)據(jù)。

withWindow(Count windowLength)

  每收到1條數(shù)據(jù)統(tǒng)計最近的windowLength條數(shù)據(jù)。

withWindow(Count windowLength, Duration slidingInterval)

  每過slidingInterval秒統(tǒng)計最近的windowLength條數(shù)據(jù)。

withWindow(Duration windowLength, Count slidingInterval)

  每收到slidingInterval條數(shù)據(jù)統(tǒng)計最近的windowLength秒的數(shù)據(jù)。

withWindow(Duration windowLength, Duration slidingInterval)

  每過slidingInterval秒統(tǒng)計最近的windowLength秒的數(shù)據(jù)。

public withWindow(Duration windowLength)

  每收到1條數(shù)據(jù)統(tǒng)計最近的windowLength秒的數(shù)據(jù)。


接下來,簡單的演示如何使用storm1.0實現(xiàn)滑動窗口的功能,先編寫spout類,RandomSentenceSpout負責發(fā)送一個整形數(shù)值,數(shù)值每次發(fā)送都會自動加一,且RandomSentenceSpout固定每隔兩秒向bolt發(fā)送一次數(shù)據(jù)。RandomSentenceSpout和前面關(guān)于spout的講解一樣。

移動開發(fā)培訓,Android培訓,安卓培訓,手機開發(fā)培訓,手機維修培訓,手機軟件培訓

1.public class RandomSentenceSpout extends BaseRichSpout { 2. 3.    private static final long serialVersionUID = 5028304756439810609L;   4. 5.    private SpoutOutputCollector collector;   6. 7.    int intsmaze=0; 8. 9.    public void declareOutputFields(OutputFieldsDeclarer declarer) { 10.        declarer.declare(new Fields("intsmaze")); 11.    } 12. 13.    public void open(Map conf, TopologyContext context,  14.                          SpoutOutputCollector collector) { 15.        this.collector = collector; 16.    } 17. 18.    public void nextTuple() { 19.        System.out.println("發(fā)送數(shù)據(jù):"+intsmaze); 20.        collector.emit(new Values(intsmaze++)); 21.        try { 22.            Thread.sleep(2000); 23.//         Thread.sleep(1000); 24.        } catch (InterruptedException e) { 25.            e.printStackTrace(); 26.        } 27.    } }

移動開發(fā)培訓,Android培訓,安卓培訓,手機開發(fā)培訓,手機維修培訓,手機軟件培訓



滑動窗口的邏輯實現(xiàn)的重點是bolt類,這里我們編寫SlidingWindowBolt類讓它繼承一個新的類名為BaseWindowedBolt來獲得窗口計數(shù)的功能。BaseWindowedBolt和前面的BaseBaseBoltBaseWindowedBolt提供的方法名都一樣,只是execute方法的參數(shù)類型為TupleWindow,TupleWindow參數(shù)里面裝載了一個窗口長度類的tuple數(shù)據(jù)。通過對TupleWindow遍歷,我們可以計算這一個窗口內(nèi)tuple數(shù)的平均值或總和等指標。具體見代碼12-16行,統(tǒng)計了一個窗口內(nèi)的數(shù)值型數(shù)據(jù)的總和。

移動開發(fā)培訓,Android培訓,安卓培訓,手機開發(fā)培訓,手機維修培訓,手機軟件培訓

1.public class SlidingWindowBolt extends BaseWindowedBolt { 2. 3.    private OutputCollector collector; 4. 5.    @Override 6.    public void prepare(Map stormConf, TopologyContext context,  7.            OutputCollector collector) { 8.        this.collector = collector; 9.    } 10. 11.    public void execute(TupleWindow inputWindow) {         12.        int sum=0; 13.        System.out.print("一個窗口內(nèi)的數(shù)據(jù)"); 14.        for(Tuple tuple: inputWindow.get()) { 15.            int str=(Integer) tuple.getValueByField("intsmaze"); 16.            System.out.print(" "+str); 17.            sum+=str; 18.        } 19.        System.out.println("======="+sum); 20. //        collector.emit(new Values(sum)); 21.    } 22. 23.    @Override 24.    public void declareOutputFields(OutputFieldsDeclarer declarer) { 25.//       declarer.declare(new Fields("count")); 26.    } }

移動開發(fā)培訓,Android培訓,安卓培訓,手機開發(fā)培訓,手機維修培訓,手機軟件培訓



我們已經(jīng)實現(xiàn)了窗口計數(shù)的邏輯代碼,現(xiàn)在我們需要提供topology來指明各個組件的關(guān)系,以及指定SlidingWindowBolt的窗口的組合,這里我們演示了如何每兩秒統(tǒng)計最近6秒的數(shù)值總和,如果注釋掉10-13行代碼,去掉5-8行的注釋,這個topology就是告訴SlidingWindowBolt每接收到兩條tuple就統(tǒng)計最近接收到的6條tuple的數(shù)值的總和。

移動開發(fā)培訓,Android培訓,安卓培訓,手機開發(fā)培訓,手機維修培訓,手機軟件培訓

1.public class WindowsTopology { 2. 3.    public static void main(String[] args) throws Exception { 4.       TopologyBuilder builder = new TopologyBuilder(); 5.       builder.setSpout("spout1", new RandomSentenceSpout(), 1); 6.//       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 7.//       .withWindow(new Count(6), new Count(2)),1) 8.//       .shuffleGrouping("spout"); 9.//滑窗 窗口長度:tuple數(shù), 滑動間隔: tuple數(shù) 每收到2條數(shù)據(jù)統(tǒng)計當前6條數(shù)據(jù)的總和。   10.      11.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 12.       .withWindow(new Duration(6, TimeUnit.SECONDS),  13.               new Duration(2, TimeUnit.SECONDS)),1) 14.       .shuffleGrouping("spout");//每兩秒統(tǒng)計最近6秒的數(shù)據(jù)        15. 16.       Config conf = new Config(); 17.       conf.setNumWorkers(1); 18.       LocalCluster cluster = new LocalCluster(); 19.       cluster.submitTopology("word-count", conf, builder.createTopology()); 20.   } }

移動開發(fā)培訓,Android培訓,安卓培訓,手機開發(fā)培訓,手機維修培訓,手機軟件培訓



這里演示的是bolt節(jié)點并發(fā)度為1的窗口功能,實際生產(chǎn)中,因為數(shù)據(jù)量很大,往往將bolt節(jié)點的并發(fā)度設(shè)置為多個,這個時候我們的SlidingWindowBolt就無法統(tǒng)計出一個窗口的數(shù)值總和了。因為每一個bolt的并行節(jié)點只能統(tǒng)計自己一個窗口接收到數(shù)據(jù)的總和,無法統(tǒng)計出一個窗口內(nèi)全局數(shù)據(jù)的總和,借助redis來實現(xiàn)是可以的,但是必須引入redis的事務(wù)機制或者借助分布式鎖,否則會出現(xiàn)臟數(shù)據(jù)的情況。在這里我們介紹另一種實現(xiàn)方式就是靈活的使用storm提供的窗口功能,只是窗口的tuple數(shù)。

仍然是使用上面提供的類,只是我們增加一個bolt類,來統(tǒng)計每個SlidingWindowBolt節(jié)點發(fā)送給它的數(shù)值。

移動開發(fā)培訓,Android培訓,安卓培訓,手機開發(fā)培訓,手機維修培訓,手機軟件培訓

1.public class CountWord extends BaseWindowedBolt{ 2.     3.    private static final long serialVersionUID = -5283595260540124273L; 4.     5.    private OutputCollector collector; 6.     7.    public void prepare(Map stormConf, TopologyContext context 8.                             , OutputCollector collector) { 9.        this.collector = collector; 10.    } 11.     12.    public void execute(TupleWindow inputWindow) { 13.         int sum=0; 14.         for(Tuple tuple: inputWindow.get()) { 15.             int i=(Integer) tuple.getValueByField("count"); 16.               System.out.println("接收到一個bolt的總和值為:"+i); 17.               sum+=i; 18.          } 19.         System.out.println("一個窗口內(nèi)的總值為:"+sum); 20.    } 21. 22.    public void declareOutputFields(OutputFieldsDeclarer declarer) { 23.    } }

移動開發(fā)培訓,Android培訓,安卓培訓,手機開發(fā)培訓,手機維修培訓,手機軟件培訓



然后我們注釋RandomSentenceSpout22行代碼,取消對23行代碼的注釋,方便觀察結(jié)果。去掉SlidingWindowBolt20和25行代碼。

topology啟動類如下:

移動開發(fā)培訓,Android培訓,安卓培訓,手機開發(fā)培訓,手機維修培訓,手機軟件培訓

1.public class WindowsTopology { 2. 3.    public static void main(String[] args) throws Exception { 4.       TopologyBuilder builder = new TopologyBuilder(); 5.       builder.setSpout("spout", new RandomSentenceSpout(), 1); 6.        7.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 8.       .withWindow(new Duration(6, TimeUnit.SECONDS),  9.               new Duration(2, TimeUnit.SECONDS)),2) 10.       .shuffleGrouping("spout");//每兩秒統(tǒng)計最近6秒的數(shù)據(jù) 11.        12.       builder.setBolt("countwordbolt", new CountWord() 13.       .withWindow(new Count(2), new Count(2)),1) 14.       .shuffleGrouping("slidingwindowbolt"); 15.       //每收到2條tuple就統(tǒng)計最近兩條統(tǒng)的數(shù)據(jù) 16.       Config conf = new Config(); 17.       conf.setNumWorkers(1); 18.       LocalCluster cluster = new LocalCluster(); 19.       cluster.submitTopology("word-count", conf, builder.createTopology()); 20.   } }

移動開發(fā)培訓,Android培訓,安卓培訓,手機開發(fā)培訓,手機維修培訓,手機軟件培訓

http://www.cnblogs.com/intsmaze/p/6481588.html