概要:在使用storm分布式計算框架進(jìn)行數(shù)據(jù)處理時,如何保證進(jìn)入storm的消息的一定會被處理,且不會被重復(fù)處理。這個時候僅僅開啟storm的ack機制并不能解決上述問題。那么該如何設(shè)計出一個好的方案來解決上述問題?

  現(xiàn)有架構(gòu)背景:本人所在項目組的實時系統(tǒng)負(fù)責(zé)為XXX的實時產(chǎn)生的交易記錄進(jìn)行處理,根據(jù)處理的結(jié)果向用戶推送不同的信息。實時系統(tǒng)平時接入量每秒1000條,雙十一的時候,最大幾十萬條。

原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6219878.html

可接網(wǎng)站開發(fā),java開發(fā)。

新浪微博:intsmaze劉洋洋哥

微信:intsmaze

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動開發(fā)培訓(xùn)

  架構(gòu)設(shè)計:

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動開發(fā)培訓(xùn)

  storm設(shè)置的超時時間為3分鐘;kafkaspout的pending的長度為2000;storm開啟ack機制,拓?fù)涑绦蛑腥绻霈F(xiàn)異常則調(diào)用ack方法,向spout發(fā)出ack消息;每一個交易數(shù)據(jù)會有一個全局唯一性di。

  處理流程:

  交易數(shù)據(jù)會發(fā)送到kafka,然后拓?fù)銩去kafka取數(shù)據(jù)進(jìn)行處理,拓?fù)銩中的OnceBolt會先對從kafka取出的消息進(jìn)行一個唯一性過濾(根據(jù)該消息的全局id判斷該消息是否存儲在redis中,如果有,則說明拓?fù)銩已經(jīng)對該消息處理過了,則不會把該消息發(fā)送該下游的calculateBolt,直接向spout發(fā)送ack響應(yīng);如果沒有,則把該消息發(fā)送該下游的calculateBolt。),calculateBolt對接收到來自上游的數(shù)據(jù)進(jìn)行規(guī)則的匹配,根據(jù)該消息所符合的規(guī)則推送到不同的kafka通知主題中。

  拓?fù)銪則是不同的通知拓?fù)?,去kafka讀取對應(yīng)通知的主題,然后把該消息推送到不同的客戶端(微信客戶端,支付寶客戶端等)。

  架構(gòu)設(shè)計的意義:

  通過借用redis,來保證消息不會被重復(fù)處理,對異常的消息,我們不讓該消息重發(fā)。

  因為系統(tǒng)只是對交易成功后的數(shù)據(jù)通過配置的規(guī)則進(jìn)行區(qū)分來向用戶推送不同的活動信息,從業(yè)務(wù)上看,系統(tǒng)并不需要保證所有交易的用戶都一定要收到活動信息,只需要保證交易的用戶不會收到重復(fù)的數(shù)據(jù)即可。

  但是在線上運行半年后,還是發(fā)現(xiàn)了消息重復(fù)處理的問題,某些用戶還是會收到兩條甚至多條重復(fù)信息。

  通過對現(xiàn)有架構(gòu)的查看,我們發(fā)現(xiàn)問題出在拓?fù)銪中(各個不同的通知拓?fù)洌?,原因是拓?fù)銪沒有添加唯一性過濾bolt,雖然上游的拓?fù)鋵ο⑦M(jìn)行唯一性過濾了(保證了外部系統(tǒng)向kafka生產(chǎn)消息出現(xiàn)重復(fù)下,拓?fù)銩不進(jìn)行重復(fù)處理),但是回看拓?fù)銪,我們可以知道消息重發(fā)絕對不是kafka主題中存在重復(fù)的兩條消息,且拓?fù)銪消息重復(fù)不是系統(tǒng)異常導(dǎo)致的(我們隊異常進(jìn)行ack應(yīng)答),那么導(dǎo)致消息重復(fù)處理的原因就一定是消息超時導(dǎo)致的。ps:消息在storm中被處理,沒有發(fā)生異常,而是由于集群硬件資源的爭搶或者下游接口瓶頸無法快速處理拓?fù)銪推送出去的消息,導(dǎo)致一條消息在3分鐘內(nèi)沒有處理完,spout就認(rèn)為該消息fail,而重新發(fā)該消息,但是超時的那一條消息并不是說不會處理,當(dāng)他獲得資源了,仍然會處理結(jié)束的。

   解決方案:在拓?fù)銪中添加唯一性過濾bolt即可解決。

  個人推測:當(dāng)時實時系統(tǒng)架構(gòu)設(shè)計時,設(shè)計唯一性過濾bolt時,可能僅僅是考慮到外部系統(tǒng)向kafka推送數(shù)據(jù)可能會存在相同的消息,并沒有想到storm本身tuple超時導(dǎo)致的消息重復(fù)處理。

  該系統(tǒng)改進(jìn):雖然從業(yè)務(wù)的角度來說,并不需要保證每一個交易用戶都一定要收到活動信息,但是我們完全可以做到每一個用戶都收到活動信息,且收到的消息不重復(fù)。

我們可以做到對程序的異常進(jìn)行控制,但是超時導(dǎo)致的fail我們無法控制。

  我們對消息處理異??刂疲?dāng)發(fā)生異常信息,我們在發(fā)送fail應(yīng)答前,把該異常的消息存儲到redis中,這樣唯一性過濾的bolt就會對收到的每一條消息進(jìn)行判斷,如果在redis中,我們就知道該消息是異常導(dǎo)致的失敗,就讓該消息繼續(xù)處理,如果該消息不在redis中,我們就知道該消息是超時導(dǎo)致的fail,那么我們就過濾掉該消息,不進(jìn)行下一步處理。

這樣我們就做到了消息的可靠處理且不會重復(fù)處理。

 

* 假設(shè)的你幾行代碼可以完成某個功能,抽取成一個方法 * 假設(shè)在某個業(yè)務(wù)邏輯層可以共用,往上抽取, * 假設(shè)在多個業(yè)務(wù)層可以共用,提煉成工具類。 * 假設(shè)你的這個業(yè)務(wù)方法在多個系統(tǒng)需要被使用,發(fā)布成一個服務(wù).