目錄
Java并發(fā)包(JUC)中提供了很多并發(fā)工具,這其中,很多我們耳熟能詳?shù)牟l(fā)工具,譬如ReentrangLock、Semaphore,它們的實現(xiàn)都用到了一個共同的基類--AbstractQueuedSynchronizer,簡稱AQS。AQS是一個用來構建鎖和同步器的框架,使用AQS能簡單且高效地構造出應用廣泛的大量的同步器,比如我們提到的ReentrantLock,Semaphore,其他的諸如ReentrantReadWriteLock,SynchronousQueue,F(xiàn)utureTask等等皆是基于AQS的。當然,我們自己也能利用AQS非常輕松容易地構造出符合我們自己需求的同步器。
本章我們就一起探究下這個神奇的東東,并對其實現(xiàn)原理進行剖析理解
基本實現(xiàn)原理
AQS使用一個int成員變量來表示同步狀態(tài),通過內置的FIFO隊列來完成獲取資源線程的排隊工作。
private volatile int state;//共享變量,使用volatile修飾保證線程可見性
狀態(tài)信息通過procted類型的getState,setState,compareAndSetState進行操作
AQS支持兩種同步方式:
1.獨占式
2.共享式
這樣方便使用者實現(xiàn)不同類型的同步組件,獨占式如ReentrantLock,共享式如Semaphore,CountDownLatch,組合式的如ReentrantReadWriteLock??傊?,AQS為使用提供了底層支撐,如何組裝實現(xiàn),使用者可以自由發(fā)揮。
同步器的設計是基于模板方法模式的,一般的使用方式是這樣:
1.使用者繼承AbstractQueuedSynchronizer并重寫指定的方法。(這些重寫方法很簡單,無非是對于共享資源state的獲取和釋放)
2.將AQS組合在自定義同步組件的實現(xiàn)中,并調用其模板方法,而這些模板方法會調用使用者重寫的方法。
這其實是模板方法模式的一個很經(jīng)典的應用。
我們來看看AQS定義的這些可重寫的方法:
protected boolean tryAcquire(int arg) : 獨占式獲取同步狀態(tài),試著獲取,成功返回true,反之為false
protected boolean tryRelease(int arg) :獨占式釋放同步狀態(tài),等待中的其他線程此時將有機會獲取到同步狀態(tài);
protected int tryAcquireShared(int arg) :共享式獲取同步狀態(tài),返回值大于等于0,代表獲取成功;反之獲取失??;
protected boolean tryReleaseShared(int arg) :共享式釋放同步狀態(tài),成功為true,失敗為false
protected boolean isHeldExclusively() : 是否在獨占模式下被線程占用。
關于AQS的使用,我們來簡單總結一下:
首先,我們需要去繼承AbstractQueuedSynchronizer這個類,然后我們根據(jù)我們的需求去重寫相應的方法,比如要實現(xiàn)一個獨占鎖,那就去重寫tryAcquire,tryRelease方法,要實現(xiàn)共享鎖,就去重寫tryAcquireShared,tryReleaseShared;最后,在我們的組件中調用AQS中的模板方法就可以了,而這些模板方法是會調用到我們之前重寫的那些方法的。也就是說,我們只需要很小的工作量就可以實現(xiàn)自己的同步組件,重寫的那些方法,僅僅是一些簡單的對于共享資源state的獲取和釋放操作,至于像是獲取資源失敗,線程需要阻塞之類的操作,自然是AQS幫我們完成了。
對于使用者來講,我們無需關心獲取資源失敗,線程排隊,線程阻塞/喚醒等一系列復雜的實現(xiàn),這些都在AQS中為我們處理好了。我們只需要負責好自己的那個環(huán)節(jié)就好,也就是獲取/釋放共享資源state的姿勢T_T。很經(jīng)典的模板方法設計模式的應用,AQS為我們定義好頂級邏輯的骨架,并提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列復雜邏輯的實現(xiàn),將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現(xiàn)即可。
自定義同步器
上面大概講了一些關于AQS如何使用的理論性的東西,接下來,我們就來看下實際如何使用,直接采用JDK官方文檔中的小例子來說明問題
1 package juc; 2 3 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 4 5 /** 6 * Created by chengxiao on 2017/3/28. 7 */ 8 public class Mutex implements java.io.Serializable { 9 //靜態(tài)內部類,繼承AQS10 private static class Sync extends AbstractQueuedSynchronizer {11 //是否處于占用狀態(tài)12 protected boolean isHeldExclusively() {13 return getState() == 1;14 }15 //當狀態(tài)為0的時候獲取鎖,CAS操作成功,則state狀態(tài)為1,16 public boolean tryAcquire(int acquires) {17 if (compareAndSetState(0, 1)) {18 setExclusiveOwnerThread(Thread.currentThread());19 return true;20 }21 return false;22 }23 //釋放鎖,將同步狀態(tài)置為024 protected boolean tryRelease(int releases) {25 if (getState() == 0) throw new IllegalMonitorStateException();26 setExclusiveOwnerThread(null);27 setState(0);28 return true;29 }30 }31 //同步對象完成一系列復雜的操作,我們僅需指向它即可32 private final Sync sync = new Sync();33 //加鎖操作,代理到acquire(模板方法)上就行,acquire會調用我們重寫的tryAcquire方法34 public void lock() {35 sync.acquire(1);36 }37 public boolean tryLock() {38 return sync.tryAcquire(1);39 }40 //釋放鎖,代理到release(模板方法)上就行,release會調用我們重寫的tryRelease方法。41 public void unlock() {42 sync.release(1);43 }44 public boolean isLocked() {45 return sync.isHeldExclusively();46 }47 }
測試下這個自定義的同步器,我們使用之前文章中做過的并發(fā)環(huán)境下a++的例子來說明問題(a++的原子性其實最好使用原子類AtomicInteger來解決,此處用Mutex有點大炮打蚊子的意味,好在能說明問題就好)
TestMutex
測試結果:
加鎖前,a=279204 加鎖后,a=300000
源碼分析
我們先來簡單描述下AQS的基本實現(xiàn),前面我們提到過,AQS維護一個共享資源state,通過內置的FIFO來完成獲取資源線程的排隊工作。(這個內置的同步隊列稱為"CLH"隊列)。該隊列由一個一個的Node結點組成,每個Node結點維護一個prev引用和next引用,分別指向自己的前驅和后繼結點。AQS維護兩個指針,分別指向隊列頭部head和尾部tail。
其實就是個雙端雙向鏈表。
當線程獲取資源失敗(比如tryAcquire時試圖設置state狀態(tài)失?。瑫粯嬙斐梢粋€結點加入CLH隊列中,同時當前線程會被阻塞在隊列中(通過LockSupport.park實現(xiàn),其實是等待態(tài))。當持有同步狀態(tài)的線程釋放同步狀態(tài)時,會喚醒后繼結點,然后此結點線程繼續(xù)加入到對同步狀態(tài)的爭奪中。
Node結點是AbstractQueuedSynchronizer中的一個靜態(tài)內部類,我們撿Node的幾個重要屬性來說一下
1 static final class Node { 2 /** waitStatus值,表示線程已被取消(等待超時或者被中斷)*/ 3 static final int CANCELLED = 1; 4 /** waitStatus值,表示后繼線程需要被喚醒(unpaking)*/ 5 static final int SIGNAL = -1; 6 /**waitStatus值,表示結點線程等待在condition上,當被signal后,會從等待隊列轉移到同步到隊列中 */ 7 /** waitStatus value to indicate thread is waiting on condition */ 8 static final int CONDITION = -2; 9 /** waitStatus值,表示下一次共享式同步狀態(tài)會被無條件地傳播下去10 static final int PROPAGATE = -3;11 /** 等待狀態(tài),初始為0 */12 volatile int waitStatus;13 /**當前結點的前驅結點 */14 volatile Node prev;15 /** 當前結點的后繼結點 */16 volatile Node next;17 /** 與當前結點關聯(lián)的排隊中的線程 */18 volatile Thread thread;19 /** ...... */20 }
獨占式
獲取同步狀態(tài)--acquire()
來看看acquire方法,lock方法一般會直接代理到acquire上
1 public final void acquire(int arg) {2 if (!tryAcquire(arg) &&3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))4 selfInterrupt();5 }
我們來簡單理一下代碼邏輯:
a.首先,調用使用者重寫的tryAcquire方法,若返回true,意味著獲取同步狀態(tài)成功,后面的邏輯不再執(zhí)行;若返回false,也就是獲取同步狀態(tài)失敗,進入b步驟;
b.此時,獲取同步狀態(tài)失敗,構造獨占式同步結點,通過addWatiter將此結點添加到同步隊列的尾部(此時可能會有多個線程結點試圖加入同步隊列尾部,需要以線程安全的方 式添加);
c.該結點以在隊列中嘗試獲取同步狀態(tài),若獲取不到,則阻塞結點線程,直到被前驅結點喚醒或者被中斷。
addWaiter
為獲取同步狀態(tài)失敗的線程,構造成一個Node結點,添加到同步隊列尾部
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//構造結點 //指向尾結點tail Node pred = tail; //如果尾結點不為空,CAS快速嘗試在尾部添加,若CAS設置成功,返回;否則,eng。 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
先cas快速設置,若失敗,進入enq方法
將結點添加到同步隊列尾部這個操作,同時可能會有多個線程嘗試添加到尾部,是非線程安全的操作。
以上代碼可以看出,使用了compareAndSetTail這個cas操作保證安全添加尾結點。
enq方法
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { //如果隊列為空,創(chuàng)建結點,同時被head和tail引用 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) {//cas設置尾結點,不成功就一直重試 t.next = node; return t; } } } }
enq內部是個死循環(huán),通過CAS設置尾結點,不成功就一直重試。很經(jīng)典的CAS自旋的用法,我們在之前關于原子類的源碼分析中也提到過。這是一種樂觀的并發(fā)策略。
最后,看下acquireQueued方法
acquireQueued
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//死循環(huán) final Node p = node.predecessor();//找到當前結點的前驅結點 if (p == head && tryAcquire(arg)) {//如果前驅結點是頭結點,才tryAcquire,其他結點是沒有機會tryAcquire的。 setHead(node);//獲取同步狀態(tài)成功,將當前結點設置為頭結點。 p.next = null; // 方便GC failed = false; return interrupted; } // 如果沒有獲取到同步狀態(tài),通過shouldParkAfterFailedAcquire判斷是否應該阻塞,parkAndCheckInterrupt用來阻塞線程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued內部也是一個死循環(huán),只有前驅結點是頭結點的結點,也就是老二結點,才有機會去tryAcquire;若tryAcquire成功,表示獲取同步狀態(tài)成功,將此結點設置為頭結點;若是非老二結點,或者tryAcquire失敗,則進入shouldParkAfterFailedAcquire去判斷判斷當前線程是否應該阻塞,若可以,調用parkAndCheckInterrupt阻塞當前線程,直到被中斷或者被前驅結點喚醒。若還不能休息,繼續(xù)循環(huán)。
shouldParkAfterFailedAcquire
shouldParkAfterFailedAcquire用來判斷當前結點線程是否能休息
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //獲取前驅結點的wait值 int ws = pred.waitStatus; if (ws == Node.SIGNAL)//若前驅結點的狀態(tài)是SIGNAL,意味著當前結點可以被安全地park return true; if (ws > 0) { // ws>0,只有CANCEL狀態(tài)ws才大于0。若前驅結點處于CANCEL狀態(tài),也就是此結點線程已經(jīng)無效,從后往前遍歷,找到一個非CANCEL狀態(tài)的結點,將自己設置為它的后繼結點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 若前驅結點為其他狀態(tài),將其設置為SIGNAL狀態(tài) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
若shouldParkAfterFailedAcquire返回true,也就是當前結點的前驅結點為SIGNAL狀態(tài),則意味著當前結點可以放心休息,進入parking狀態(tài)了。parkAncCheckInterrupt阻塞線程并處理中斷。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//使用LockSupport使線程進入阻塞狀態(tài) return Thread.interrupted();// 線程是否被中斷過 }
至此,關于acquire的方法源碼已經(jīng)分析完畢,我們來簡單總結下
a.首先tryAcquire獲取同步狀態(tài),成功則直接返回;否則,進入下一環(huán)節(jié);
b.線程獲取同步狀態(tài)失敗,就構造一個結點,加入同步隊列中,這個過程要保證線程安全;
c.加入隊列中的結點線程進入自旋狀態(tài),若是老二結點(即前驅結點為頭結點),才有機會嘗試去獲取同步狀態(tài);否則,當其前驅結點的狀態(tài)為SIGNAL,線程便可安心休息,進入阻塞狀態(tài),直到被中斷或者被前驅結點喚醒。
釋放同步狀態(tài)--release()
當前線程執(zhí)行完自己的邏輯之后,需要釋放同步狀態(tài),來看看release方法的邏輯
public final boolean release(int arg) { if (tryRelease(arg)) {//調用使用者重寫的tryRelease方法,若成功,喚醒其后繼結點,失敗則返回false Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//喚醒后繼結點 return true; } return false;
出處: <http://www.cnblogs.com/chengxiao/>
本文版權歸作者和博客園共有,歡迎轉載,但未經(jīng)作者同意必須保留此段聲明,且在頁面明顯位置給出原文鏈接。
http://www.cnblogs.com/chengxiao/p/7141160.html