上周的面試中,被問及了幾個關(guān)于Java并發(fā)編程的問題,自己回答的都不是很系統(tǒng)和全面,可以說是“頭皮發(fā)麻”,哈哈。因此果斷購入《Java并發(fā)編程的藝術(shù)》一書,學(xué)習(xí)后的體會是要想快速上手Java并發(fā)編程,最需要掌握的是線程、線程池概念的理解和Executor框架的使用
Tip:
實踐請見github-multiThread,不會介紹Java內(nèi)存模型等更底層的內(nèi)容??纯聪聢D的“糙漢”身上錯綜復(fù)雜的線[程],愿通過學(xué)習(xí),能化繁為簡,[高效]的編出[高效]的多線程代碼。
seo優(yōu)化培訓(xùn),網(wǎng)絡(luò)推廣培訓(xùn),網(wǎng)絡(luò)營銷培訓(xùn),SEM培訓(xùn),網(wǎng)絡(luò)優(yōu)化,在線營銷培訓(xùn)

基本概念

在實踐中,為了更好的利用資源提高系統(tǒng)整體的吞吐量,會選擇并發(fā)編程。但由于上下文切換和死鎖等問題,并發(fā)編程不一定能提高性能,因此如何合理的進行并發(fā)編程時本文的重點,接下來介紹關(guān)于鎖最基本的一些知識(選學(xué))。

  • volatile:輕量,保證共享變量的可見性,使得多個線程對共享變量的變更都能及時獲取到。其包括兩個子過程,將當(dāng)前處理器緩存行的數(shù)據(jù)寫回到系統(tǒng)內(nèi)存,之后會使其他CPU里緩存了該內(nèi)存地址的數(shù)據(jù)無效。

  • synchronized:相對重量,其包含3種形式,針對普通同步方法,鎖是當(dāng)前實例對象;針對靜態(tài)同步方法,鎖是當(dāng)前類的Class對象;對于同步代碼塊,鎖是Synchonize括號內(nèi)配置的對象。此外,synchronize用的鎖存在ava對象頭中,編譯后會插入類似monitorenter, monitorexit的代碼。

  • 鎖狀態(tài):包括無鎖狀態(tài),偏向鎖狀態(tài),輕量級鎖狀態(tài),重量級鎖狀態(tài)。Tip,鎖可以升級但不能降級。

  • Java實現(xiàn)原子操作:可以通過鎖和循環(huán)CAS來實現(xiàn)原子操作,不過其也存在3個問題,包括ABA問題,通過版本號解決;循環(huán)時間長開銷大,通過pause指令減少自旋帶來的開銷;只能保證一個共享變量的原子操作,通過AtomicRefence保證引用對象間的原子性,接下來看一個最簡單的CAS操作示例。

    protected void safeCount() {    for (;;) {        int i = atomicI.get();        if (atomicI.compareAndSet(i, ++i))            break;
        }
    }

線程

這部分和之后的鎖是基礎(chǔ)部分的核心內(nèi)容,需要好好理解。一般來說,線程都是操作系統(tǒng)最小的調(diào)度單元,一個進程中可以包含多個線程,每個線程都擁有自己的計數(shù)器、堆棧和局部變量。系統(tǒng)會采用分時的形式調(diào)度運行的線程,OS會分出一個個的時間片到線程,此外還可以給線程設(shè)置優(yōu)先級,來保證優(yōu)先級高的線程獲得更多的CPU時間。通過下面的示例代碼可以發(fā)現(xiàn),java程序的運行不僅就是main線程,還有清楚Reference的線程、調(diào)用對象finalize方法的線程、分發(fā)處理發(fā)送給JVM信息的線程、Attach Listener線程等。

// 獲取管理線程的MXbeanThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);// 打印線程信息for (ThreadInfo threadInfo : threadInfos) {    System.out.println("[" + threadInfo.getThreadId() + "]" + threadInfo.getThreadName());
}
  • 線程的狀態(tài):Java線程的整個生命周期包括6種不同狀態(tài),分別是NEW初始狀態(tài),線程被構(gòu)建但未start;RUNNABLE運行狀態(tài),Java線程將OS中的就緒和運行兩種狀態(tài)都稱作“運行中”;BLOCKED阻塞狀態(tài),表示線程阻塞于鎖;WAITING等待狀態(tài),表示線程進入等待狀態(tài),進入該狀態(tài)表示當(dāng)前線程需要等待其他線程做出特定動作(通知或中斷);TIME_WAITING超時等待狀態(tài),該狀態(tài)不同于WAITING,其會在指定的時候后返回;TERMINATED終止?fàn)顟B(tài),可以使用interrupt()合理的終止線程,表示當(dāng)前線程已經(jīng)執(zhí)行完畢,之后通過一張Java線程狀態(tài)圖來做個形象的了解。
    seo優(yōu)化培訓(xùn),網(wǎng)絡(luò)推廣培訓(xùn),網(wǎng)絡(luò)營銷培訓(xùn),SEM培訓(xùn),網(wǎng)絡(luò)優(yōu)化,在線營銷培訓(xùn)
    Daemon守護線程概念非常簡單,java的虛擬機只有在不存在Daemon線程時才會退出。

  • 線程間通信有一個的經(jīng)典范式,等待/通知機制。一個線程修改了一個對象的值,而另一個線程感知到了變化,然后進行相應(yīng)的操作,整個過程開始于一個線程,而最終執(zhí)行的是另一個線程。

    //等待方:1.獲取對象的鎖 2.如果條件不滿足,那么調(diào)用對象的wait方法,被通知后要檢查條件//3.條件滿足則執(zhí)行對應(yīng)的邏輯synchronized(lock){while(!flag){lock.wait();}
    }//通知方:1.獲取對象的鎖 2.改變條件 3.通知所有等待在對象上線程synchronized(lock){
    flag = true;lock.notifyAll();
    }

    seo優(yōu)化培訓(xùn),網(wǎng)絡(luò)推廣培訓(xùn),網(wǎng)絡(luò)營銷培訓(xùn),SEM培訓(xùn),網(wǎng)絡(luò)優(yōu)化,在線營銷培訓(xùn)
    如果線程A執(zhí)行了Thread.join(),表示當(dāng)線程A等待的線程終止之后才從thread.join()返回,其還提供了join(long millis)和join(int millis, int nanos)方法,當(dāng)給點時間內(nèi)前驅(qū)線程未結(jié)束則強制返回。
    ThreadLocal線程變量是以ThreadLocal對象為鍵,任意對象為值的存儲結(jié)構(gòu)。此外,這部分常見的應(yīng)用實例包括等待超時模式,數(shù)據(jù)庫線程池,基于線程池的簡單Web服務(wù)器等。

鎖是用來控制多個線程訪問共享資源的方式,在Lock接口出現(xiàn)前都是通過synchronized來處理線程間同步問題。鎖的主要方法包括locktryLockunlocknewCondition獲取等待通知組件等方法。其相關(guān)的實現(xiàn)包括隊列同步器AbstractQueuedSynchronizer、重入鎖ReentrantLock、讀寫鎖ReentrantReadWriteLock、LockSupport和Condition接口,這部分的重點講是可重入鎖ReenterLock

  • 重入鎖ReentrantLock表示該鎖可以支持一個線程對資源的重復(fù)加鎖,并支持獲取瑣時的公平性的選擇。默認(rèn)是非公平鎖,其特點是性能要遠高于公平鎖(嚴(yán)格按照請求時間順序獲取所,F(xiàn)IFO)。

    ReentrantLock lock = new ReentrantLock(true);lock.lock();try {    // TODO} finally {    lock.unlock();
    }
  • 讀寫鎖ReentrantReadWriteLock同時維護一個讀鎖和一個寫鎖,允許多個讀線程同時訪問共享數(shù)據(jù),只會在寫線程訪問時阻塞,和數(shù)據(jù)庫的鎖機制很類似,該方式使得并發(fā)性等到很大提升。其除了公平性選擇、可重入等特性外,還支持鎖降級,遵循獲取寫鎖、獲取讀鎖再釋放寫鎖的次序,寫鎖能降級為讀鎖。

  • LockSupport提供park阻塞,unpark喚醒的靜態(tài)方法。

  • Condition接口:任意的Java對象,都擁有一組監(jiān)視器方法,包括wait()notify()等,這些方法與synchronized關(guān)鍵字配合可以實現(xiàn)等待/通知模式,Condition接口也提供了類似的監(jiān)視器方法,但功能更加強大。

進階概念

并發(fā)容器和框架

  • ConcurrentHashMap VS HashTable:之所以決定好好學(xué)學(xué)Java并發(fā)編程,可以說就是面試時被面試官懟住這個問題。過去只知道ConcurrentHashMap是HashMap的線程安全版本,但其與HashTable的區(qū)別卻從來沒關(guān)心過。簡答來說,前者通過SegmentHashEntry進行包裝,達到了記錄級別的鎖粒度,和數(shù)據(jù)庫相關(guān)知識類似。HashTable由于只支持[表]級鎖,因此性能比較低下。ConcurrentLinkedQueue則是隊列的線程安全版本,沒有什么特別要說的。

  • BlockingQueue阻塞隊列是一種支持兩個附加操作的隊列,一個是支持阻塞插入,即當(dāng)隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿,另一個支持阻塞的移除方法,意思是隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?。其處理方式包括拋出異常、返回特殊值、一直阻塞和超時退出。Java7提供的阻塞隊列包括ArrayBlockingQueue,LinkedBlockingQueue,DelayQueue等,不是重點。

  • Fork/Join框架:Java7中提供的類似Map/Reduce的并行開發(fā)框架,F(xiàn)ork可以將任務(wù)分解為子任務(wù),而Join則負(fù)責(zé)匯總結(jié)果。其中涉及一個工作竊取work-stealing算法,可以使得線程可以從其他隊列里竊取任務(wù)來執(zhí)行,優(yōu)點是充分利用線程進行并行計算,減少了線程間的競爭;缺點是在某些情況下存在競爭,比如雙端隊列里只有一個任務(wù)時,該算法會消耗更多的系統(tǒng)資源。

并發(fā)工具類

這部分的內(nèi)容非常重要,之后介紹的一些常見模式可以很好的應(yīng)用在日常的開發(fā)場景中,一定要掌握牢靠。

  • 13個原子操作類:比較常見的有AtomicBooleanAtomicInteger,AtomicIntegerArray,AutomicReference等,接下來選擇一個比較復(fù)雜的作為示例。

    User user = new User("xionger", 30);
    atomicUserRef.set(user);
    User updateUser = new User("xiongerda", 32);
    atomicUserRef.compareAndSet(user, updateUser);
    System.out.println(atomicUserRef.get().getName());
    System.out.println(atomicUserRef.get().getOld());
  • CountDownLatch:類似一個計數(shù)器,允許一個或多個線程等待其他線程完成操作,比如主線程需要等待2個子線程完成任務(wù)后返回。常見場景,比如我們解析Excel多個Sheet的數(shù)據(jù),那么可以由每個線程處理一個,再都完成后再通知系統(tǒng)解析完成。

    static CountDownLatch latch = new CountDownLatch(3);public static void main(String[] args) throws InterruptedException {    new Thread(new Runnable() {        @Override
            public void run() {
                latch.countDown();
            }
        }).start();    new Thread(new Runnable() {        @Override
            public void run() {
                latch.countDown();
            }
        }).start();
        latch.await();
    }
  • CyclicBarrier:其讓一組線程到達一個屏障,類似跑步的起跑線,直到最后一個線程到達屏障,屏障才會開門,所有被阻塞的線程才能繼續(xù)執(zhí)行。以可用于多線程計算數(shù)據(jù),最后合并計算數(shù)據(jù)的場景,例如用一個Excel保存用戶所有銀行流水,每個Sheet保存一個賬戶近一年的流水,現(xiàn)在要統(tǒng)計日均流水,那么可以先計算每個Sheet的日均流水,最后匯總。使用上和CountDownLatch有些相似,不過其特點是可以使用reset方法重置,并通過isBroken()判斷線程是否中斷。

  • Semaphore信號量用于控制同時訪問特定資源的線程數(shù)量,常用與流量控制,比如數(shù)據(jù)庫連接的控制,有50個線程需要使用15數(shù)據(jù)庫連接。

    private static ExecutorService executorService = Executors.newFixedThreadPool(50);    private static Semaphore sema = new Semaphore(15);    public static void main(String[] args) {        for (int i = 0; i < 50; i++) {
                executorService.execute(new Runnable() {
                    @Override                public void run() {                    try {
                            sema.acquire();
                            System.out.println("save data");
                            sema.release();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            executorService.shutdown();
        }
  • Exchanger交換者可用于線程間數(shù)據(jù)交換,它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)。Exchange可以用于遺傳算法和校對工作等場景,比如需要將紙質(zhì)流水錄入到系統(tǒng),為了避免錯位,使用AB崗兩人進行錄入,錄入到Excel后,系統(tǒng)需要加載這兩個Excel并進行校對。

    private static final Exchanger<String> exchanger = new Exchanger<>();private static ExecutorService threadPool = Executors.newFixedThreadPool(2);public static void main(String[] args) {
        threadPool.execute(new Runnable() {        @Override
            public void run() {
                String a = "銀行流水A";            try {
                    exchanger.exchange(a);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadPool.execute(new Runnable() {        @Override
            public void run() {
                String b = "銀行流水B";            try {
                    String a = exchanger.exchange(b);
                    System.out.println("a和b是否數(shù)據(jù)一致:" + a.equals(b) + ",a錄入的是: " + a + ",b錄入的是" + b);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

Executor框架

  • 線程池
    在介紹Executor框架前,先介紹線程池相關(guān)的原理,其是并發(fā)編程中最為重要的部分,合理的使用線程池可以降低系統(tǒng)消耗、提高響應(yīng)速度、提高線程的可管理性,接下來介紹線程池的基礎(chǔ)處理流程。
    seo優(yōu)化培訓(xùn),網(wǎng)絡(luò)推廣培訓(xùn),網(wǎng)絡(luò)營銷培訓(xùn),SEM培訓(xùn),網(wǎng)絡(luò)優(yōu)化,在線營銷培訓(xùn)
    1.如果當(dāng)前運行的線程少于corePoolSize直接創(chuàng)建新線程來執(zhí)行任務(wù),需要獲取全局鎖。
    2.如果運行的線程等于或多余corePoolSize則將任務(wù)加入BlockingQueue。
    3.如果由于隊列已滿,無法將任務(wù)加到BlockingQueue,則創(chuàng)建新的線程來處任務(wù),需要獲取全局鎖。
    4.如果創(chuàng)建新線程將操作maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。
    ThreadPoolExecutor采用上述步驟,保證了執(zhí)行execute()時,盡可能的避免了獲取全局鎖,大部分的可能都會執(zhí)行步驟2,而無需獲取全局鎖。
    在引入Executor框架前,Java線程既是工作單元,也是執(zhí)行機制。而在Executor框架中,工作單元和執(zhí)行機制被分離開來,前者包括RunnableCallable,而執(zhí)行機制由Executor框架提供。該框架是一個兩級的調(diào)度模型,在上層,通過調(diào)度器Executor將多個任務(wù)映射到固定數(shù)量的線程;在底層,操作系統(tǒng)內(nèi)核將這些線程再映射到處理器上。而我們的應(yīng)用程序只需通過E該框架控制上層的調(diào)度即可。
    Tip:
    合理配置線程池時,需要根據(jù)具體場景給出對應(yīng)的解決方案,總體來說,推薦使用有界隊列,便于控制。
    CPU密集型:配置盡可能少的線程,如cpu數(shù)量+1,可以通過Runtime.getRuntime().availableProcessors()獲取CPU個數(shù)
    IO密集型:配置盡可能多的線程,如2*cpu數(shù)量,常見場景,等待數(shù)據(jù)庫或服務(wù)接口的返回。
    優(yōu)先級:可以通過PriorityBlockingQueue來處理
    監(jiān)控:可以通過taskCount,completedTaskCount,getActiveSize等函數(shù)來監(jiān)控線程池的運行。

  • Executor框架結(jié)構(gòu)主要由三部分組成
    a.任務(wù),包括任務(wù)實現(xiàn)的接口RunnableCallable
    b.任務(wù)的執(zhí)行,包括任務(wù)執(zhí)行機制的核心接口Executor和其子類ExecutorService,相關(guān)的實現(xiàn)類包括ThreadPoolExecutorScheduledThreadPoolExecutor
    c.異步計算的結(jié)果,包括Future和其實現(xiàn)FutureTask。
    seo優(yōu)化培訓(xùn),網(wǎng)絡(luò)推廣培訓(xùn),網(wǎng)絡(luò)營銷培訓(xùn),SEM培訓(xùn),網(wǎng)絡(luò)優(yōu)化,在線營銷培訓(xùn)

  • ThreadPoolExecutor:框架的核心類,由corePoolmaximumPoolBlockingQueueRejectedExecutionHandler4部分組成,可以由工具類Executors創(chuàng)建。具體老說,工具類可以創(chuàng)建FixedThreadPool固定線程數(shù)(最推薦)、SingleThreadExecutor、CachedThreadPool三種類型的ThreadPoolExecutor。

  • ScheduledThreadPoolExecutor:比基礎(chǔ)的Timer對象更加全面,其通過DelayQueue來執(zhí)行周期性或定時的任務(wù)。

  • FutureTask基于AbstractQueuedSynchronizer(AQS),之前介紹的ReentrantLock、CountDownLatch等其實都是基于AQS來實現(xiàn)的。AQS是一個同步框架,提供通用機制來原子性的管理同步狀態(tài)、阻塞&喚醒線程、維護被阻塞的線程隊列。每個基于AQS的實現(xiàn)都會包含兩類操作,acquire用于阻塞調(diào)用線程,對應(yīng)futureTask.get(),知道AQS狀態(tài)允許這個線程才能繼續(xù)執(zhí)行;另一個為release,對應(yīng)futureTask.cancel()&run(),該操作改變AQS狀態(tài),改變后的狀態(tài)允許一個或多個阻塞線程解除阻塞。

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
         Future<BigDecimal> result = executor.submit(new Callable<BigDecimal>() {        @Override
            public BigDecimal call() throws Exception {            return getSalaryByService();
            }
        });
        System.out.println(result.get());
    }
  • 生產(chǎn)者消費者模式:該模式可以解決大部分的并發(fā)問題,其通過阻塞隊列,平衡生產(chǎn)線程和消費線程的工作能力來提高程序整體處理數(shù)據(jù)的速度。比如經(jīng)常會郵件來分享技術(shù)知識,就可以通過通過Job到郵箱中獲取到文章并放入阻塞隊列,之后消費者去獲取數(shù)據(jù)并插入到類似confluence的文檔管理工具中,接下來展示一個單個生產(chǎn)者,多個消費者的應(yīng)用場景實現(xiàn)。
    Tip:
    線上問題定位:Linux中可以通過top命令查看進程的情況,之后可以使用交互命令1查看CPU性能,H查看每個線程的性能信息。
    性能測試:比如使用Jmeter來做壓測,可以通過netstat -nat | grep 3306 -c來查看數(shù)據(jù)的壓力情況。

參考資料

  1. 方騰飛. Java并發(fā)編程的藝術(shù)[M]. 上海:機械工業(yè)出版社, 2017.

作  者:熊二哥 
出  處:http://www.cnblogs.com/wanliwang01/ 
版權(quán)聲明:本文版權(quán)歸作者和博客園共有,歡迎轉(zhuǎn)載,但未經(jīng)作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。 

http://www.cnblogs.com/wanliwang01/p/javacore_multiThread.html