上周的面試中,被問及了幾個關(guān)于Java并發(fā)編程的問題,自己回答的都不是很系統(tǒng)和全面,可以說是“頭皮發(fā)麻”,哈哈。因此果斷購入《Java并發(fā)編程的藝術(shù)》一書,學(xué)習(xí)后的體會是要想快速上手Java并發(fā)編程,最需要掌握的是線程、線程池概念的理解和Executor框架的使用。
Tip:
實踐請見github-multiThread,不會介紹Java內(nèi)存模型等更底層的內(nèi)容??纯聪聢D的“糙漢”身上錯綜復(fù)雜的線[程],愿通過學(xué)習(xí),能化繁為簡,[高效]的編出[高效]的多線程代碼。
基本概念
在實踐中,為了更好的利用資源提高系統(tǒng)整體的吞吐量,會選擇并發(fā)編程。但由于上下文切換和死鎖等問題,并發(fā)編程不一定能提高性能,因此如何合理的進行并發(fā)編程時本文的重點,接下來介紹關(guān)于鎖最基本的一些知識(選學(xué))。
volatile:輕量,保證共享變量的可見性,使得多個線程對共享變量的變更都能及時獲取到。其包括兩個子過程,將當(dāng)前處理器緩存行的數(shù)據(jù)寫回到系統(tǒng)內(nèi)存,之后會使其他CPU里緩存了該內(nèi)存地址的數(shù)據(jù)無效。
synchronized:相對重量,其包含3種形式,針對普通同步方法,鎖是當(dāng)前實例對象;針對靜態(tài)同步方法,鎖是當(dāng)前類的Class對象;對于同步代碼塊,鎖是Synchonize括號內(nèi)配置的對象。此外,synchronize用的鎖存在ava對象頭中,編譯后會插入類似
monitorenter, monitorexit
的代碼。鎖狀態(tài):包括無鎖狀態(tài),偏向鎖狀態(tài),輕量級鎖狀態(tài),重量級鎖狀態(tài)。Tip,鎖可以升級但不能降級。
Java實現(xiàn)原子操作:可以通過鎖和循環(huán)CAS來實現(xiàn)原子操作,不過其也存在3個問題,包括ABA問題,通過版本號解決;循環(huán)時間長開銷大,通過
pause
指令減少自旋帶來的開銷;只能保證一個共享變量的原子操作,通過AtomicRefence
保證引用對象間的原子性,接下來看一個最簡單的CAS操作示例。protected void safeCount() { for (;;) { int i = atomicI.get(); if (atomicI.compareAndSet(i, ++i)) break; } }
線程
這部分和之后的鎖是基礎(chǔ)部分的核心內(nèi)容,需要好好理解。一般來說,線程都是操作系統(tǒng)最小的調(diào)度單元,一個進程中可以包含多個線程,每個線程都擁有自己的計數(shù)器、堆棧和局部變量。系統(tǒng)會采用分時的形式調(diào)度運行的線程,OS會分出一個個的時間片到線程,此外還可以給線程設(shè)置優(yōu)先級,來保證優(yōu)先級高的線程獲得更多的CPU時間。通過下面的示例代碼可以發(fā)現(xiàn),java程序的運行不僅就是main線程,還有清楚Reference的線程、調(diào)用對象finalize方法的線程、分發(fā)處理發(fā)送給JVM信息的線程、Attach Listener線程等。
// 獲取管理線程的MXbeanThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);// 打印線程信息for (ThreadInfo threadInfo : threadInfos) { System.out.println("[" + threadInfo.getThreadId() + "]" + threadInfo.getThreadName()); }
線程的狀態(tài):Java線程的整個生命周期包括6種不同狀態(tài),分別是
NEW
初始狀態(tài),線程被構(gòu)建但未start;RUNNABLE
運行狀態(tài),Java線程將OS中的就緒和運行兩種狀態(tài)都稱作“運行中”;BLOCKED
阻塞狀態(tài),表示線程阻塞于鎖;WAITING
等待狀態(tài),表示線程進入等待狀態(tài),進入該狀態(tài)表示當(dāng)前線程需要等待其他線程做出特定動作(通知或中斷);TIME_WAITING
超時等待狀態(tài),該狀態(tài)不同于WAITING
,其會在指定的時候后返回;TERMINATED
終止?fàn)顟B(tài),可以使用interrupt()
合理的終止線程,表示當(dāng)前線程已經(jīng)執(zhí)行完畢,之后通過一張Java線程狀態(tài)圖來做個形象的了解。
Daemon守護線程概念非常簡單,java的虛擬機只有在不存在Daemon線程時才會退出。線程間通信有一個的經(jīng)典范式,等待/通知機制。一個線程修改了一個對象的值,而另一個線程感知到了變化,然后進行相應(yīng)的操作,整個過程開始于一個線程,而最終執(zhí)行的是另一個線程。
//等待方:1.獲取對象的鎖 2.如果條件不滿足,那么調(diào)用對象的wait方法,被通知后要檢查條件//3.條件滿足則執(zhí)行對應(yīng)的邏輯synchronized(lock){while(!flag){lock.wait();} }//通知方:1.獲取對象的鎖 2.改變條件 3.通知所有等待在對象上線程synchronized(lock){ flag = true;lock.notifyAll(); }
如果線程A執(zhí)行了Thread.join(),表示當(dāng)線程A等待的線程終止之后才從thread.join()返回,其還提供了join(long millis)和join(int millis, int nanos)方法,當(dāng)給點時間內(nèi)前驅(qū)線程未結(jié)束則強制返回。ThreadLocal
線程變量是以ThreadLocal
對象為鍵,任意對象為值的存儲結(jié)構(gòu)。此外,這部分常見的應(yīng)用實例包括等待超時模式,數(shù)據(jù)庫線程池,基于線程池的簡單Web服務(wù)器等。
鎖
鎖是用來控制多個線程訪問共享資源的方式,在Lock接口出現(xiàn)前都是通過synchronized
來處理線程間同步問題。鎖的主要方法包括lock
, tryLock
, unlock
, newCondition
獲取等待通知組件等方法。其相關(guān)的實現(xiàn)包括隊列同步器AbstractQueuedSynchronizer
、重入鎖ReentrantLock
、讀寫鎖ReentrantReadWriteLock
、LockSupport和Condition接口,這部分的重點講是可重入鎖ReenterLock。
重入鎖
ReentrantLock
表示該鎖可以支持一個線程對資源的重復(fù)加鎖,并支持獲取瑣時的公平性的選擇。默認(rèn)是非公平鎖,其特點是性能要遠高于公平鎖(嚴(yán)格按照請求時間順序獲取所,F(xiàn)IFO)。ReentrantLock lock = new ReentrantLock(true);lock.lock();try { // TODO} finally { lock.unlock(); }
讀寫鎖
ReentrantReadWriteLock
同時維護一個讀鎖和一個寫鎖,允許多個讀線程同時訪問共享數(shù)據(jù),只會在寫線程訪問時阻塞,和數(shù)據(jù)庫的鎖機制很類似,該方式使得并發(fā)性等到很大提升。其除了公平性選擇、可重入等特性外,還支持鎖降級,遵循獲取寫鎖、獲取讀鎖再釋放寫鎖的次序,寫鎖能降級為讀鎖。LockSupport提供
park
阻塞,unpark
喚醒的靜態(tài)方法。Condition接口:任意的Java對象,都擁有一組監(jiān)視器方法,包括
wait()
、notify()
等,這些方法與synchronized
關(guān)鍵字配合可以實現(xiàn)等待/通知模式,Condition
接口也提供了類似的監(jiān)視器方法,但功能更加強大。
進階概念
并發(fā)容器和框架
ConcurrentHashMap VS HashTable:之所以決定好好學(xué)學(xué)Java并發(fā)編程,可以說就是面試時被面試官懟住這個問題。過去只知道ConcurrentHashMap是HashMap的線程安全版本,但其與HashTable的區(qū)別卻從來沒關(guān)心過。簡答來說,前者通過
Segment
對HashEntry
進行包裝,達到了記錄級別的鎖粒度,和數(shù)據(jù)庫相關(guān)知識類似。HashTable由于只支持[表]級鎖,因此性能比較低下。ConcurrentLinkedQueue
則是隊列的線程安全版本,沒有什么特別要說的。BlockingQueue阻塞隊列是一種支持兩個附加操作的隊列,一個是支持阻塞插入,即當(dāng)隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿,另一個支持阻塞的移除方法,意思是隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?。其處理方式包括拋出異常、返回特殊值、一直阻塞和超時退出。Java7提供的阻塞隊列包括
ArrayBlockingQueue
,LinkedBlockingQueue
,DelayQueue
等,不是重點。Fork/Join框架:Java7中提供的類似Map/Reduce的并行開發(fā)框架,F(xiàn)ork可以將任務(wù)分解為子任務(wù),而Join則負(fù)責(zé)匯總結(jié)果。其中涉及一個工作竊取
work-stealing
算法,可以使得線程可以從其他隊列里竊取任務(wù)來執(zhí)行,優(yōu)點是充分利用線程進行并行計算,減少了線程間的競爭;缺點是在某些情況下存在競爭,比如雙端隊列里只有一個任務(wù)時,該算法會消耗更多的系統(tǒng)資源。
并發(fā)工具類
這部分的內(nèi)容非常重要,之后介紹的一些常見模式可以很好的應(yīng)用在日常的開發(fā)場景中,一定要掌握牢靠。
13個原子操作類:比較常見的有
AtomicBoolean
和AtomicInteger
,AtomicIntegerArray
,AutomicReference
等,接下來選擇一個比較復(fù)雜的作為示例。User user = new User("xionger", 30); atomicUserRef.set(user); User updateUser = new User("xiongerda", 32); atomicUserRef.compareAndSet(user, updateUser); System.out.println(atomicUserRef.get().getName()); System.out.println(atomicUserRef.get().getOld());
CountDownLatch:類似一個計數(shù)器,允許一個或多個線程等待其他線程完成操作,比如主線程需要等待2個子線程完成任務(wù)后返回。常見場景,比如我們解析Excel多個Sheet的數(shù)據(jù),那么可以由每個線程處理一個,再都完成后再通知系統(tǒng)解析完成。
static CountDownLatch latch = new CountDownLatch(3);public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override public void run() { latch.countDown(); } }).start(); new Thread(new Runnable() { @Override public void run() { latch.countDown(); } }).start(); latch.await(); }
CyclicBarrier:其讓一組線程到達一個屏障,類似跑步的起跑線,直到最后一個線程到達屏障,屏障才會開門,所有被阻塞的線程才能繼續(xù)執(zhí)行。以可用于多線程計算數(shù)據(jù),最后合并計算數(shù)據(jù)的場景,例如用一個Excel保存用戶所有銀行流水,每個Sheet保存一個賬戶近一年的流水,現(xiàn)在要統(tǒng)計日均流水,那么可以先計算每個Sheet的日均流水,最后匯總。使用上和
CountDownLatch
有些相似,不過其特點是可以使用reset
方法重置,并通過isBroken()
判斷線程是否中斷。Semaphore信號量用于控制同時訪問特定資源的線程數(shù)量,常用與流量控制,比如數(shù)據(jù)庫連接的控制,有50個線程需要使用15數(shù)據(jù)庫連接。
private static ExecutorService executorService = Executors.newFixedThreadPool(50); private static Semaphore sema = new Semaphore(15); public static void main(String[] args) { for (int i = 0; i < 50; i++) { executorService.execute(new Runnable() { @Override public void run() { try { sema.acquire(); System.out.println("save data"); sema.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } executorService.shutdown(); }
Exchanger交換者可用于線程間數(shù)據(jù)交換,它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)。Exchange可以用于遺傳算法和校對工作等場景,比如需要將紙質(zhì)流水錄入到系統(tǒng),為了避免錯位,使用AB崗兩人進行錄入,錄入到Excel后,系統(tǒng)需要加載這兩個Excel并進行校對。
private static final Exchanger<String> exchanger = new Exchanger<>();private static ExecutorService threadPool = Executors.newFixedThreadPool(2);public static void main(String[] args) { threadPool.execute(new Runnable() { @Override public void run() { String a = "銀行流水A"; try { exchanger.exchange(a); } catch (InterruptedException e) { e.printStackTrace(); } } }); threadPool.execute(new Runnable() { @Override public void run() { String b = "銀行流水B"; try { String a = exchanger.exchange(b); System.out.println("a和b是否數(shù)據(jù)一致:" + a.equals(b) + ",a錄入的是: " + a + ",b錄入的是" + b); } catch (InterruptedException e) { e.printStackTrace(); } } }); }
Executor框架
線程池
在介紹Executor框架前,先介紹線程池相關(guān)的原理,其是并發(fā)編程中最為重要的部分,合理的使用線程池可以降低系統(tǒng)消耗、提高響應(yīng)速度、提高線程的可管理性,接下來介紹線程池的基礎(chǔ)處理流程。
1.如果當(dāng)前運行的線程少于corePoolSize直接創(chuàng)建新線程來執(zhí)行任務(wù),需要獲取全局鎖。
2.如果運行的線程等于或多余corePoolSize則將任務(wù)加入BlockingQueue。
3.如果由于隊列已滿,無法將任務(wù)加到BlockingQueue,則創(chuàng)建新的線程來處任務(wù),需要獲取全局鎖。
4.如果創(chuàng)建新線程將操作maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采用上述步驟,保證了執(zhí)行execute()
時,盡可能的避免了獲取全局鎖,大部分的可能都會執(zhí)行步驟2,而無需獲取全局鎖。
在引入Executor框架前,Java線程既是工作單元,也是執(zhí)行機制。而在Executor框架中,工作單元和執(zhí)行機制被分離開來,前者包括Runnable
和Callable
,而執(zhí)行機制由Executor框架提供。該框架是一個兩級的調(diào)度模型,在上層,通過調(diào)度器Executor將多個任務(wù)映射到固定數(shù)量的線程;在底層,操作系統(tǒng)內(nèi)核將這些線程再映射到處理器上。而我們的應(yīng)用程序只需通過E該框架控制上層的調(diào)度即可。
Tip:
在合理配置線程池時,需要根據(jù)具體場景給出對應(yīng)的解決方案,總體來說,推薦使用有界隊列,便于控制。
CPU密集型:配置盡可能少的線程,如cpu數(shù)量+1
,可以通過Runtime.getRuntime().availableProcessors()
獲取CPU個數(shù)
IO密集型:配置盡可能多的線程,如2*cpu數(shù)量
,常見場景,等待數(shù)據(jù)庫或服務(wù)接口的返回。
優(yōu)先級:可以通過PriorityBlockingQueue
來處理
監(jiān)控:可以通過taskCount
,completedTaskCount
,getActiveSize
等函數(shù)來監(jiān)控線程池的運行。Executor框架結(jié)構(gòu)主要由三部分組成
a.任務(wù),包括任務(wù)實現(xiàn)的接口Runnable
和Callable
b.任務(wù)的執(zhí)行,包括任務(wù)執(zhí)行機制的核心接口Executor
和其子類ExecutorService
,相關(guān)的實現(xiàn)類包括ThreadPoolExecutor
和ScheduledThreadPoolExecutor
。
c.異步計算的結(jié)果,包括Future
和其實現(xiàn)FutureTask
。ThreadPoolExecutor:框架的核心類,由
corePool
,maximumPool
,BlockingQueue
,RejectedExecutionHandler
4部分組成,可以由工具類Executors
創(chuàng)建。具體老說,工具類可以創(chuàng)建FixedThreadPool
固定線程數(shù)(最推薦)、SingleThreadExecutor
、CachedThreadPool
三種類型的ThreadPoolExecutor
。ScheduledThreadPoolExecutor:比基礎(chǔ)的
Timer
對象更加全面,其通過DelayQueue
來執(zhí)行周期性或定時的任務(wù)。FutureTask基于
AbstractQueuedSynchronizer
(AQS),之前介紹的ReentrantLock
、CountDownLatch
等其實都是基于AQS來實現(xiàn)的。AQS是一個同步框架,提供通用機制來原子性的管理同步狀態(tài)、阻塞&喚醒線程、維護被阻塞的線程隊列。每個基于AQS的實現(xiàn)都會包含兩類操作,acquire用于阻塞調(diào)用線程,對應(yīng)futureTask.get()
,知道AQS狀態(tài)允許這個線程才能繼續(xù)執(zhí)行;另一個為release,對應(yīng)futureTask.cancel()&run()
,該操作改變AQS狀態(tài),改變后的狀態(tài)允許一個或多個阻塞線程解除阻塞。public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<BigDecimal> result = executor.submit(new Callable<BigDecimal>() { @Override public BigDecimal call() throws Exception { return getSalaryByService(); } }); System.out.println(result.get()); }
生產(chǎn)者消費者模式:該模式可以解決大部分的并發(fā)問題,其通過阻塞隊列,平衡生產(chǎn)線程和消費線程的工作能力來提高程序整體處理數(shù)據(jù)的速度。比如經(jīng)常會郵件來分享技術(shù)知識,就可以通過通過Job到郵箱中獲取到文章并放入阻塞隊列,之后消費者去獲取數(shù)據(jù)并插入到類似confluence的文檔管理工具中,接下來展示一個單個生產(chǎn)者,多個消費者的應(yīng)用場景實現(xiàn)。
Tip:
線上問題定位:Linux中可以通過top
命令查看進程的情況,之后可以使用交互命令1
查看CPU性能,H
查看每個線程的性能信息。
性能測試:比如使用Jmeter來做壓測,可以通過netstat -nat | grep 3306 -c
來查看數(shù)據(jù)的壓力情況。
參考資料
方騰飛. Java并發(fā)編程的藝術(shù)[M]. 上海:機械工業(yè)出版社, 2017.
作 者:熊二哥
出 處:http://www.cnblogs.com/wanliwang01/
版權(quán)聲明:本文版權(quán)歸作者和博客園共有,歡迎轉(zhuǎn)載,但未經(jīng)作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。
http://www.cnblogs.com/wanliwang01/p/javacore_multiThread.html