等待通知機制
前言:本系列將從零開始講解java多線程相關(guān)的技術(shù),內(nèi)容參考于《java多線程核心技術(shù)》與《java并發(fā)編程實戰(zhàn)》等相關(guān)資料,希望站在巨人的肩膀上,再通過我的理解能讓知識更加簡單易懂。
目錄
非等待通知
public void run() { try { for (int i = 0; i < 10; i++) { list.add(); System.out.println("添加了" + (i + 1) + "個元素"); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }public void run() { try { while (true) { if (list.size() == 5) { System.out.println("==5了,線程b要退出了!"); throw new InterruptedException(); } } } catch (InterruptedException e) { e.printStackTrace(); } }
兩個線程實現(xiàn)了通信,但list大小為5的時候,線程B退出了,但是線程B不停地輪詢是否為5,這個時候是很占資源的
如果輪詢的時間間隔小,這個時候更加浪費資源
如果輪詢的時間間隔大,那么還可能錯過了想要的數(shù)據(jù),比如可能錯過了5
這里共享了list,所以實現(xiàn)了通信,但是因為不知道什么時候通信,所以不停地輪詢,這種通信有缺點,一是浪費cpu資源,二是可能讀取到錯誤的數(shù)據(jù)
什么是等待通知機制
線程A要等待線程B發(fā)出通知才執(zhí)行,這個時候線程A可以執(zhí)行wait方法,等待線程B執(zhí)行notify方法喚醒線程A
等待通知機制實現(xiàn)
public void run() { try { synchronized (lock) { if (MyList.size() != 5) { System.out.println("wait begin " + System.currentTimeMillis()); lock.wait(); System.out.println("wait end " + System.currentTimeMillis()); } } } catch (InterruptedException e) { e.printStackTrace(); } } public void run() { try { synchronized (lock) { for (int i = 0; i < 10; i++) { MyList.add(); if (MyList.size() == 5) { lock.notify(); System.out.println("已發(fā)出通知!"); } System.out.println("添加了" + (i + 1) + "個元素!"); Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } }
將上面的代碼進行更改,當大小不等于5的時候,線程A處于wait狀態(tài),直到線程B發(fā)出通知,喚醒線程A,通過等待通知機制,避免了線程A不停輪詢造成的資源浪費
消息通知機制注意點
wait和notify必須是在同步方法和同步代碼塊里面調(diào)用,要不然會拋出異常
notify方法是繼承自O(shè)bject類,可以喚醒在此對象監(jiān)視器等待的線程,也就是說喚醒的是同一個鎖的線程
notify方法調(diào)用之后,不會馬上釋放鎖,而是運行完該同步方法或者是運行完該同步代碼塊的代碼
調(diào)用notify后隨機喚醒的是一個線程
調(diào)用wait方法后會將鎖釋放
wait狀態(tài)下中斷線程會拋出異常
wait(long),超過設(shè)置的時間后會自動喚醒,還沒超過該時間也可以通過其他線程喚醒
notifyAll可以喚醒同一鎖的所有線程
如果線程還沒有處于等待狀態(tài),其他線程進行喚醒,那么不會起作用,此時會打亂程序的正常邏輯
案例:生產(chǎn)者消費者模式
一個生產(chǎn)者,一個消費者
public void setValue() { try { synchronized (lock) { if (!ValueObject.value.equals("")) { lock.wait(); } String value = System.currentTimeMillis() + "_" + System.nanoTime(); System.out.println("set"+ value); ValueObject.value = value; lock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } }public void getValue() { try { synchronized (lock) { if (ValueObject.value.equals("")) { lock.wait(); } System.out.println("get"+ ValueObject.value); ValueObject.value = ""; lock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } }public void run() { while (true) { r.getValue(); } }public void run() { while (true) { p.setValue(); } }
如果我們創(chuàng)建一個生產(chǎn)線程,一個消費線程,那么這個時候會交替運行
多個生產(chǎn)者,多個消費者
public void getValue() { try { synchronized (lock) { while (ValueObject.value.equals("")) { System.out.println("消費者 " + Thread.currentThread().getName() + " WAITING了☆"); lock.wait(); } System.out.println("消費者 " + Thread.currentThread().getName() + " RUNNABLE了"); ValueObject.value = ""; lock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } }public void run() { while (true) { r.getValue(); } }public void setValue() { try { synchronized (lock) { while (!ValueObject.value.equals("")) { System.out.println("生產(chǎn)者 " + Thread.currentThread().getName() + " WAITING了★"); lock.wait(); } System.out.println("生產(chǎn)者 " + Thread.currentThread().getName() + " RUNNABLE了"); String value = System.currentTimeMillis() + "_" + System.nanoTime(); ValueObject.value = value; lock.notify(); } } catch (InterruptedException e) { e.printStackTrace(); } }public void run() { while (true) { p.setValue(); } }
如果這個時候創(chuàng)建多個生產(chǎn)者,多個消費者,如果連續(xù)喚醒的是同類線程,那么會出現(xiàn)假死狀態(tài),就是線程都處于waiting狀態(tài),因為notify隨機喚醒一個線程,如果喚醒的同類的,那么就浪費了一次喚醒,如果這個時候無法再喚醒異類線程,那么就會假死。這種情況把notify改成notifyAll()就行了。
消息通知機制需要注意的地方
是否線程喚醒的是同類線程會造成影響
生產(chǎn)者消費模式,判斷條件if和while應(yīng)該使用哪一個
通過管道進行線程間通信
public class ThreadWrite extends Thread { private WriteData write; private PipedOutputStream out; public ThreadWrite(WriteData write, PipedOutputStream out) { super(); this.write = write; this.out = out; } @Override public void run() { write.writeMethod(out); } } public class ThreadRead extends Thread { private ReadData read; private PipedInputStream input; public ThreadRead(ReadData read, PipedInputStream input) { super(); this.read = read; this.input = input; } @Override public void run() { read.readMethod(input); } } public class Run { public static void main(String[] args) { try { WriteData writeData = new WriteData(); ReadData readData = new ReadData(); PipedInputStream inputStream = new PipedInputStream(); PipedOutputStream outputStream = new PipedOutputStream(); // inputStream.connect(outputStream); outputStream.connect(inputStream);//關(guān)鍵 ThreadRead threadRead = new ThreadRead(readData, inputStream); threadRead.start(); Thread.sleep(2000); ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream); threadWrite.start(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
PipedInputStream和PiepedOutputStream(對應(yīng)字符流PipedReader和PipedOutputWriter)這幾個類可以實現(xiàn)線程間流的通信,將管道輸出流和輸出流連接,實現(xiàn)一個線程往管道發(fā)送數(shù)據(jù),一個線程從管道讀取數(shù)據(jù)
join方法
public static void main(String[] args) { try { MyThread threadTest = new MyThread(); threadTest.start(); threadTest.join(); System.out.println("threadTest對象執(zhí)行完,我再執(zhí)行"); } catch (InterruptedException e) { e.printStackTrace(); } }
當前線程阻塞(main線程),調(diào)用線程(threadTest)正常執(zhí)行,執(zhí)行完后當前線程(main)繼續(xù)執(zhí)行
public class ThreadB extends Thread { @Override public void run() { try { ThreadA a = new ThreadA(); a.start(); a.join(); System.out.println("線程B在run end處打印了"); } catch (InterruptedException e) { System.out.println("線程B在catch處打印了"); e.printStackTrace(); } } }
如果線程B執(zhí)行完了join方法,此時線程B被中斷,那么這個時候拋出異常,但是線程A正常運行
join(long)和sleep(long)的區(qū)別
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
從join方法的源代碼可以發(fā)現(xiàn),他的核心方法是wait,在前面已經(jīng)提到wait方法會釋放鎖,說明join方法也會釋放鎖,但是sleep是不會釋放鎖的。
join方法是非靜態(tài)的,而sleep是靜態(tài)的
ThreadLocal
解決變量在各個線程的隔離性,每個線程綁定自己的值
public void run() { try { for (int i = 0; i < 100; i++) { if (Tools.tl.get() == null) {
http://www.cnblogs.com/-new/p/7217844.html