滑動(dòng)窗口在監(jiān)控和統(tǒng)計(jì)應(yīng)用的場(chǎng)景比較廣泛,比如每隔一段時(shí)間(10s)統(tǒng)計(jì)最近30s的請(qǐng)求量或者異常次數(shù),根據(jù)請(qǐng)求或者異常次數(shù)采取相應(yīng)措施。在storm1.0版本之前,沒(méi)有提供關(guān)于滑動(dòng)窗口的實(shí)現(xiàn),需要開發(fā)者自己實(shí)現(xiàn)滑動(dòng)窗口的功能(storm1.0以前實(shí)現(xiàn)滑動(dòng)窗口的實(shí)現(xiàn)原理可以自行百度)。
原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6481588.html
微信:intsmaze
這里主要演示在storm1.0以后如何通過(guò)繼承storm1.0提供的類來(lái)快速開發(fā)出窗口滑動(dòng)的功能。窗口可以從時(shí)間或數(shù)量上來(lái)劃分,由如下兩個(gè)因素決定:窗口的長(zhǎng)度,可以是時(shí)間間隔或Tuple數(shù)量;滑動(dòng)間隔(sliding Interval),可以是時(shí)間間隔或Tuple數(shù)量。比如:每?jī)擅虢y(tǒng)計(jì)最近6秒的請(qǐng)求數(shù)量;每接收2個(gè)Tuple就統(tǒng)計(jì)最近接收的6個(gè)Tuple的平均值......。
storm1.0支持的時(shí)間和數(shù)量的排列組合有如下:
withWindow(Count windowLength, Count slidingInterval)
每收到slidingInterval條數(shù)據(jù)統(tǒng)計(jì)最近的windowLength條數(shù)據(jù)。
withWindow(Count windowLength)
每收到1條數(shù)據(jù)統(tǒng)計(jì)最近的windowLength條數(shù)據(jù)。
withWindow(Count windowLength, Duration slidingInterval)
每過(guò)slidingInterval秒統(tǒng)計(jì)最近的windowLength條數(shù)據(jù)。
withWindow(Duration windowLength, Count slidingInterval)
每收到slidingInterval條數(shù)據(jù)統(tǒng)計(jì)最近的windowLength秒的數(shù)據(jù)。
withWindow(Duration windowLength, Duration slidingInterval)
每過(guò)slidingInterval秒統(tǒng)計(jì)最近的windowLength秒的數(shù)據(jù)。
public withWindow(Duration windowLength)
每收到1條數(shù)據(jù)統(tǒng)計(jì)最近的windowLength秒的數(shù)據(jù)。
接下來(lái),簡(jiǎn)單的演示如何使用storm1.0實(shí)現(xiàn)滑動(dòng)窗口的功能,先編寫spout類,RandomSentenceSpout負(fù)責(zé)發(fā)送一個(gè)整形數(shù)值,數(shù)值每次發(fā)送都會(huì)自動(dòng)加一,且RandomSentenceSpout固定每隔兩秒向bolt發(fā)送一次數(shù)據(jù)。RandomSentenceSpout和前面關(guān)于spout的講解一樣。
1.public class RandomSentenceSpout extends BaseRichSpout { 2. 3. private static final long serialVersionUID = 5028304756439810609L; 4. 5. private SpoutOutputCollector collector; 6. 7. int intsmaze=0; 8. 9. public void declareOutputFields(OutputFieldsDeclarer declarer) { 10. declarer.declare(new Fields("intsmaze")); 11. } 12. 13. public void open(Map conf, TopologyContext context, 14. SpoutOutputCollector collector) { 15. this.collector = collector; 16. } 17. 18. public void nextTuple() { 19. System.out.println("發(fā)送數(shù)據(jù):"+intsmaze); 20. collector.emit(new Values(intsmaze++)); 21. try { 22. Thread.sleep(2000); 23.// Thread.sleep(1000); 24. } catch (InterruptedException e) { 25. e.printStackTrace(); 26. } 27. } }
滑動(dòng)窗口的邏輯實(shí)現(xiàn)的重點(diǎn)是bolt類,這里我們編寫SlidingWindowBolt類讓它繼承一個(gè)新的類名為BaseWindowedBolt來(lái)獲得窗口計(jì)數(shù)的功能。BaseWindowedBolt和前面的BaseBaseBolt和BaseWindowedBolt提供的方法名都一樣,只是execute方法的參數(shù)類型為TupleWindow,TupleWindow參數(shù)里面裝載了一個(gè)窗口長(zhǎng)度類的tuple數(shù)據(jù)。通過(guò)對(duì)TupleWindow遍歷,我們可以計(jì)算這一個(gè)窗口內(nèi)tuple數(shù)的平均值或總和等指標(biāo)。具體見代碼12-16行,統(tǒng)計(jì)了一個(gè)窗口內(nèi)的數(shù)值型數(shù)據(jù)的總和。
1.public class SlidingWindowBolt extends BaseWindowedBolt { 2. 3. private OutputCollector collector; 4. 5. @Override 6. public void prepare(Map stormConf, TopologyContext context, 7. OutputCollector collector) { 8. this.collector = collector; 9. } 10. 11. public void execute(TupleWindow inputWindow) { 12. int sum=0; 13. System.out.print("一個(gè)窗口內(nèi)的數(shù)據(jù)"); 14. for(Tuple tuple: inputWindow.get()) { 15. int str=(Integer) tuple.getValueByField("intsmaze"); 16. System.out.print(" "+str); 17. sum+=str; 18. } 19. System.out.println("======="+sum); 20. // collector.emit(new Values(sum)); 21. } 22. 23. @Override 24. public void declareOutputFields(OutputFieldsDeclarer declarer) { 25.// declarer.declare(new Fields("count")); 26. } }
我們已經(jīng)實(shí)現(xiàn)了窗口計(jì)數(shù)的邏輯代碼,現(xiàn)在我們需要提供topology來(lái)指明各個(gè)組件的關(guān)系,以及指定SlidingWindowBolt的窗口的組合,這里我們演示了如何每?jī)擅虢y(tǒng)計(jì)最近6秒的數(shù)值總和,如果注釋掉10-13行代碼,去掉5-8行的注釋,這個(gè)topology就是告訴SlidingWindowBolt每接收到兩條tuple就統(tǒng)計(jì)最近接收到的6條tuple的數(shù)值的總和。
1.public class WindowsTopology { 2. 3. public static void main(String[] args) throws Exception { 4. TopologyBuilder builder = new TopologyBuilder(); 5. builder.setSpout("spout1", new RandomSentenceSpout(), 1); 6.// builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 7.// .withWindow(new Count(6), new Count(2)),1) 8.// .shuffleGrouping("spout"); 9.//滑窗 窗口長(zhǎng)度:tuple數(shù), 滑動(dòng)間隔: tuple數(shù) 每收到2條數(shù)據(jù)統(tǒng)計(jì)當(dāng)前6條數(shù)據(jù)的總和。 10. 11. builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 12. .withWindow(new Duration(6, TimeUnit.SECONDS), 13. new Duration(2, TimeUnit.SECONDS)),1) 14. .shuffleGrouping("spout");//每?jī)擅虢y(tǒng)計(jì)最近6秒的數(shù)據(jù) 15. 16. Config conf = new Config(); 17. conf.setNumWorkers(1); 18. LocalCluster cluster = new LocalCluster(); 19. cluster.submitTopology("word-count", conf, builder.createTopology()); 20. } }
這里演示的是bolt節(jié)點(diǎn)并發(fā)度為1的窗口功能,實(shí)際生產(chǎn)中,因?yàn)閿?shù)據(jù)量很大,往往將bolt節(jié)點(diǎn)的并發(fā)度設(shè)置為多個(gè),這個(gè)時(shí)候我們的SlidingWindowBolt就無(wú)法統(tǒng)計(jì)出一個(gè)窗口的數(shù)值總和了。因?yàn)槊恳粋€(gè)bolt的并行節(jié)點(diǎn)只能統(tǒng)計(jì)自己一個(gè)窗口接收到數(shù)據(jù)的總和,無(wú)法統(tǒng)計(jì)出一個(gè)窗口內(nèi)全局?jǐn)?shù)據(jù)的總和,借助redis來(lái)實(shí)現(xiàn)是可以的,但是必須引入redis的事務(wù)機(jī)制或者借助分布式鎖,否則會(huì)出現(xiàn)臟數(shù)據(jù)的情況。在這里我們介紹另一種實(shí)現(xiàn)方式就是靈活的使用storm提供的窗口功能,只是窗口的tuple數(shù)。
仍然是使用上面提供的類,只是我們?cè)黾右粋€(gè)bolt類,來(lái)統(tǒng)計(jì)每個(gè)SlidingWindowBolt節(jié)點(diǎn)發(fā)送給它的數(shù)值。
1.public class CountWord extends BaseWindowedBolt{ 2. 3. private static final long serialVersionUID = -5283595260540124273L; 4. 5. private OutputCollector collector; 6. 7. public void prepare(Map stormConf, TopologyContext context 8. , OutputCollector collector) { 9. this.collector = collector; 10. } 11. 12. public void execute(TupleWindow inputWindow) { 13. int sum=0; 14. for(Tuple tuple: inputWindow.get()) { 15. int i=(Integer) tuple.getValueByField("count"); 16. System.out.println("接收到一個(gè)bolt的總和值為:"+i); 17. sum+=i; 18. } 19. System.out.println("一個(gè)窗口內(nèi)的總值為:"+sum); 20. } 21. 22. public void declareOutputFields(OutputFieldsDeclarer declarer) { 23. } }
然后我們注釋RandomSentenceSpout第22行代碼,取消對(duì)23行代碼的注釋,方便觀察結(jié)果。去掉SlidingWindowBolt類20和25行代碼。
topology啟動(dòng)類如下:
1.public class WindowsTopology { 2. 3. public static void main(String[] args) throws Exception { 4. TopologyBuilder builder = new TopologyBuilder(); 5. builder.setSpout("spout", new RandomSentenceSpout(), 1); 6. 7. builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 8. .withWindow(new Duration(6, TimeUnit.SECONDS), 9. new Duration(2, TimeUnit.SECONDS)),2) 10. .shuffleGrouping("spout");//每?jī)擅虢y(tǒng)計(jì)最近6秒的數(shù)據(jù) 11. 12. builder.setBolt("countwordbolt", new CountWord() 13. .withWindow(new Count(2), new Count(2)),1) 14. .shuffleGrouping("slidingwindowbolt"); 15. //每收到2條tuple就統(tǒng)計(jì)最近兩條統(tǒng)的數(shù)據(jù) 16. Config conf = new Config(); 17. conf.setNumWorkers(1); 18. LocalCluster cluster = new LocalCluster(); 19. cluster.submitTopology("word-count", conf, builder.createTopology()); 20. } }
* 假設(shè)的你幾行代碼可以完成某個(gè)功能,抽取成一個(gè)方法 * 假設(shè)在某個(gè)業(yè)務(wù)邏輯層可以共用,往上抽取, * 假設(shè)在多個(gè)業(yè)務(wù)層可以共用,提煉成工具類。 * 假設(shè)你的這個(gè)業(yè)務(wù)方法在多個(gè)系統(tǒng)需要被使用,發(fā)布成一個(gè)服務(wù).分類: storm