目錄

    1 基本實現(xiàn)原理

      1.1 如何使用

       1.2 設計思想

    2 自定義同步器

      2.1 同步器代碼實現(xiàn)

       2.2 同步器代碼測試

    3 源碼分析

      3.1 Node結點

       3.2 獨占式

       3.3 共享式

    4 總結 

 Java并發(fā)包(JUC)中提供了很多并發(fā)工具,這其中,很多我們耳熟能詳?shù)牟l(fā)工具,譬如ReentrangLock、Semaphore,它們的實現(xiàn)都用到了一個共同的基類--AbstractQueuedSynchronizer,簡稱AQS。AQS是一個用來構建鎖和同步器的框架,使用AQS能簡單且高效地構造出應用廣泛的大量的同步器,比如我們提到的ReentrantLock,Semaphore,其他的諸如ReentrantReadWriteLock,SynchronousQueue,F(xiàn)utureTask等等皆是基于AQS的。當然,我們自己也能利用AQS非常輕松容易地構造出符合我們自己需求的同步器。

  本章我們就一起探究下這個神奇的東東,并對其實現(xiàn)原理進行剖析理解

基本實現(xiàn)原理

  AQS使用一個int成員變量來表示同步狀態(tài),通過內置的FIFO隊列來完成獲取資源線程的排隊工作。

    private volatile int state;//共享變量,使用volatile修飾保證線程可見性

狀態(tài)信息通過procted類型的getState,setState,compareAndSetState進行操作

AQS支持兩種同步方式:

  1.獨占式

  2.共享式

  這樣方便使用者實現(xiàn)不同類型的同步組件,獨占式如ReentrantLock,共享式如Semaphore,CountDownLatch,組合式的如ReentrantReadWriteLock??傊?,AQS為使用提供了底層支撐,如何組裝實現(xiàn),使用者可以自由發(fā)揮。

同步器的設計是基于模板方法模式的,一般的使用方式是這樣:

  1.使用者繼承AbstractQueuedSynchronizer并重寫指定的方法。(這些重寫方法很簡單,無非是對于共享資源state的獲取和釋放)

  2.將AQS組合在自定義同步組件的實現(xiàn)中,并調用其模板方法,而這些模板方法會調用使用者重寫的方法。

這其實是模板方法模式的一個很經(jīng)典的應用。

我們來看看AQS定義的這些可重寫的方法:

    protected boolean tryAcquire(int arg) : 獨占式獲取同步狀態(tài),試著獲取,成功返回true,反之為false

    protected boolean tryRelease(int arg) :獨占式釋放同步狀態(tài),等待中的其他線程此時將有機會獲取到同步狀態(tài);

    protected int tryAcquireShared(int arg) :共享式獲取同步狀態(tài),返回值大于等于0,代表獲取成功;反之獲取失??;

    protected boolean tryReleaseShared(int arg) :共享式釋放同步狀態(tài),成功為true,失敗為false

    protected boolean isHeldExclusively() : 是否在獨占模式下被線程占用。

關于AQS的使用,我們來簡單總結一下:

  如何使用

  首先,我們需要去繼承AbstractQueuedSynchronizer這個類,然后我們根據(jù)我們的需求去重寫相應的方法,比如要實現(xiàn)一個獨占鎖,那就去重寫tryAcquire,tryRelease方法,要實現(xiàn)共享鎖,就去重寫tryAcquireShared,tryReleaseShared;最后,在我們的組件中調用AQS中的模板方法就可以了,而這些模板方法是會調用到我們之前重寫的那些方法的。也就是說,我們只需要很小的工作量就可以實現(xiàn)自己的同步組件,重寫的那些方法,僅僅是一些簡單的對于共享資源state的獲取和釋放操作,至于像是獲取資源失敗,線程需要阻塞之類的操作,自然是AQS幫我們完成了。

  設計思想

  對于使用者來講,我們無需關心獲取資源失敗,線程排隊,線程阻塞/喚醒等一系列復雜的實現(xiàn),這些都在AQS中為我們處理好了。我們只需要負責好自己的那個環(huán)節(jié)就好,也就是獲取/釋放共享資源state的姿勢T_T。很經(jīng)典的模板方法設計模式的應用,AQS為我們定義好頂級邏輯的骨架,并提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列復雜邏輯的實現(xiàn),將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現(xiàn)即可。

自定義同步器

  同步器代碼實現(xiàn)

上面大概講了一些關于AQS如何使用的理論性的東西,接下來,我們就來看下實際如何使用,直接采用JDK官方文檔中的小例子來說明問題

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

 1 package juc; 2  3 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 4  5 /** 6  * Created by chengxiao on 2017/3/28. 7  */ 8 public class Mutex implements java.io.Serializable { 9     //靜態(tài)內部類,繼承AQS10     private static class Sync extends AbstractQueuedSynchronizer {11         //是否處于占用狀態(tài)12         protected boolean isHeldExclusively() {13             return getState() == 1;14         }15         //當狀態(tài)為0的時候獲取鎖,CAS操作成功,則state狀態(tài)為1,16         public boolean tryAcquire(int acquires) {17             if (compareAndSetState(0, 1)) {18                 setExclusiveOwnerThread(Thread.currentThread());19                 return true;20             }21             return false;22         }23         //釋放鎖,將同步狀態(tài)置為024         protected boolean tryRelease(int releases) {25             if (getState() == 0) throw new IllegalMonitorStateException();26             setExclusiveOwnerThread(null);27             setState(0);28             return true;29         }30     }31         //同步對象完成一系列復雜的操作,我們僅需指向它即可32         private final Sync sync = new Sync();33         //加鎖操作,代理到acquire(模板方法)上就行,acquire會調用我們重寫的tryAcquire方法34         public void lock() {35             sync.acquire(1);36         }37         public boolean tryLock() {38             return sync.tryAcquire(1);39         }40         //釋放鎖,代理到release(模板方法)上就行,release會調用我們重寫的tryRelease方法。41         public void unlock() {42             sync.release(1);43         }44         public boolean isLocked() {45             return sync.isHeldExclusively();46         }47 }

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

  同步器代碼測試

測試下這個自定義的同步器,我們使用之前文章中做過的并發(fā)環(huán)境下a++的例子來說明問題(a++的原子性其實最好使用原子類AtomicInteger來解決,此處用Mutex有點大炮打蚊子的意味,好在能說明問題就好)

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓 TestMutex

測試結果:

加鎖前,a=279204
加鎖后,a=300000

源碼分析

   我們先來簡單描述下AQS的基本實現(xiàn),前面我們提到過,AQS維護一個共享資源state,通過內置的FIFO來完成獲取資源線程的排隊工作。(這個內置的同步隊列稱為"CLH"隊列)。該隊列由一個一個的Node結點組成,每個Node結點維護一個prev引用和next引用,分別指向自己的前驅和后繼結點。AQS維護兩個指針,分別指向隊列頭部head和尾部tail。

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

  其實就是個雙端雙向鏈表。

  當線程獲取資源失敗(比如tryAcquire時試圖設置state狀態(tài)失?。瑫粯嬙斐梢粋€結點加入CLH隊列中,同時當前線程會被阻塞在隊列中(通過LockSupport.park實現(xiàn),其實是等待態(tài))。當持有同步狀態(tài)的線程釋放同步狀態(tài)時,會喚醒后繼結點,然后此結點線程繼續(xù)加入到對同步狀態(tài)的爭奪中。

  Node結點

  Node結點是AbstractQueuedSynchronizer中的一個靜態(tài)內部類,我們撿Node的幾個重要屬性來說一下

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

 1 static final class Node { 2         /** waitStatus值,表示線程已被取消(等待超時或者被中斷)*/ 3         static final int CANCELLED =  1; 4         /** waitStatus值,表示后繼線程需要被喚醒(unpaking)*/ 5         static final int SIGNAL    = -1; 6         /**waitStatus值,表示結點線程等待在condition上,當被signal后,會從等待隊列轉移到同步到隊列中 */ 7         /** waitStatus value to indicate thread is waiting on condition */ 8         static final int CONDITION = -2; 9        /** waitStatus值,表示下一次共享式同步狀態(tài)會被無條件地傳播下去10         static final int PROPAGATE = -3;11         /** 等待狀態(tài),初始為0 */12         volatile int waitStatus;13         /**當前結點的前驅結點 */14         volatile Node prev;15         /** 當前結點的后繼結點 */16         volatile Node next;17         /** 與當前結點關聯(lián)的排隊中的線程 */18         volatile Thread thread;19         /** ...... */20     }

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

獨占式

  獲取同步狀態(tài)--acquire()

  來看看acquire方法,lock方法一般會直接代理到acquire上

1  public final void acquire(int arg) {2         if (!tryAcquire(arg) &&3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))4             selfInterrupt();5     }

  我們來簡單理一下代碼邏輯:

    a.首先,調用使用者重寫的tryAcquire方法,若返回true,意味著獲取同步狀態(tài)成功,后面的邏輯不再執(zhí)行;若返回false,也就是獲取同步狀態(tài)失敗,進入b步驟;

    b.此時,獲取同步狀態(tài)失敗,構造獨占式同步結點,通過addWatiter將此結點添加到同步隊列的尾部(此時可能會有多個線程結點試圖加入同步隊列尾部,需要以線程安全的方  式添加);

    c.該結點以在隊列中嘗試獲取同步狀態(tài),若獲取不到,則阻塞結點線程,直到被前驅結點喚醒或者被中斷。

  addWaiter

    為獲取同步狀態(tài)失敗的線程,構造成一個Node結點,添加到同步隊列尾部

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);//構造結點        //指向尾結點tail
        Node pred = tail;        //如果尾結點不為空,CAS快速嘗試在尾部添加,若CAS設置成功,返回;否則,eng。
        if (pred != null) {
            node.prev = pred;            if (compareAndSetTail(pred, node)) {
                pred.next = node;                return node;
            }
        }
        enq(node);        return node;
    }

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

  先cas快速設置,若失敗,進入enq方法  

  將結點添加到同步隊列尾部這個操作,同時可能會有多個線程嘗試添加到尾部,是非線程安全的操作。

  以上代碼可以看出,使用了compareAndSetTail這個cas操作保證安全添加尾結點。

  enq方法

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

 private Node enq(final Node node) {        for (;;) {
            Node t = tail;            if (t == null) { //如果隊列為空,創(chuàng)建結點,同時被head和tail引用
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;                if (compareAndSetTail(t, node)) {//cas設置尾結點,不成功就一直重試
                    t.next = node;                    return t;
                }
            }
        }
    }

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

  enq內部是個死循環(huán),通過CAS設置尾結點,不成功就一直重試。很經(jīng)典的CAS自旋的用法,我們在之前關于原子類的源碼分析中也提到過。這是一種樂觀的并發(fā)策略。

  最后,看下acquireQueued方法

  acquireQueued

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

final boolean acquireQueued(final Node node, int arg) {        boolean failed = true;        try {            boolean interrupted = false;            for (;;) {//死循環(huán)
                final Node p = node.predecessor();//找到當前結點的前驅結點
                if (p == head && tryAcquire(arg)) {//如果前驅結點是頭結點,才tryAcquire,其他結點是沒有機會tryAcquire的。
                    setHead(node);//獲取同步狀態(tài)成功,將當前結點設置為頭結點。
                    p.next = null; // 方便GC
                    failed = false;                    return interrupted;
                }                // 如果沒有獲取到同步狀態(tài),通過shouldParkAfterFailedAcquire判斷是否應該阻塞,parkAndCheckInterrupt用來阻塞線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {            if (failed)
                cancelAcquire(node);
        }
    }

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

  acquireQueued內部也是一個死循環(huán),只有前驅結點是頭結點的結點,也就是老二結點,才有機會去tryAcquire;若tryAcquire成功,表示獲取同步狀態(tài)成功,將此結點設置為頭結點;若是非老二結點,或者tryAcquire失敗,則進入shouldParkAfterFailedAcquire去判斷判斷當前線程是否應該阻塞,若可以,調用parkAndCheckInterrupt阻塞當前線程,直到被中斷或者被前驅結點喚醒。若還不能休息,繼續(xù)循環(huán)。

 shouldParkAfterFailedAcquire

shouldParkAfterFailedAcquire用來判斷當前結點線程是否能休息

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {        //獲取前驅結點的wait值 
        int ws = pred.waitStatus;        if (ws == Node.SIGNAL)//若前驅結點的狀態(tài)是SIGNAL,意味著當前結點可以被安全地park
            return true;        if (ws > 0) {        // ws>0,只有CANCEL狀態(tài)ws才大于0。若前驅結點處于CANCEL狀態(tài),也就是此結點線程已經(jīng)無效,從后往前遍歷,找到一個非CANCEL狀態(tài)的結點,將自己設置為它的后繼結點
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {  
            // 若前驅結點為其他狀態(tài),將其設置為SIGNAL狀態(tài)            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }        return false;
    }

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

  若shouldParkAfterFailedAcquire返回true,也就是當前結點的前驅結點為SIGNAL狀態(tài),則意味著當前結點可以放心休息,進入parking狀態(tài)了。parkAncCheckInterrupt阻塞線程并處理中斷。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//使用LockSupport使線程進入阻塞狀態(tài)
        return Thread.interrupted();// 線程是否被中斷過
    }

  至此,關于acquire的方法源碼已經(jīng)分析完畢,我們來簡單總結下

    a.首先tryAcquire獲取同步狀態(tài),成功則直接返回;否則,進入下一環(huán)節(jié);

    b.線程獲取同步狀態(tài)失敗,就構造一個結點,加入同步隊列中,這個過程要保證線程安全;

    c.加入隊列中的結點線程進入自旋狀態(tài),若是老二結點(即前驅結點為頭結點),才有機會嘗試去獲取同步狀態(tài);否則,當其前驅結點的狀態(tài)為SIGNAL,線程便可安心休息,進入阻塞狀態(tài),直到被中斷或者被前驅結點喚醒。

  釋放同步狀態(tài)--release()

  當前線程執(zhí)行完自己的邏輯之后,需要釋放同步狀態(tài),來看看release方法的邏輯

photoshop培訓,電腦培訓,電腦維修培訓,移動軟件開發(fā)培訓,網(wǎng)站設計培訓,網(wǎng)站建設培訓

 public final boolean release(int arg) {        if (tryRelease(arg)) {//調用使用者重寫的tryRelease方法,若成功,喚醒其后繼結點,失敗則返回false
            Node h = head;            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//喚醒后繼結點
            return true;
        }        return false;

出處: <http://www.cnblogs.com/chengxiao/>

本文版權歸作者和博客園共有,歡迎轉載,但未經(jīng)作者同意必須保留此段聲明,且在頁面明顯位置給出原文鏈接。

http://www.cnblogs.com/chengxiao/p/7141160.html