滑動(dòng)窗口在監(jiān)控和統(tǒng)計(jì)應(yīng)用的場(chǎng)景比較廣泛,比如每隔一段時(shí)間(10s)統(tǒng)計(jì)最近30s的請(qǐng)求量或者異常次數(shù),根據(jù)請(qǐng)求或者異常次數(shù)采取相應(yīng)措施。在storm1.0版本之前,沒(méi)有提供關(guān)于滑動(dòng)窗口的實(shí)現(xiàn),需要開發(fā)者自己實(shí)現(xiàn)滑動(dòng)窗口的功能(storm1.0以前實(shí)現(xiàn)滑動(dòng)窗口的實(shí)現(xiàn)原理可以自行百度)。

原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6481588.html

微信:intsmaze

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

這里主要演示在storm1.0以后如何通過(guò)繼承storm1.0提供的類來(lái)快速開發(fā)出窗口滑動(dòng)的功能。窗口可以從時(shí)間或數(shù)量上來(lái)劃分,由如下兩個(gè)因素決定:窗口的長(zhǎng)度,可以是時(shí)間間隔或Tuple數(shù)量;滑動(dòng)間隔(sliding Interval),可以是時(shí)間間隔或Tuple數(shù)量。比如:每?jī)擅虢y(tǒng)計(jì)最近6秒的請(qǐng)求數(shù)量;每接收2個(gè)Tuple就統(tǒng)計(jì)最近接收的6個(gè)Tuple的平均值......。

storm1.0支持的時(shí)間和數(shù)量的排列組合有如下:


withWindow(Count windowLength, Count slidingInterval)

  每收到slidingInterval條數(shù)據(jù)統(tǒng)計(jì)最近的windowLength條數(shù)據(jù)。

withWindow(Count windowLength)

  每收到1條數(shù)據(jù)統(tǒng)計(jì)最近的windowLength條數(shù)據(jù)。

withWindow(Count windowLength, Duration slidingInterval)

  每過(guò)slidingInterval秒統(tǒng)計(jì)最近的windowLength條數(shù)據(jù)。

withWindow(Duration windowLength, Count slidingInterval)

  每收到slidingInterval條數(shù)據(jù)統(tǒng)計(jì)最近的windowLength秒的數(shù)據(jù)。

withWindow(Duration windowLength, Duration slidingInterval)

  每過(guò)slidingInterval秒統(tǒng)計(jì)最近的windowLength秒的數(shù)據(jù)。

public withWindow(Duration windowLength)

  每收到1條數(shù)據(jù)統(tǒng)計(jì)最近的windowLength秒的數(shù)據(jù)。


接下來(lái),簡(jiǎn)單的演示如何使用storm1.0實(shí)現(xiàn)滑動(dòng)窗口的功能,先編寫spout類,RandomSentenceSpout負(fù)責(zé)發(fā)送一個(gè)整形數(shù)值,數(shù)值每次發(fā)送都會(huì)自動(dòng)加一,且RandomSentenceSpout固定每隔兩秒向bolt發(fā)送一次數(shù)據(jù)。RandomSentenceSpout和前面關(guān)于spout的講解一樣。

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

1.public class RandomSentenceSpout extends BaseRichSpout { 2. 3.    private static final long serialVersionUID = 5028304756439810609L;   4. 5.    private SpoutOutputCollector collector;   6. 7.    int intsmaze=0; 8. 9.    public void declareOutputFields(OutputFieldsDeclarer declarer) { 10.        declarer.declare(new Fields("intsmaze")); 11.    } 12. 13.    public void open(Map conf, TopologyContext context,  14.                          SpoutOutputCollector collector) { 15.        this.collector = collector; 16.    } 17. 18.    public void nextTuple() { 19.        System.out.println("發(fā)送數(shù)據(jù):"+intsmaze); 20.        collector.emit(new Values(intsmaze++)); 21.        try { 22.            Thread.sleep(2000); 23.//         Thread.sleep(1000); 24.        } catch (InterruptedException e) { 25.            e.printStackTrace(); 26.        } 27.    } }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)



滑動(dòng)窗口的邏輯實(shí)現(xiàn)的重點(diǎn)是bolt類,這里我們編寫SlidingWindowBolt類讓它繼承一個(gè)新的類名為BaseWindowedBolt來(lái)獲得窗口計(jì)數(shù)的功能。BaseWindowedBolt和前面的BaseBaseBoltBaseWindowedBolt提供的方法名都一樣,只是execute方法的參數(shù)類型為TupleWindow,TupleWindow參數(shù)里面裝載了一個(gè)窗口長(zhǎng)度類的tuple數(shù)據(jù)。通過(guò)對(duì)TupleWindow遍歷,我們可以計(jì)算這一個(gè)窗口內(nèi)tuple數(shù)的平均值或總和等指標(biāo)。具體見代碼12-16行,統(tǒng)計(jì)了一個(gè)窗口內(nèi)的數(shù)值型數(shù)據(jù)的總和。

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

1.public class SlidingWindowBolt extends BaseWindowedBolt { 2. 3.    private OutputCollector collector; 4. 5.    @Override 6.    public void prepare(Map stormConf, TopologyContext context,  7.            OutputCollector collector) { 8.        this.collector = collector; 9.    } 10. 11.    public void execute(TupleWindow inputWindow) {         12.        int sum=0; 13.        System.out.print("一個(gè)窗口內(nèi)的數(shù)據(jù)"); 14.        for(Tuple tuple: inputWindow.get()) { 15.            int str=(Integer) tuple.getValueByField("intsmaze"); 16.            System.out.print(" "+str); 17.            sum+=str; 18.        } 19.        System.out.println("======="+sum); 20. //        collector.emit(new Values(sum)); 21.    } 22. 23.    @Override 24.    public void declareOutputFields(OutputFieldsDeclarer declarer) { 25.//       declarer.declare(new Fields("count")); 26.    } }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)



我們已經(jīng)實(shí)現(xiàn)了窗口計(jì)數(shù)的邏輯代碼,現(xiàn)在我們需要提供topology來(lái)指明各個(gè)組件的關(guān)系,以及指定SlidingWindowBolt的窗口的組合,這里我們演示了如何每?jī)擅虢y(tǒng)計(jì)最近6秒的數(shù)值總和,如果注釋掉10-13行代碼,去掉5-8行的注釋,這個(gè)topology就是告訴SlidingWindowBolt每接收到兩條tuple就統(tǒng)計(jì)最近接收到的6條tuple的數(shù)值的總和。

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

1.public class WindowsTopology { 2. 3.    public static void main(String[] args) throws Exception { 4.       TopologyBuilder builder = new TopologyBuilder(); 5.       builder.setSpout("spout1", new RandomSentenceSpout(), 1); 6.//       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 7.//       .withWindow(new Count(6), new Count(2)),1) 8.//       .shuffleGrouping("spout"); 9.//滑窗 窗口長(zhǎng)度:tuple數(shù), 滑動(dòng)間隔: tuple數(shù) 每收到2條數(shù)據(jù)統(tǒng)計(jì)當(dāng)前6條數(shù)據(jù)的總和。   10.      11.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 12.       .withWindow(new Duration(6, TimeUnit.SECONDS),  13.               new Duration(2, TimeUnit.SECONDS)),1) 14.       .shuffleGrouping("spout");//每?jī)擅虢y(tǒng)計(jì)最近6秒的數(shù)據(jù)        15. 16.       Config conf = new Config(); 17.       conf.setNumWorkers(1); 18.       LocalCluster cluster = new LocalCluster(); 19.       cluster.submitTopology("word-count", conf, builder.createTopology()); 20.   } }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)



這里演示的是bolt節(jié)點(diǎn)并發(fā)度為1的窗口功能,實(shí)際生產(chǎn)中,因?yàn)閿?shù)據(jù)量很大,往往將bolt節(jié)點(diǎn)的并發(fā)度設(shè)置為多個(gè),這個(gè)時(shí)候我們的SlidingWindowBolt就無(wú)法統(tǒng)計(jì)出一個(gè)窗口的數(shù)值總和了。因?yàn)槊恳粋€(gè)bolt的并行節(jié)點(diǎn)只能統(tǒng)計(jì)自己一個(gè)窗口接收到數(shù)據(jù)的總和,無(wú)法統(tǒng)計(jì)出一個(gè)窗口內(nèi)全局?jǐn)?shù)據(jù)的總和,借助redis來(lái)實(shí)現(xiàn)是可以的,但是必須引入redis的事務(wù)機(jī)制或者借助分布式鎖,否則會(huì)出現(xiàn)臟數(shù)據(jù)的情況。在這里我們介紹另一種實(shí)現(xiàn)方式就是靈活的使用storm提供的窗口功能,只是窗口的tuple數(shù)。

仍然是使用上面提供的類,只是我們?cè)黾右粋€(gè)bolt類,來(lái)統(tǒng)計(jì)每個(gè)SlidingWindowBolt節(jié)點(diǎn)發(fā)送給它的數(shù)值。

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

1.public class CountWord extends BaseWindowedBolt{ 2.     3.    private static final long serialVersionUID = -5283595260540124273L; 4.     5.    private OutputCollector collector; 6.     7.    public void prepare(Map stormConf, TopologyContext context 8.                             , OutputCollector collector) { 9.        this.collector = collector; 10.    } 11.     12.    public void execute(TupleWindow inputWindow) { 13.         int sum=0; 14.         for(Tuple tuple: inputWindow.get()) { 15.             int i=(Integer) tuple.getValueByField("count"); 16.               System.out.println("接收到一個(gè)bolt的總和值為:"+i); 17.               sum+=i; 18.          } 19.         System.out.println("一個(gè)窗口內(nèi)的總值為:"+sum); 20.    } 21. 22.    public void declareOutputFields(OutputFieldsDeclarer declarer) { 23.    } }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)



然后我們注釋RandomSentenceSpout22行代碼,取消對(duì)23行代碼的注釋,方便觀察結(jié)果。去掉SlidingWindowBolt20和25行代碼。

topology啟動(dòng)類如下:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

1.public class WindowsTopology { 2. 3.    public static void main(String[] args) throws Exception { 4.       TopologyBuilder builder = new TopologyBuilder(); 5.       builder.setSpout("spout", new RandomSentenceSpout(), 1); 6.        7.       builder.setBolt("slidingwindowbolt", new SlidingWindowBolt() 8.       .withWindow(new Duration(6, TimeUnit.SECONDS),  9.               new Duration(2, TimeUnit.SECONDS)),2) 10.       .shuffleGrouping("spout");//每?jī)擅虢y(tǒng)計(jì)最近6秒的數(shù)據(jù) 11.        12.       builder.setBolt("countwordbolt", new CountWord() 13.       .withWindow(new Count(2), new Count(2)),1) 14.       .shuffleGrouping("slidingwindowbolt"); 15.       //每收到2條tuple就統(tǒng)計(jì)最近兩條統(tǒng)的數(shù)據(jù) 16.       Config conf = new Config(); 17.       conf.setNumWorkers(1); 18.       LocalCluster cluster = new LocalCluster(); 19.       cluster.submitTopology("word-count", conf, builder.createTopology()); 20.   } }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

 

* 假設(shè)的你幾行代碼可以完成某個(gè)功能,抽取成一個(gè)方法 * 假設(shè)在某個(gè)業(yè)務(wù)邏輯層可以共用,往上抽取, * 假設(shè)在多個(gè)業(yè)務(wù)層可以共用,提煉成工具類。 * 假設(shè)你的這個(gè)業(yè)務(wù)方法在多個(gè)系統(tǒng)需要被使用,發(fā)布成一個(gè)服務(wù).分類: storm