等待通知機制

前言:本系列將從零開始講解java多線程相關(guān)的技術(shù),內(nèi)容參考于《java多線程核心技術(shù)》與《java并發(fā)編程實戰(zhàn)》等相關(guān)資料,希望站在巨人的肩膀上,再通過我的理解能讓知識更加簡單易懂。

目錄

非等待通知

public void run() {        try {            for (int i = 0; i < 10; i++) {                list.add();
                System.out.println("添加了" + (i + 1) + "個元素");
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }public void run() {        try {            while (true) {                if (list.size() == 5) {
                    System.out.println("==5了,線程b要退出了!");                    throw new InterruptedException();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 兩個線程實現(xiàn)了通信,但list大小為5的時候,線程B退出了,但是線程B不停地輪詢是否為5,這個時候是很占資源的

  • 如果輪詢的時間間隔小,這個時候更加浪費資源

  • 如果輪詢的時間間隔大,那么還可能錯過了想要的數(shù)據(jù),比如可能錯過了5

  • 這里共享了list,所以實現(xiàn)了通信,但是因為不知道什么時候通信,所以不停地輪詢,這種通信有缺點,一是浪費cpu資源,二是可能讀取到錯誤的數(shù)據(jù)

什么是等待通知機制

  • 線程A要等待線程B發(fā)出通知才執(zhí)行,這個時候線程A可以執(zhí)行wait方法,等待線程B執(zhí)行notify方法喚醒線程A

等待通知機制實現(xiàn)

public void run() {        try {
            synchronized (lock) {                if (MyList.size() != 5) {
                    System.out.println("wait begin "
                            + System.currentTimeMillis());                    lock.wait();
                    System.out.println("wait end  "
                            + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }    
public void run() {        try {
            synchronized (lock) {                for (int i = 0; i < 10; i++) {
                    MyList.add();                    if (MyList.size() == 5) {                        lock.notify();
                        System.out.println("已發(fā)出通知!");
                    }
                    System.out.println("添加了" + (i + 1) + "個元素!");
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 將上面的代碼進行更改,當大小不等于5的時候,線程A處于wait狀態(tài),直到線程B發(fā)出通知,喚醒線程A,通過等待通知機制,避免了線程A不停輪詢造成的資源浪費

消息通知機制注意點

  • wait和notify必須是在同步方法和同步代碼塊里面調(diào)用,要不然會拋出異常

  • notify方法是繼承自O(shè)bject類,可以喚醒在此對象監(jiān)視器等待的線程,也就是說喚醒的是同一個鎖的線程

  • notify方法調(diào)用之后,不會馬上釋放鎖,而是運行完該同步方法或者是運行完該同步代碼塊的代碼

  • 調(diào)用notify后隨機喚醒的是一個線程

  • 調(diào)用wait方法后會將鎖釋放

  • wait狀態(tài)下中斷線程會拋出異常

  • wait(long),超過設(shè)置的時間后會自動喚醒,還沒超過該時間也可以通過其他線程喚醒

  • notifyAll可以喚醒同一鎖的所有線程

  • 如果線程還沒有處于等待狀態(tài),其他線程進行喚醒,那么不會起作用,此時會打亂程序的正常邏輯

案例:生產(chǎn)者消費者模式

一個生產(chǎn)者,一個消費者

public void setValue() {        try {
            synchronized (lock) {                if (!ValueObject.value.equals("")) {                    lock.wait();
                }
                String value = System.currentTimeMillis() + "_"
                        + System.nanoTime();
                System.out.println("set"+ value);
                ValueObject.value = value;                lock.notify();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }public void getValue() {        try {
            synchronized (lock) {                if (ValueObject.value.equals("")) {                    lock.wait();
                }
                System.out.println("get"+ ValueObject.value);
                ValueObject.value = "";                lock.notify();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }public void run() {        while (true) {
            r.getValue();
        }
    }public void run() {        while (true) {
            p.setValue();
        }
    }
  • 如果我們創(chuàng)建一個生產(chǎn)線程,一個消費線程,那么這個時候會交替運行

多個生產(chǎn)者,多個消費者

public void getValue() {        try {
            synchronized (lock) {                while (ValueObject.value.equals("")) {
                    System.out.println("消費者 "
                            + Thread.currentThread().getName() + " WAITING了☆");                    lock.wait();
                }
                System.out.println("消費者 " + Thread.currentThread().getName()
                        + " RUNNABLE了");
                ValueObject.value = "";                lock.notify();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }public void run() {        while (true) {
            r.getValue();
        }
    }public void setValue() {        try {
            synchronized (lock) {                while (!ValueObject.value.equals("")) {
                    System.out.println("生產(chǎn)者 "
                            + Thread.currentThread().getName() + " WAITING了★");                    lock.wait();
                }
                System.out.println("生產(chǎn)者 " + Thread.currentThread().getName()
                        + " RUNNABLE了");
                String value = System.currentTimeMillis() + "_"
                        + System.nanoTime();
                ValueObject.value = value;                lock.notify();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }public void run() {        while (true) {
            p.setValue();
        }
    }
  • 如果這個時候創(chuàng)建多個生產(chǎn)者,多個消費者,如果連續(xù)喚醒的是同類線程,那么會出現(xiàn)假死狀態(tài),就是線程都處于waiting狀態(tài),因為notify隨機喚醒一個線程,如果喚醒的同類的,那么就浪費了一次喚醒,如果這個時候無法再喚醒異類線程,那么就會假死。這種情況把notify改成notifyAll()就行了。

消息通知機制需要注意的地方

  • 是否線程喚醒的是同類線程會造成影響

  • 生產(chǎn)者消費模式,判斷條件if和while應(yīng)該使用哪一個

通過管道進行線程間通信

public class ThreadWrite extends Thread {    private WriteData write;    private PipedOutputStream out;

    public ThreadWrite(WriteData write, PipedOutputStream out) {        super();        this.write = write;        this.out = out;
    }    @Override
    public void run() {
        write.writeMethod(out);
    }

}
public class ThreadRead extends Thread {    private ReadData read;    private PipedInputStream input;

    public ThreadRead(ReadData read, PipedInputStream input) {        super();        this.read = read;        this.input = input;
    }    @Override
    public void run() {
        read.readMethod(input);
    }
}
public class Run {

    public static void main(String[] args) {        try {            WriteData writeData = new WriteData();            ReadData readData = new ReadData();            PipedInputStream inputStream = new PipedInputStream();            PipedOutputStream outputStream = new PipedOutputStream();            // inputStream.connect(outputStream);
            outputStream.connect(inputStream);//關(guān)鍵

            ThreadRead threadRead = new ThreadRead(readData, inputStream);
            threadRead.start();            Thread.sleep(2000);            ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
            threadWrite.start();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}
  • PipedInputStream和PiepedOutputStream(對應(yīng)字符流PipedReader和PipedOutputWriter)這幾個類可以實現(xiàn)線程間流的通信,將管道輸出流和輸出流連接,實現(xiàn)一個線程往管道發(fā)送數(shù)據(jù),一個線程從管道讀取數(shù)據(jù)

join方法

public static void main(String[] args) {        try {
            MyThread threadTest = new MyThread();
            threadTest.start();
            threadTest.join();

            System.out.println("threadTest對象執(zhí)行完,我再執(zhí)行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 當前線程阻塞(main線程),調(diào)用線程(threadTest)正常執(zhí)行,執(zhí)行完后當前線程(main)繼續(xù)執(zhí)行

public class ThreadB extends Thread {    @Override
    public void run() {        try {            ThreadA a = new ThreadA();
            a.start();
            a.join();            System.out.println("線程B在run end處打印了");
        } catch (InterruptedException e) {            System.out.println("線程B在catch處打印了");
            e.printStackTrace();
        }
    }

}
  • 如果線程B執(zhí)行完了join方法,此時線程B被中斷,那么這個時候拋出異常,但是線程A正常運行

join(long)和sleep(long)的區(qū)別

public final synchronized void join(long millis)
    throws InterruptedException {        long base = System.currentTimeMillis();        long now = 0;        if (millis < 0) {            throw new IllegalArgumentException("timeout value is negative");
        }        if (millis == 0) {            while (isAlive()) {
                wait(0);
            }
        } else {            while (isAlive()) {                long delay = millis - now;                if (delay <= 0) {                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }
  • 從join方法的源代碼可以發(fā)現(xiàn),他的核心方法是wait,在前面已經(jīng)提到wait方法會釋放鎖,說明join方法也會釋放鎖,但是sleep是不會釋放鎖的。

  • join方法是非靜態(tài)的,而sleep是靜態(tài)的

ThreadLocal

  • 解決變量在各個線程的隔離性,每個線程綁定自己的值

public void run() {        try {            for (int i = 0; i < 100; i++) {                if (Tools.tl.get() == null) {

http://www.cnblogs.com/-new/p/7217844.html