如今框架橫行,Spring 已經(jīng)是非常成熟的容器體系,我們在日常開發(fā) JavaWeb 的工作中,大多已經(jīng)不需要考慮多線程的問題,這些問題都已經(jīng)在Spring容器中實(shí)現(xiàn),框架的意義就是讓程序員們可以專注于邏輯的實(shí)現(xiàn)。然而這種編程工作是非常無趣無味的,如果長期從事這個(gè)工作,技術(shù)不一定見長,業(yè)務(wù)知識(shí)一定很熟悉!= =但說實(shí)在的,我并不喜歡這類工作,因?yàn)檫@種工作大多情況下知識(shí)對(duì)代碼的簡單復(fù)制,或是簡單的一些編寫,并沒有什么真正的創(chuàng)造性,不會(huì)給人成就感。
需求背景
我們的項(xiàng)目,是 Mysql+ElasticSearch 做的一個(gè)數(shù)據(jù)庫和搜索引擎,項(xiàng)目經(jīng)理提出需要做一個(gè)用于重建 ES 搜索數(shù)據(jù)的接口,這個(gè)任務(wù)很光榮的交給了我。
在功能的編寫過程當(dāng)中,我突然思考這樣一個(gè)問題,因?yàn)槲覀?Web 項(xiàng)目本身是多線程的,那如果在同一時(shí)間段,有多個(gè)請求同時(shí)發(fā)起,那同時(shí)發(fā)起 ES 的重建,對(duì)于 ES 來說,可能會(huì)產(chǎn)生一些莫名其妙的問題。
所以我感到非常高興,因?yàn)檫@個(gè)問題,似乎不是聽起來的那么簡單。于是乎我想到了,要加入同步鎖了。
最開始的思考:
最開始我只是很簡單的想,直接在對(duì)應(yīng)的 Service 層寫一個(gè)方法,然后直接加一個(gè)
synchronized(this)
在整個(gè)方法體上?! ?/p>
1 @Override2 public synchronized int rebuiltBountyData() throws Exception {3 ...4 }
可是問題來了:
但是這個(gè)方法很快就聯(lián)想到了另一個(gè)問題:
我們是希望不要多線程同時(shí)重建數(shù)據(jù),但是如果排隊(duì)重建呢?好像也不是我們想要的結(jié)果。我希望的是當(dāng)一個(gè)線程在執(zhí)行重建任務(wù)的時(shí)候,另一個(gè)線程要被拒絕開始任務(wù),而不是等待上一個(gè)任務(wù)做好后再開始。因?yàn)槲覀?tomcat 是采用線程池的概念,如果所有線程都執(zhí)行這個(gè)方法,最后每個(gè)線程都會(huì)處于等待狀態(tài),結(jié)果其他請求就會(huì)因?yàn)闆]有空閑的線程可用,而無法正常執(zhí)行。
so,我們修改了一下思路:
在 Service 的這個(gè)實(shí)現(xiàn)類中,添加一個(gè)私有類成員對(duì)象 flag = false,當(dāng)線程進(jìn)入時(shí),判斷 flag 是否為 true,是,則直接拋出異常,結(jié)束線程。否,則修改 flag 的值為 true,然后開始執(zhí)行線程任務(wù),并且,我們對(duì)這個(gè) flag 加上一個(gè)同步鎖,例如:我們在代碼中使用時(shí),加入這樣一段
synchronized(flag)
由于 Spring 默認(rèn)是單例模式,所以這個(gè)flag 在多個(gè)線程中是共享的,這樣就不需要將這個(gè)flag 設(shè)置為 static 了,因?yàn)樗谶@個(gè)局部當(dāng)中實(shí)現(xiàn)了類似 static 的作用。但是這個(gè)時(shí)候,flag 不能是基礎(chǔ)類型,必須是 Boolean 包裝類型。那就會(huì)產(chǎn)生另一個(gè)隱患:包裝類的對(duì)象僅僅是一個(gè)引用,引用是可以被更換了,比如使用了這個(gè) flag 的 set 方法來修改值,但是同步鎖取得是引用的鎖,而不是引用對(duì)應(yīng)那個(gè)實(shí)例的鎖,鎖了引用卻沒鎖實(shí)例,但我們實(shí)際上卻要根據(jù)實(shí)例的狀態(tài)來判斷,這就會(huì)造成一個(gè)隱患,可能會(huì)使得同步鎖失效。
那使用 this 來獲得整個(gè) Service 類的同步鎖,貌似可以解決問題(如下面這段代碼具體實(shí)現(xiàn)),但是如果萬一以后這個(gè) Service 還有其他需要用到同步鎖的需求怎么辦呢?這樣就會(huì)讓兩個(gè)不想干的業(yè)務(wù)邏輯因?yàn)橥芥i的問題產(chǎn)生互相的影響。添加同步鎖,要盡可能的縮小同步鎖的獲取范圍,和鎖內(nèi)代碼的代碼量,這樣才能減少?zèng)_突和線程獲取鎖時(shí)等待的時(shí)間,提高軟件的安全性和執(zhí)行效率。
而且,我們的需求在這個(gè)時(shí)候,又有了變化,項(xiàng)目經(jīng)理說,有兩張表都需要做這種功能。就是說兩個(gè)業(yè)務(wù)內(nèi)容,都需要進(jìn)行ES 的數(shù)據(jù)重建。所以如果每次增加一個(gè),我就要單獨(dú)寫一個(gè)類似下面這段代碼,不僅代碼的可復(fù)用性降低了,而且以后換別人來維護(hù)的時(shí)候,說不定會(huì)寫錯(cuò)這些內(nèi)容。
1 @Override 2 public int rebuiltBountyData() throws Exception { 3 //鎖住資源防止多線程重復(fù)發(fā)起任務(wù) 4 synchronized (this) { 5 if (hasThread) { 6 throw new RebuiltBountyEsException("搜索引擎重建任務(wù)已經(jīng)在執(zhí)行,請勿重復(fù)發(fā)起!"); 7 } else { 8 hasThread = true; 9 }10 }11 //獲取總數(shù)12 int count = bountyMapper.countNum();13 14 int pageTotal, pageSize = 1000;15 16 if (count % pageSize != 0) {17 //若不能整除,則頁數(shù)加118 pageTotal = count / pageSize + 1;19 } else {20 pageTotal = count / pageSize;21 }22 23 try {24 for (int pageNum = 1; pageNum <= pageTotal; pageNum++) {25 //分頁查詢數(shù)據(jù)庫的數(shù)據(jù)26 PageHelper.startPage(pageNum, pageSize);27 List<Bounty> bountyList = bountyMapper.selectForRebuiltES();28 //添加到 ES 引擎29 bountyDao.add4List(bountyList);30 }31 } catch (Exception e) {32 throw e;33 } finally {34 hasThread = false;35 }36 return count;37 }
那我們該怎么辦好呢?
最好的辦法,就是把這個(gè)需要“加鎖”的邏輯,單獨(dú)賦予一個(gè)對(duì)象,讓這個(gè)鎖的范圍能夠縮小到只針對(duì)這個(gè)邏輯,這個(gè)功能,而不要跟其他的功能混在一起。 然后我們需要對(duì)這個(gè)功能,進(jìn)行進(jìn)一步的抽象。
我們來好好觀察上面這段代碼,上面這段代碼,算是已經(jīng)實(shí)現(xiàn)了整個(gè)功能,從頭到尾分解一下這段代碼的功能,可以看得出如下:
單線程檢查
分頁處理
獲取數(shù)據(jù)
寫入 ES
So,我們可以看到,其實(shí)不同業(yè)務(wù)場景下,線程檢查是一模一樣的代碼,而分頁處理中,獲取數(shù)據(jù)總條數(shù)會(huì)根據(jù)不同業(yè)務(wù)場景而不同,其他代碼也都是相同的,至于寫入 ES 的部分,如果數(shù)據(jù)結(jié)構(gòu)跟從數(shù)據(jù)庫中獲取的實(shí)體對(duì)象沒有區(qū)別的話,這個(gè)也是可以看做是相同的而不需要特別的處理,但是我們公司的項(xiàng)目中,因?yàn)榉N種原因,ES 中的數(shù)據(jù)結(jié)構(gòu)和實(shí)體對(duì)象是不同的(盡管數(shù)據(jù)字段都是相同,我表示我不知道怎么跟你們說這個(gè)歷史遺留的奇葩問題...)。在這里,我們要應(yīng)用一個(gè)設(shè)計(jì)模式,是模板模式,將固定的流程代碼封裝起來。再將可變的部分,留給子類實(shí)現(xiàn)。
1 /** 2 * 類說明:從 JDBC 中獲取重建 ES 的數(shù)據(jù) 3 */ 4 @Service 5 public abstract class JdbcRebuiltEsService<E> extends BaseService{ 6 7 8 protected Logger log = LoggerFactory.getLogger(getClass()); 9 10 private boolean threadLock = false;//線程鎖 11 12 @Value("1") 13 private int startPage;//開始頁碼 14 15 @Value("1000") 16 private int pageSize;//頁面容量 17 18 19 protected abstract int countTotalData() throws Exception; 20 21 protected abstract Collection<E> loadDataSource(int pageSize, int pageNum) throws Exception; 22 23 protected abstract void writeToElasticSearch(Collection<E> collection) throws Exception; 24 25 /** 26 * 檢查線程鎖 27 * 28 * @throws Exception 29 */ 30 private void checkLock() throws Exception { 31 32 //這段代碼需要保證線程安全 33 synchronized (this) { 34 if (threadLock) { 35 //如果已經(jīng)有線程占用,后續(xù)線程進(jìn)入則拋出異常,因?yàn)楸窘涌谥辉试S單線程執(zhí)行 36 throw new RebuiltEsTaskExistException("已經(jīng)有重建任務(wù)正在執(zhí)行,請等待結(jié)束后再發(fā)起新任務(wù)!"); 37 } else { 38 //如果沒有線程占用,則新線程進(jìn)入后將改成線程占用狀態(tài) 39 threadLock = true; 40 log.info("用戶[{}]發(fā)起 ES 重建任務(wù)!其他重建任務(wù)請求將被拒絕!", getUserJid()); 41 } 42 } 43 } 44 45 /** 46 * 數(shù)據(jù)重建 47 * 48 * @return 49 * @throws Exception 50 */ 51 public int rebuild() throws Exception { 52 53 checkLock(); 54 log.info("#=== ES 重建任務(wù)開始執(zhí)行"); 55 56 int totalNum = countTotalData(); 57 log.info("本次重建預(yù)計(jì)總記錄數(shù){}", totalNum); 58 59 int pageTotal; 60 int pageNum = this.startPage; 61 int pageSize = this.pageSize; 62 63 //根據(jù)條目總數(shù)計(jì)算總頁數(shù) 64 if (totalNum % pageSize != 0) { 65 //若不能整除,則頁數(shù)加1 66 pageTotal = totalNum / pageSize + 1; 67 } else { 68 pageTotal = totalNum / pageSize; 69 } 70 71 long startTime = System.currentTimeMillis();//任務(wù)開始計(jì)時(shí) 72 73 try { 74 while (pageNum <= pageTotal) { 75 //分頁查詢數(shù)據(jù)庫的數(shù)據(jù)并同時(shí)發(fā)送到 ES 76 writeToElasticSearch(loadDataSource(pageSize, pageNum)); 77 pageNum++; 78 } 79 } catch (Exception e) { 80 Double progress = (Double) (pageNum * 1.0) / (Double) (pageSize * 1.0); 81 DecimalFormat decimalFormat = new DecimalFormat("##.00%"); 82 log.info("重建異常中斷,當(dāng)前已重建進(jìn)度為:{}", decimalFormat.format(progress)); 83 throw e; 84 } finally { 85 threadLock = false;//不論是否成功,當(dāng)線程退出時(shí),都需要將線程狀態(tài)改為非占用 86 long endTime = System.currentTimeMillis();//任務(wù)結(jié)束計(jì)時(shí) 87 log.info("#=== ES 重建任務(wù)執(zhí)行結(jié)束,耗時(shí):{}毫秒", endTime - startTime); 88 } 89 90 return totalNum; 91 } 92 93 public int getStartPage() { 94 return startPage; 95 } 96 97 public void setStartPage(int startPage) { 98 this.startPage = startPage; 99 }100 101 public int getPageSize() {102 return pageSize;103 }104 105 public void setPageSize(int pageSize) {106 this.pageSize = pageSize;107 }108 }
OK,這樣就解決了。復(fù)寫三個(gè)容易跟隨應(yīng)用場景不同,而改變的方法,分別是,獲取數(shù)據(jù)源,獲取數(shù)據(jù)總條目,寫入 ES。然后暴露 rebuild 方法給外部調(diào)用,在 rebuild 方法內(nèi)部,實(shí)現(xiàn)整個(gè)運(yùn)作流程,這樣也可以避免以后有人需要做新的實(shí)現(xiàn)的時(shí)候,修改到這部分有涉及到同步鎖的代碼,以避免安全隱患。
實(shí)際使用的時(shí)候可以這樣用,創(chuàng)建一個(gè)子類繼承這個(gè) JdbcRebuiltEsService
1 /** 2 * 類說明:商品信息 ES 重建所需要實(shí)現(xiàn)的具體方法 3 */ 4 @Service 5 public class GoodsRebuiltEsServiceImpl extends JdbcRebuiltEsService<Goods> { 6 7 @Autowired 8 private GoodsMapper goodsMapper; 9 10 @Autowired11 private DrawingDAO drawingDAO;12 13 @Override14 public int countTotalData() throws Exception {15 return goodsMapper.countNum();16 }17 18 @Override19 public Collection<Goods> loadDataSource(int pageSize, int pageNum) throws Exception {20 PageHelper.startPage(pageNum, pageSize);21 return goodsMapper.selectForRebuiltES();22 }23 24 @Override25 public void writeToElasticSearch(Collection<Goods> collection) throws Exception {26 drawingDAO.addBatch(collection);27 }28 }
這樣以后每次使用,都只需要實(shí)現(xiàn)一個(gè)新的子類,然后這樣調(diào)用:
1 @Autowired 2 @Qualifier("goodsRebuiltEsServiceImpl") 3 private JdbcRebuiltEsService<Goods> jdbcRebuiltEsService; 4 5 /** 6 * 數(shù)據(jù)重建 7 * 8 * @return 9 * @throws Exception10 */11 @Override12 public int rebuiltEsGoodsData() throws Exception {13 return jdbcRebuiltEsService.rebuiltd();14 }
這樣 rebuilt 方法就很安全的被調(diào)用,將程序中不希望被修改的部分,用父類寫好,只留下希望被復(fù)寫的部分,這樣就可以很好的保護(hù)比較關(guān)鍵的部位,當(dāng)然了,public 方法也是可以重寫的,不過這就超出了我們“以防萬一,不小心寫錯(cuò)”的初衷了,如果需要重寫,那就重寫唄。