如今框架橫行,Spring 已經(jīng)是非常成熟的容器體系,我們在日常開發(fā) JavaWeb 的工作中,大多已經(jīng)不需要考慮多線程的問題,這些問題都已經(jīng)在Spring容器中實(shí)現(xiàn),框架的意義就是讓程序員們可以專注于邏輯的實(shí)現(xiàn)。然而這種編程工作是非常無趣無味的,如果長期從事這個(gè)工作,技術(shù)不一定見長,業(yè)務(wù)知識(shí)一定很熟悉!= =但說實(shí)在的,我并不喜歡這類工作,因?yàn)檫@種工作大多情況下知識(shí)對(duì)代碼的簡單復(fù)制,或是簡單的一些編寫,并沒有什么真正的創(chuàng)造性,不會(huì)給人成就感。

  需求背景


  我們的項(xiàng)目,是 Mysql+ElasticSearch 做的一個(gè)數(shù)據(jù)庫和搜索引擎,項(xiàng)目經(jīng)理提出需要做一個(gè)用于重建 ES 搜索數(shù)據(jù)的接口,這個(gè)任務(wù)很光榮的交給了我。

  在功能的編寫過程當(dāng)中,我突然思考這樣一個(gè)問題,因?yàn)槲覀?Web 項(xiàng)目本身是多線程的,那如果在同一時(shí)間段,有多個(gè)請求同時(shí)發(fā)起,那同時(shí)發(fā)起 ES 的重建,對(duì)于 ES 來說,可能會(huì)產(chǎn)生一些莫名其妙的問題。

  所以我感到非常高興,因?yàn)檫@個(gè)問題,似乎不是聽起來的那么簡單。于是乎我想到了,要加入同步鎖了。

最開始的思考:


  最開始我只是很簡單的想,直接在對(duì)應(yīng)的 Service 層寫一個(gè)方法,然后直接加一個(gè)

synchronized(this)

在整個(gè)方法體上?!   ?/p>

1 @Override2     public synchronized int rebuiltBountyData() throws Exception {3         ...4     }

 

  可是問題來了:


  但是這個(gè)方法很快就聯(lián)想到了另一個(gè)問題:

  我們是希望不要多線程同時(shí)重建數(shù)據(jù),但是如果排隊(duì)重建呢?好像也不是我們想要的結(jié)果。我希望的是當(dāng)一個(gè)線程在執(zhí)行重建任務(wù)的時(shí)候,另一個(gè)線程要被拒絕開始任務(wù),而不是等待上一個(gè)任務(wù)做好后再開始。因?yàn)槲覀?tomcat 是采用線程池的概念,如果所有線程都執(zhí)行這個(gè)方法,最后每個(gè)線程都會(huì)處于等待狀態(tài),結(jié)果其他請求就會(huì)因?yàn)闆]有空閑的線程可用,而無法正常執(zhí)行。

  so,我們修改了一下思路:

  在 Service 的這個(gè)實(shí)現(xiàn)類中,添加一個(gè)私有類成員對(duì)象  flag = false,當(dāng)線程進(jìn)入時(shí),判斷 flag 是否為 true,是,則直接拋出異常,結(jié)束線程。否,則修改 flag 的值為 true,然后開始執(zhí)行線程任務(wù),并且,我們對(duì)這個(gè) flag 加上一個(gè)同步鎖,例如:我們在代碼中使用時(shí),加入這樣一段

synchronized(flag)

  由于 Spring 默認(rèn)是單例模式,所以這個(gè)flag 在多個(gè)線程中是共享的,這樣就不需要將這個(gè)flag 設(shè)置為 static 了,因?yàn)樗谶@個(gè)局部當(dāng)中實(shí)現(xiàn)了類似 static 的作用。但是這個(gè)時(shí)候,flag 不能是基礎(chǔ)類型,必須是 Boolean 包裝類型。那就會(huì)產(chǎn)生另一個(gè)隱患:包裝類的對(duì)象僅僅是一個(gè)引用,引用是可以被更換了,比如使用了這個(gè) flag 的 set 方法來修改值,但是同步鎖取得是引用的鎖,而不是引用對(duì)應(yīng)那個(gè)實(shí)例的鎖,鎖了引用卻沒鎖實(shí)例,但我們實(shí)際上卻要根據(jù)實(shí)例的狀態(tài)來判斷,這就會(huì)造成一個(gè)隱患,可能會(huì)使得同步鎖失效。

  那使用 this 來獲得整個(gè) Service 類的同步鎖,貌似可以解決問題(如下面這段代碼具體實(shí)現(xiàn)),但是如果萬一以后這個(gè) Service 還有其他需要用到同步鎖的需求怎么辦呢?這樣就會(huì)讓兩個(gè)不想干的業(yè)務(wù)邏輯因?yàn)橥芥i的問題產(chǎn)生互相的影響。添加同步鎖,要盡可能的縮小同步鎖的獲取范圍,和鎖內(nèi)代碼的代碼量,這樣才能減少?zèng)_突和線程獲取鎖時(shí)等待的時(shí)間,提高軟件的安全性和執(zhí)行效率。

  而且,我們的需求在這個(gè)時(shí)候,又有了變化,項(xiàng)目經(jīng)理說,有兩張表都需要做這種功能。就是說兩個(gè)業(yè)務(wù)內(nèi)容,都需要進(jìn)行ES 的數(shù)據(jù)重建。所以如果每次增加一個(gè),我就要單獨(dú)寫一個(gè)類似下面這段代碼,不僅代碼的可復(fù)用性降低了,而且以后換別人來維護(hù)的時(shí)候,說不定會(huì)寫錯(cuò)這些內(nèi)容。

電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),平面設(shè)計(jì)培訓(xùn),網(wǎng)頁設(shè)計(jì)培訓(xùn),美工培訓(xùn),Web培訓(xùn),Web前端開發(fā)培訓(xùn)

 1 @Override 2     public int rebuiltBountyData() throws Exception { 3         //鎖住資源防止多線程重復(fù)發(fā)起任務(wù) 4         synchronized (this) { 5             if (hasThread) { 6                 throw new RebuiltBountyEsException("搜索引擎重建任務(wù)已經(jīng)在執(zhí)行,請勿重復(fù)發(fā)起!"); 7             } else { 8                 hasThread = true; 9             }10         }11         //獲取總數(shù)12         int count = bountyMapper.countNum();13 14         int pageTotal, pageSize = 1000;15 16         if (count % pageSize != 0) {17             //若不能整除,則頁數(shù)加118             pageTotal = count / pageSize + 1;19         } else {20             pageTotal = count / pageSize;21         }22 23         try {24             for (int pageNum = 1; pageNum <= pageTotal; pageNum++) {25                 //分頁查詢數(shù)據(jù)庫的數(shù)據(jù)26                 PageHelper.startPage(pageNum, pageSize);27                 List<Bounty> bountyList = bountyMapper.selectForRebuiltES();28                 //添加到 ES 引擎29                 bountyDao.add4List(bountyList);30             }31         } catch (Exception e) {32             throw e;33         } finally {34             hasThread = false;35         }36         return count;37     }

電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),平面設(shè)計(jì)培訓(xùn),網(wǎng)頁設(shè)計(jì)培訓(xùn),美工培訓(xùn),Web培訓(xùn),Web前端開發(fā)培訓(xùn)

  那我們該怎么辦好呢?


  最好的辦法,就是把這個(gè)需要“加鎖”的邏輯,單獨(dú)賦予一個(gè)對(duì)象,讓這個(gè)鎖的范圍能夠縮小到只針對(duì)這個(gè)邏輯,這個(gè)功能,而不要跟其他的功能混在一起。 然后我們需要對(duì)這個(gè)功能,進(jìn)行進(jìn)一步的抽象。

  我們來好好觀察上面這段代碼,上面這段代碼,算是已經(jīng)實(shí)現(xiàn)了整個(gè)功能,從頭到尾分解一下這段代碼的功能,可以看得出如下:

  1.   單線程檢查

  2.   分頁處理

  3.   獲取數(shù)據(jù)

  4.   寫入 ES

  So,我們可以看到,其實(shí)不同業(yè)務(wù)場景下,線程檢查是一模一樣的代碼,而分頁處理中,獲取數(shù)據(jù)總條數(shù)會(huì)根據(jù)不同業(yè)務(wù)場景而不同,其他代碼也都是相同的,至于寫入 ES 的部分,如果數(shù)據(jù)結(jié)構(gòu)跟從數(shù)據(jù)庫中獲取的實(shí)體對(duì)象沒有區(qū)別的話,這個(gè)也是可以看做是相同的而不需要特別的處理,但是我們公司的項(xiàng)目中,因?yàn)榉N種原因,ES 中的數(shù)據(jù)結(jié)構(gòu)和實(shí)體對(duì)象是不同的(盡管數(shù)據(jù)字段都是相同,我表示我不知道怎么跟你們說這個(gè)歷史遺留的奇葩問題...)。在這里,我們要應(yīng)用一個(gè)設(shè)計(jì)模式,是模板模式,將固定的流程代碼封裝起來。再將可變的部分,留給子類實(shí)現(xiàn)。

電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),平面設(shè)計(jì)培訓(xùn),網(wǎng)頁設(shè)計(jì)培訓(xùn),美工培訓(xùn),Web培訓(xùn),Web前端開發(fā)培訓(xùn)

  1 /**  2  * 類說明:從 JDBC 中獲取重建 ES 的數(shù)據(jù)  3  */  4 @Service  5 public abstract class JdbcRebuiltEsService<E> extends BaseService{  6   7   8     protected Logger log = LoggerFactory.getLogger(getClass());  9  10     private boolean threadLock = false;//線程鎖 11  12     @Value("1") 13     private int startPage;//開始頁碼 14  15     @Value("1000") 16     private int pageSize;//頁面容量 17  18  19     protected abstract int countTotalData() throws Exception; 20  21     protected abstract Collection<E> loadDataSource(int pageSize, int pageNum) throws Exception; 22  23     protected abstract void writeToElasticSearch(Collection<E> collection) throws Exception; 24  25     /** 26      * 檢查線程鎖 27      * 28      * @throws Exception 29      */ 30     private void checkLock() throws Exception { 31  32         //這段代碼需要保證線程安全 33         synchronized (this) { 34             if (threadLock) { 35                 //如果已經(jīng)有線程占用,后續(xù)線程進(jìn)入則拋出異常,因?yàn)楸窘涌谥辉试S單線程執(zhí)行 36                 throw new RebuiltEsTaskExistException("已經(jīng)有重建任務(wù)正在執(zhí)行,請等待結(jié)束后再發(fā)起新任務(wù)!"); 37             } else { 38                 //如果沒有線程占用,則新線程進(jìn)入后將改成線程占用狀態(tài) 39                 threadLock = true; 40                 log.info("用戶[{}]發(fā)起 ES 重建任務(wù)!其他重建任務(wù)請求將被拒絕!", getUserJid()); 41             } 42         } 43     } 44  45     /** 46      * 數(shù)據(jù)重建 47      * 48      * @return 49      * @throws Exception 50      */ 51     public int rebuild() throws Exception { 52  53         checkLock(); 54         log.info("#=== ES 重建任務(wù)開始執(zhí)行"); 55  56         int totalNum = countTotalData(); 57         log.info("本次重建預(yù)計(jì)總記錄數(shù){}", totalNum); 58  59         int pageTotal; 60         int pageNum = this.startPage; 61         int pageSize = this.pageSize; 62  63         //根據(jù)條目總數(shù)計(jì)算總頁數(shù) 64         if (totalNum % pageSize != 0) { 65             //若不能整除,則頁數(shù)加1 66             pageTotal = totalNum / pageSize + 1; 67         } else { 68             pageTotal = totalNum / pageSize; 69         } 70  71         long startTime = System.currentTimeMillis();//任務(wù)開始計(jì)時(shí) 72  73         try { 74             while (pageNum <= pageTotal) { 75                 //分頁查詢數(shù)據(jù)庫的數(shù)據(jù)并同時(shí)發(fā)送到 ES 76                 writeToElasticSearch(loadDataSource(pageSize, pageNum)); 77                 pageNum++; 78             } 79         } catch (Exception e) { 80             Double progress = (Double) (pageNum * 1.0) / (Double) (pageSize * 1.0); 81             DecimalFormat decimalFormat = new DecimalFormat("##.00%"); 82             log.info("重建異常中斷,當(dāng)前已重建進(jìn)度為:{}", decimalFormat.format(progress)); 83             throw e; 84         } finally { 85             threadLock = false;//不論是否成功,當(dāng)線程退出時(shí),都需要將線程狀態(tài)改為非占用 86             long endTime = System.currentTimeMillis();//任務(wù)結(jié)束計(jì)時(shí) 87             log.info("#=== ES 重建任務(wù)執(zhí)行結(jié)束,耗時(shí):{}毫秒", endTime - startTime); 88         } 89  90         return totalNum; 91     } 92  93     public int getStartPage() { 94         return startPage; 95     } 96  97     public void setStartPage(int startPage) { 98         this.startPage = startPage; 99     }100 101     public int getPageSize() {102         return pageSize;103     }104 105     public void setPageSize(int pageSize) {106         this.pageSize = pageSize;107     }108 }

電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),平面設(shè)計(jì)培訓(xùn),網(wǎng)頁設(shè)計(jì)培訓(xùn),美工培訓(xùn),Web培訓(xùn),Web前端開發(fā)培訓(xùn)

  OK,這樣就解決了。復(fù)寫三個(gè)容易跟隨應(yīng)用場景不同,而改變的方法,分別是,獲取數(shù)據(jù)源,獲取數(shù)據(jù)總條目,寫入 ES。然后暴露 rebuild 方法給外部調(diào)用,在 rebuild 方法內(nèi)部,實(shí)現(xiàn)整個(gè)運(yùn)作流程,這樣也可以避免以后有人需要做新的實(shí)現(xiàn)的時(shí)候,修改到這部分有涉及到同步鎖的代碼,以避免安全隱患。

  實(shí)際使用的時(shí)候可以這樣用,創(chuàng)建一個(gè)子類繼承這個(gè) JdbcRebuiltEsService 

電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),平面設(shè)計(jì)培訓(xùn),網(wǎng)頁設(shè)計(jì)培訓(xùn),美工培訓(xùn),Web培訓(xùn),Web前端開發(fā)培訓(xùn)

 1 /** 2  * 類說明:商品信息 ES 重建所需要實(shí)現(xiàn)的具體方法 3  */ 4 @Service 5 public class GoodsRebuiltEsServiceImpl extends JdbcRebuiltEsService<Goods> { 6  7     @Autowired 8     private GoodsMapper goodsMapper; 9 10     @Autowired11     private DrawingDAO drawingDAO;12 13     @Override14     public int countTotalData() throws Exception {15         return goodsMapper.countNum();16     }17 18     @Override19     public Collection<Goods> loadDataSource(int pageSize, int pageNum) throws Exception {20         PageHelper.startPage(pageNum, pageSize);21         return goodsMapper.selectForRebuiltES();22     }23 24     @Override25     public void writeToElasticSearch(Collection<Goods> collection) throws Exception {26         drawingDAO.addBatch(collection);27     }28 }

電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),平面設(shè)計(jì)培訓(xùn),網(wǎng)頁設(shè)計(jì)培訓(xùn),美工培訓(xùn),Web培訓(xùn),Web前端開發(fā)培訓(xùn)

  這樣以后每次使用,都只需要實(shí)現(xiàn)一個(gè)新的子類,然后這樣調(diào)用:

電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),平面設(shè)計(jì)培訓(xùn),網(wǎng)頁設(shè)計(jì)培訓(xùn),美工培訓(xùn),Web培訓(xùn),Web前端開發(fā)培訓(xùn)

 1   @Autowired 2     @Qualifier("goodsRebuiltEsServiceImpl") 3     private JdbcRebuiltEsService<Goods> jdbcRebuiltEsService; 4  5  /** 6      * 數(shù)據(jù)重建 7      * 8      * @return 9      * @throws Exception10      */11     @Override12     public int rebuiltEsGoodsData() throws Exception {13         return jdbcRebuiltEsService.rebuiltd();14     }

電腦培訓(xùn),計(jì)算機(jī)培訓(xùn),平面設(shè)計(jì)培訓(xùn),網(wǎng)頁設(shè)計(jì)培訓(xùn),美工培訓(xùn),Web培訓(xùn),Web前端開發(fā)培訓(xùn)

  這樣 rebuilt 方法就很安全的被調(diào)用,將程序中不希望被修改的部分,用父類寫好,只留下希望被復(fù)寫的部分,這樣就可以很好的保護(hù)比較關(guān)鍵的部位,當(dāng)然了,public 方法也是可以重寫的,不過這就超出了我們“以防萬一,不小心寫錯(cuò)”的初衷了,如果需要重寫,那就重寫唄。