本篇是這個內(nèi)容的第一篇,主要是寫:遇到的問題,和自己摸索實現(xiàn)的方法。后面還會有一篇是總結(jié)性地寫線程池的相關(guān)內(nèi)容(偏理論的)。
一、背景介紹
朋友的項目開發(fā)到一定程度之后,又遇到了一些問題:在某些流程中的一些節(jié)點,由于是串聯(lián)執(zhí)行的。上一步要等下一步執(zhí)行完畢;或者提交數(shù)據(jù)之后要等待后臺其他系統(tǒng)處理完成之后,才能返回結(jié)果。這樣就會導致,請求發(fā)起方不得不一直等待結(jié)果,用戶體驗很不好;從項目優(yōu)化來說,模塊與模塊之間構(gòu)成了強耦合,這也是不利于以后擴展的,更不用說訪問量上來之后,肯定會抓瞎的問題。所以,我就著手開始,利用異步線程池來解決這個問題。
剛開始的時候,我準備只是在節(jié)點處另外起線程去執(zhí)行異步操作。但是,考慮到以后的擴展,同時利用“池化”技術(shù),更加高效地重復利用線程,節(jié)省資源。在這里就選定了,使用線程池的方法。
二、實現(xiàn)步驟
實現(xiàn)總共分為四步:
第一步,在啟動服務的時候初始化線程池;
第二步,建立有隊列的線程池;
第三步,將業(yè)務邏輯方法與線程池聯(lián)系起來;
第四步,調(diào)整原有代碼邏輯結(jié)構(gòu),將可以異步的操作放入第三步的業(yè)務邏輯方法,并將請求放入線程池的隊列中,等待執(zhí)行。
三、具體實現(xiàn)
首先,第一步我們在web項目的起源之處web.xml中加入這么一行
1 <listener>2 3 <listener-class>com.jptec.kevin.thread.listener.InitThreadPoolListener</listener-class>4 5 </listener>
這里的路徑實際上就是,在啟動項目之后,會加載的初始化函數(shù)。這個函數(shù)主要的作用就是:將線程池啟動起來。實現(xiàn)代碼如下:
1 public class InitThreadPoolListener implements ServletContextListener { 2 3 @Override 4 public void contextInitialized(ServletContextEvent sce) { 5 6 new TestThreadPool().runThread(); 7 } 8 9 @Override10 public void contextDestroyed(ServletContextEvent sce) {11 }12 13 }
好了,第一步就算完工了。
然后,我們開始第二步,建立有隊列的線程池(這里有很多,理論上的內(nèi)容,會放在第二篇中詳細說)。在這里主要是,定義了一個ArrayBlockingQueue隊列(先進先出,有限阻塞),使用Executor定義了一個線程池。具體代碼如下:
1 public class TestThreadPool { 2 3 protected final static Logger log = LoggerFactory.getLogger(TestThreadPool.class); 4 5 // 線程休眠時間(秒) 6 // 存放需要發(fā)送的信息 7 public static BlockingQueue<Runnable> addressBqueue = new ArrayBlockingQueue<Runnable>( 8 10000); 9 10 public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 200, 25, TimeUnit.SECONDS,11 addressBqueue);12 13 public TestThreadPool() {14 }15 16 17 public void runThread() {18 19 try {20 executor.prestartCoreThread();21 log.info("隊列大小:" + executor.getQueue().size());22 23 } catch (Exception e) {24 log.error("啟動子線程異常", e);25 }26 27 }28 29 }
完成第二步之后,我們繼續(xù)第三步。我們有了線程池,那么實際代碼如何將請求放入其中,并等待執(zhí)行呢。于是,這里分為兩個類,一個是負責業(yè)務代碼中調(diào)用的,負責向隊列中插入請求,一個是單個線程的實現(xiàn)類。具體實現(xiàn)如下:
插入請求實現(xiàn):
1 /** 2 * 線程隊列 3 */ 4 public class TestQueue { 5 protected final static Logger log = LoggerFactory.getLogger(TestQueue.class); 6 7 public static boolean put(String userId,String tradeId, String amount, String flag, String term) { 8 log.debug("添加入隊列開始... 額度申請用戶tradeId=[{}]", tradeId); 9 try {10 TestThreadPool.executor.execute(new TestThread( tradeId, amount, flag, term, userId));11 log.debug("添加入隊列結(jié)束...");12 } catch (Exception e) {13 log.error("添加入隊列異常...", e);14 return false;15 }16 return true;17 }18 19 }
單個線程實現(xiàn)(第八行的引用在下文細說):
1 /** 2 * 發(fā)送信息線程處理類 3 */ 4 public class TestThread implements Runnable { 5 6 protected final static Logger log = LoggerFactory.getLogger(TestThread.class); 7 8 private TradeService tradeService = (TradeService) SpringHandle.getBean("tradeService"); 9 10 String tradeId;11 String amount;12 String flag;13 String term;14 String userId;15 16 17 18 /** 19 * <p>Title: </p>20 * <p>Description: </p>21 * @param tradeId22 * @param amount23 * @param flag24 * @param term25 * @param userId 26 */ 27 28 public TestThread(String tradeId, String amount, String flag, String term,29 String userId) {30 super();31 this.tradeId = tradeId;32 this.amount = amount;33 this.flag = flag;34 this.term = term;35 this.userId = userId;36 }37 38 @Override39 public void run() {40 log.info("線程開始tradeId={}", tradeId);41 log.info("線程名:={}", Thread.currentThread().getId());42 log.info("隊列大小:" + TestThreadPool.executor.getPoolSize() + ","43 + TestThreadPool.executor.getCompletedTaskCount());44 putTradeConfirm(userId,tradeId, amount, flag, term);45 try {46 Thread.sleep(1000L);47 } catch (InterruptedException e) {48 e.printStackTrace();49 }50 }51 52 private void putTradeConfirm(String userId,String tradeId, String amount, String flag, String term) {53 54 tradeService.getMatchFundInfo(userId,amount, tradeId, flag, term);55 56 }57 58 }
這里需要注意的是,我需要獲得一個Service的實例來調(diào)用具體的方法。但是,注釋的方法不起作用,于是在朋友的幫助下,使用了輔助類。具體實現(xiàn)如下:
1 @Component 2 public final class SpringHandle implements BeanFactoryPostProcessor { 3 4 private static ConfigurableListableBeanFactory beanFactory; // Spring應用上下文環(huán)境 5 6 public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { 7 SpringHandle.beanFactory = beanFactory; 8 } 9 10 /**11 * 獲取對象12 * 13 * @param name14 * @return Object 一個以所給名字注冊的bean的實例15 * @throws org.springframework.beans.BeansException16 * 17 */18 @SuppressWarnings("unchecked")19 public static <T> T getBean(String name) throws BeansException {20 return (T) beanFactory.getBean(name);21 }22 23 /**24 * 獲取類型為requiredType的對象25 * 26 * @param clz27 * @return28 * @throws org.springframework.beans.BeansException29 * 30 */31 public static <T> T getBean(Class<T> clz) throws BeansException {32 T result = (T) beanFactory.getBean(clz);33 return result;34 }35 36 /**37 * 如果BeanFactory包含一個與所給名稱匹配的bean定義,則返回true38 * 39 * @param name40 * @return boolean41 */42 public static boolean containsBean(String name) {43 return beanFactory.containsBean(name);44 }45 46 /**47 * 判斷以給定名字注冊的bean定義是一個singleton還是一個prototype。48 * 如果與給定名字相應的bean定義沒有被找到,將會拋出一個異常(NoSuchBeanDefinitionException)49 * 50 * @param name51 * @return boolean52 * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException53 * 54 */55 public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {56 return beanFactory.isSingleton(name);57 }58 59 /**60 * @param name61 * @return Class 注冊對象的類型62 * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException63 * 64 */65 public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {66 return beanFactory.getType(name);67 }68 69 /**70 * 如果給定的bean名字在bean定義中有別名,則返回這些別名71 * 72 * @param name73 * @return74 * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException75 * 76 */77 public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {78 return beanFactory.getAliases(name);79 }80 81 }
最后,在具體業(yè)務邏輯中,調(diào)用插入請求的方法,即可。
TradeGetFundInfoQueue.put(userId, tradeId, quota, repaymentType, timeLimit);
四、測試函數(shù)
由于在項目中,所以我寫了另外一個測試函數(shù)(這個測試函數(shù),會在下一篇文章中再次遇到),放在這里。供大家參考:
1 public class TestThreadPool { 2 3 public static BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( 4 10000); 5 6 public static void main(String[] args) { 7 for (int i = 0; i < 2; i++) { 8 queue.add(new TestThread("初始化")); 9 }10 11 final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 15, TimeUnit.SECONDS, queue);12 13 executor.prestartCoreThread();14 15 16 new Thread(new Runnable() {17 @Override18 public void run() {19 while (true) {20 System.out.println("getActiveCount=" + executor.getActiveCount()21 + ";getKeepAliveTime=" + executor.getKeepAliveTime(TimeUnit.SECONDS)22 + ";getCompletedTaskCount=" + executor.getCompletedTaskCount()23 + ";getCorePoolSize=" + executor.getCorePoolSize()24 + ";getLargestPoolSize=" + executor.getLargestPoolSize()25 + ";getMaximumPoolSize=" + executor.getMaximumPoolSize()26 + ";getPoolSize=" + executor.getPoolSize()27 + ";getTaskCount=" + executor.getTaskCount()28 + ";getQueue().size()=" + executor.getQueue().size()29 );30 try {31 Thread.currentThread().sleep(200L);32 } catch (InterruptedException e) {33 e.printStackTrace();34
http://www.cnblogs.com/iceworld520/p/7066388.html