滑動窗口在監(jiān)控和統(tǒng)計應(yīng)用的場景比較廣泛,比如每隔一段時間(10s)統(tǒng)計最近30s的請求量或者異常次數(shù),根據(jù)請求或者異常次數(shù)采取相應(yīng)措施。在storm1.0版本之前,沒有提供關(guān)于滑動窗口的實現(xiàn),需要開發(fā)者自己實現(xiàn)滑動窗口的功能(storm1.0以前實現(xiàn)滑動窗口的實現(xiàn)原理可以自行百度)。
原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6481588.html
微信:intsmaze
這里主要演示在storm1.0以后如何通過繼承storm1.0提供的類來快速開發(fā)出窗口滑動的功能。窗口可以從時間或數(shù)量上來劃分,由如下兩個因素決定:窗口的長度,可以是時間間隔或Tuple數(shù)量;滑動間隔(sliding Interval),可以是時間間隔或Tuple數(shù)量。比如:每兩秒統(tǒng)計最近6秒的請求數(shù)量;每接收2個Tuple就統(tǒng)計最近接收的6個Tuple的平均值......。
storm1.0支持的時間和數(shù)量的排列組合有如下:
withWindow(Count windowLength, Count slidingInterval)
每收到slidingInterval條數(shù)據(jù)統(tǒng)計最近的windowLength條數(shù)據(jù)。
withWindow(Count windowLength)
每收到1條數(shù)據(jù)統(tǒng)計最近的windowLength條數(shù)據(jù)。
withWindow(Count windowLength, Duration slidingInterval)
每過slidingInterval秒統(tǒng)計最近的windowLength條數(shù)據(jù)。
withWindow(Duration windowLength, Count slidingInterval)
每收到slidingInterval條數(shù)據(jù)統(tǒng)計最近的windowLength秒的數(shù)據(jù)。
withWindow(Duration windowLength, Duration slidingInterval)
每過slidingInterval秒統(tǒng)計最近的windowLength秒的數(shù)據(jù)。
public withWindow(Duration windowLength)
每收到1條數(shù)據(jù)統(tǒng)計最近的windowLength秒的數(shù)據(jù)。
接下來,簡單的演示如何使用storm1.0實現(xiàn)滑動窗口的功能,先編寫spout類,RandomSentenceSpout負責發(fā)送一個整形數(shù)值,數(shù)值每次發(fā)送都會自動加一,且RandomSentenceSpout固定每隔兩秒向bolt發(fā)送一次數(shù)據(jù)。RandomSentenceSpout和前面關(guān)于spout的講解一樣。
1.public class RandomSentenceSpout extends BaseRichSpout { 2. 3. private static final long serialVersionUID = 5028304756439810609L; 4. 5. private SpoutOutputCollector collector; 6. 7. int intsmaze=0; 8. 9. public void declareOutputFields(OutputFieldsDeclarer declarer) { 10. declarer.declare(new Fields("intsmaze")); 11. } 12. 13. public void open(Map conf, TopologyContext context, 14. SpoutOutputCollector collector) { 15. this.collector = collector; 16. } 17. 18. public void nextTuple() { 19. System.out.println("發(fā)送數(shù)據(jù):"+intsmaze); 20. collector.emit(new Values(intsmaze++)); 21. try { 22. Thread.sleep(2000); 23.// Thread.sleep(1000); 24. } catch (InterruptedException e) { 25. e.printStackTrace(); 26. } 27. } }
滑動窗口的邏輯實現(xiàn)的重點是bolt類,這里我們編寫SlidingWindowBolt類讓它繼承一個新的類名為BaseWindowedBolt來獲得窗口計數(shù)的功能。BaseWindowedBolt和前面的BaseBaseBolt和BaseWindowedBolt提供的方法名都一樣,只是execute方法的參數(shù)類型為TupleWindow,TupleWindow參數(shù)里面裝載了一個窗口長度類的tuple數(shù)據(jù)。通過對TupleWindow遍歷,我們可以計算這一個窗口內(nèi)tuple數(shù)的平均值或總和等指標。具體見代碼12-16行,統(tǒng)計了一個窗口內(nèi)的數(shù)值型數(shù)據(jù)的總和。
1.public class SlidingWindowBolt extends BaseWindowedBolt { 2. 3. private OutputCollector collector; 4. 5. @Override 6. public void prepare(Map stormConf, TopologyContext context, 7. OutputCollector collector) { 8. this.collector = collector; 9. } 10. 11. public void execute(TupleWindow inputWindow) { 12. int sum=0; 13. System.out.print("一個窗口內(nèi)的數(shù)據(jù)"); 14. for(Tuple tuple: inputWindow.get()) { 15. int str=(Integer) tuple.getValueByField("intsmaze"); 16. System.out.print(" "+str); 17. sum+=str; 18. } 19. System.out.println("======="+sum); 20. // collector.emit(new Values(sum)); 21. } 22. 23. @Override 24. public void declareOutputFields(OutputFieldsDeclarer declarer) { 25.// declarer.declare(new Fields("count")); 26. } }
我們已經(jīng)實現(xiàn)了窗口計數(shù)的邏輯代碼,現(xiàn)在我們需要提供topology來指明各個組件的關(guān)系,以及指定SlidingWindowBolt的窗口的組合,這里我們演示了如何每兩秒統(tǒng)計最近6秒的數(shù)值總和,如果注釋掉10-13行代碼,去掉5-8行的注釋,這個topology就是告訴SlidingWindowBolt每接收到兩條tuple就統(tǒng)計最近接收到的6條tuple的數(shù)值的總和。
1.public class WindowsTopology { 2. 3. public static void main(String[] args) throws Exception { 4. TopologyBuilder builder = new TopologyBuilder(); 5. builder.setSpout("spout1", new RandomSentenceSpout(), 1); 6.// builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 7.// .withWindow(new Count(6), new Count(2)),1) 8.// .shuffleGrouping("spout"); 9.//滑窗 窗口長度:tuple數(shù), 滑動間隔: tuple數(shù) 每收到2條數(shù)據(jù)統(tǒng)計當前6條數(shù)據(jù)的總和。 10. 11. builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 12. .withWindow(new Duration(6, TimeUnit.SECONDS), 13. new Duration(2, TimeUnit.SECONDS)),1) 14. .shuffleGrouping("spout");//每兩秒統(tǒng)計最近6秒的數(shù)據(jù) 15. 16. Config conf = new Config(); 17. conf.setNumWorkers(1); 18. LocalCluster cluster = new LocalCluster(); 19. cluster.submitTopology("word-count", conf, builder.createTopology()); 20. } }
這里演示的是bolt節(jié)點并發(fā)度為1的窗口功能,實際生產(chǎn)中,因為數(shù)據(jù)量很大,往往將bolt節(jié)點的并發(fā)度設(shè)置為多個,這個時候我們的SlidingWindowBolt就無法統(tǒng)計出一個窗口的數(shù)值總和了。因為每一個bolt的并行節(jié)點只能統(tǒng)計自己一個窗口接收到數(shù)據(jù)的總和,無法統(tǒng)計出一個窗口內(nèi)全局數(shù)據(jù)的總和,借助redis來實現(xiàn)是可以的,但是必須引入redis的事務(wù)機制或者借助分布式鎖,否則會出現(xiàn)臟數(shù)據(jù)的情況。在這里我們介紹另一種實現(xiàn)方式就是靈活的使用storm提供的窗口功能,只是窗口的tuple數(shù)。
仍然是使用上面提供的類,只是我們增加一個bolt類,來統(tǒng)計每個SlidingWindowBolt節(jié)點發(fā)送給它的數(shù)值。
1.public class CountWord extends BaseWindowedBolt{ 2. 3. private static final long serialVersionUID = -5283595260540124273L; 4. 5. private OutputCollector collector; 6. 7. public void prepare(Map stormConf, TopologyContext context 8. , OutputCollector collector) { 9. this.collector = collector; 10. } 11. 12. public void execute(TupleWindow inputWindow) { 13. int sum=0; 14. for(Tuple tuple: inputWindow.get()) { 15. int i=(Integer) tuple.getValueByField("count"); 16. System.out.println("接收到一個bolt的總和值為:"+i); 17. sum+=i; 18. } 19. System.out.println("一個窗口內(nèi)的總值為:"+sum); 20. } 21. 22. public void declareOutputFields(OutputFieldsDeclarer declarer) { 23. } }
然后我們注釋RandomSentenceSpout第22行代碼,取消對23行代碼的注釋,方便觀察結(jié)果。去掉SlidingWindowBolt類20和25行代碼。
topology啟動類如下:
1.public class WindowsTopology { 2. 3. public static void main(String[] args) throws Exception { 4. TopologyBuilder builder = new TopologyBuilder(); 5. builder.setSpout("spout", new RandomSentenceSpout(), 1); 6. 7. builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 8. .withWindow(new Duration(6, TimeUnit.SECONDS), 9. new Duration(2, TimeUnit.SECONDS)),2) 10. .shuffleGrouping("spout");//每兩秒統(tǒng)計最近6秒的數(shù)據(jù) 11. 12. builder.setBolt("countwordbolt", new CountWord() 13. .withWindow(new Count(2), new Count(2)),1) 14. .shuffleGrouping("slidingwindowbolt"); 15. //每收到2條tuple就統(tǒng)計最近兩條統(tǒng)的數(shù)據(jù) 16. Config conf = new Config(); 17. conf.setNumWorkers(1); 18. LocalCluster cluster = new LocalCluster(); 19. cluster.submitTopology("word-count", conf, builder.createTopology()); 20. } }
http://www.cnblogs.com/intsmaze/p/6481588.html