前言:本系列將從零開(kāi)始講解java多線程相關(guān)的技術(shù),內(nèi)容參考于《java多線程核心技術(shù)》與《java并發(fā)編程實(shí)戰(zhàn)》等相關(guān)資料,希望站在巨人的肩膀上,再通過(guò)我的理解能讓知識(shí)更加簡(jiǎn)單易懂。

目錄

非等待通知

public void run() {        try {            for (int i = 0; i < 10; i++) {                list.add();
                System.out.println("添加了" + (i + 1) + "個(gè)元素");
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }public void run() {        try {            while (true) {                if (list.size() == 5) {
                    System.out.println("==5了,線程b要退出了!");                    throw new InterruptedException();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 兩個(gè)線程實(shí)現(xiàn)了通信,但list大小為5的時(shí)候,線程B退出了,但是線程B不停地輪詢是否為5,這個(gè)時(shí)候是很占資源的

  • 如果輪詢的時(shí)間間隔小,這個(gè)時(shí)候更加浪費(fèi)資源

  • 如果輪詢的時(shí)間間隔大,那么還可能錯(cuò)過(guò)了想要的數(shù)據(jù),比如可能錯(cuò)過(guò)了5

  • 這里共享了list,所以實(shí)現(xiàn)了通信,但是因?yàn)椴恢朗裁磿r(shí)候通信,所以不停地輪詢,這種通信有缺點(diǎn),一是浪費(fèi)cpu資源,二是可能讀取到錯(cuò)誤的數(shù)據(jù)

什么是等待通知機(jī)制

  • 線程A要等待線程B發(fā)出通知才執(zhí)行,這個(gè)時(shí)候線程A可以執(zhí)行wait方法,等待線程B執(zhí)行notify方法喚醒線程A

等待通知機(jī)制實(shí)現(xiàn)

public void run() {        try {
            synchronized (lock) {                if (MyList.size() != 5) {
                    System.out.println("wait begin "
                            + System.currentTimeMillis());                    lock.wait();
                    System.out.println("wait end  "
                            + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }    
public void run() {        try {
            synchronized (lock) {                for (int i = 0; i < 10; i++) {
                    MyList.add();                    if (MyList.size() == 5) {                        lock.notify();
                        System.out.println("已發(fā)出通知!");
                    }
                    System.out.println("添加了" + (i + 1) + "個(gè)元素!");
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 將上面的代碼進(jìn)行更改,當(dāng)大小不等于5的時(shí)候,線程A處于wait狀態(tài),直到線程B發(fā)出通知,喚醒線程A,通過(guò)等待通知機(jī)制,避免了線程A不停輪詢?cè)斐傻馁Y源浪費(fèi)

消息通知機(jī)制注意點(diǎn)

  • wait和notify必須是在同步方法和同步代碼塊里面調(diào)用,要不然會(huì)拋出異常

  • notify方法是繼承自O(shè)bject類,可以喚醒在此對(duì)象監(jiān)視器等待的線程,也就是說(shuō)喚醒的是同一個(gè)鎖的線程

  • notify方法調(diào)用之后,不會(huì)馬上釋放鎖,而是運(yùn)行完該同步方法或者是運(yùn)行完該同步代碼塊的代碼

  • 調(diào)用notify后隨機(jī)喚醒的是一個(gè)線程

  • 調(diào)用wait方法后會(huì)將鎖釋放

  • wait狀態(tài)下中斷線程會(huì)拋出異常

  • wait(long),超過(guò)設(shè)置的時(shí)間后會(huì)自動(dòng)喚醒,還沒(méi)超過(guò)該時(shí)間也可以通過(guò)其他線程喚醒

  • notifyAll可以喚醒同一鎖的所有線程

  • 如果線程還沒(méi)有處于等待狀態(tài),其他線程進(jìn)行喚醒,那么不會(huì)起作用,此時(shí)會(huì)打亂程序的正常邏輯

案例:生產(chǎn)者消費(fèi)者模式

一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者

public void setValue() {        try {
            synchronized (lock) {                if (!ValueObject.value.equals("")) {                    lock.wait();
                }
                String value = System.currentTimeMillis() + "_"
                        + System.nanoTime();
                System.out.println("set"+ value);
                ValueObject.value = value;                lock.notify();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }public void getValue() {        try {
            synchronized (lock) {                if (ValueObject.value.equals("")) {                    lock.wait();
                }
                System.out.println("get"+ ValueObject.value);
                ValueObject.value = "";                lock.notify();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }public void run() {        while (true) {
            r.getValue();
        }
    }public void run() {        while (true) {
            p.setValue();
        }
    }
  • 如果我們創(chuàng)建一個(gè)生產(chǎn)線程,一個(gè)消費(fèi)線程,那么這個(gè)時(shí)候會(huì)交替運(yùn)行

多個(gè)生產(chǎn)者,多個(gè)消費(fèi)者

public void getValue() {        try {
            synchronized (lock) {                while (ValueObject.value.equals("")) {
                    System.out.println("消費(fèi)者 "
                            + Thread.currentThread().getName() + " WAITING了☆");                    lock.wait();
                }
                System.out.println("消費(fèi)者 " + Thread.currentThread().getName()
                        + " RUNNABLE了");
                ValueObject.value = "";                lock.notify();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }public void run() {        while (true) {
            r.getValue();
        }
    }public void setValue() {        try {
            synchronized (lock) {                while (!ValueObject.value.equals("")) {
                    System.out.println("生產(chǎn)者 "
                            + Thread.currentThread().getName() + " WAITING了★");                    lock.wait();
                }
                System.out.println("生產(chǎn)者 " + Thread.currentThread().getName()
                        + " RUNNABLE了");
                String value = System.currentTimeMillis() + "_"
                        + System.nanoTime();
                ValueObject.value = value;                lock.notify();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }public void run() {        while (true) {
            p.setValue();
        }
    }
  • 如果這個(gè)時(shí)候創(chuàng)建多個(gè)生產(chǎn)者,多個(gè)消費(fèi)者,如果連續(xù)喚醒的是同類線程,那么會(huì)出現(xiàn)假死狀態(tài),就是線程都處于waiting狀態(tài),因?yàn)閚otify隨機(jī)喚醒一個(gè)線程,如果喚醒的同類的,那么就浪費(fèi)了一次喚醒,如果這個(gè)時(shí)候無(wú)法再喚醒異類線程,那么就會(huì)假死。這種情況把notify改成notifyAll()就行了。

消息通知機(jī)制需要注意的地方

  • 是否線程喚醒的是同類線程會(huì)造成影響

  • 生產(chǎn)者消費(fèi)模式,判斷條件if和while應(yīng)該使用哪一個(gè)

通過(guò)管道進(jìn)行線程間通信

public class ThreadWrite extends Thread {    private WriteData write;    private PipedOutputStream out;

    public ThreadWrite(WriteData write, PipedOutputStream out) {        super();        this.write = write;        this.out = out;
    }    @Override
    public void run() {
        write.writeMethod(out);
    }

}
public class ThreadRead extends Thread {    private ReadData read;    private PipedInputStream input;

    public ThreadRead(ReadData read, PipedInputStream input) {        super();        this.read = read;        this.input = input;
    }    @Override
    public void run() {
        read.readMethod(input);
    }
}
public class Run {

    public static void main(String[] args) {        try {            WriteData writeData = new WriteData();            ReadData readData = new ReadData();            PipedInputStream inputStream = new PipedInputStream();            PipedOutputStream outputStream = new PipedOutputStream();            // inputStream.connect(outputStream);
            outputStream.connect(inputStream);//關(guān)鍵

            ThreadRead threadRead = new ThreadRead(readData, inputStream);
            threadRead.start();            Thread.sleep(2000);            ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
            threadWrite.start();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}
  • PipedInputStream和PiepedOutputStream(對(duì)應(yīng)字符流PipedReader和PipedOutputWriter)這幾個(gè)類可以實(shí)現(xiàn)線程間流的通信,將管道輸出流和輸出流連接,實(shí)現(xiàn)一個(gè)線程往管道發(fā)送數(shù)據(jù),一個(gè)線程從管道讀取數(shù)據(jù)

join方法

public static void main(String[] args) {        try {
            MyThread threadTest = new MyThread();
            threadTest.start();
            threadTest.join();

            System.out.println("threadTest對(duì)象執(zhí)行完,我再執(zhí)行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 當(dāng)前線程阻塞(main線程),調(diào)用線程(threadTest)正常執(zhí)行,執(zhí)行完后當(dāng)前線程(main)繼續(xù)執(zhí)行

public class ThreadB extends Thread {    @Override
    public void run() {        try {            ThreadA a = new ThreadA();
            a.start();
            a.join();            System.out.println("線程B在run end處打印了");
        } catch (InterruptedException e) {            System.out.println("線程B在catch處打印了");
            e.printStackTrace();
        }
    }

}
  • 如果線程B執(zhí)行完了join方法,此時(shí)線程B被中斷,那么這個(gè)時(shí)候拋出異常,但是線程A正常運(yùn)行

join(long)和sleep(long)的區(qū)別

public final synchronized void join(long millis)
    throws InterruptedException {        long base = System.currentTimeMillis();        long now = 0;        if (millis < 0) {            throw new IllegalArgumentException("timeout value is negative");
        }        if (millis == 0) {            while (isAlive()) {
                wait(0);
            }
        } else {            while (isAlive()) {                long delay = millis - now;                if (delay <= 0) {                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }
  • 從join方法的源代碼可以發(fā)現(xiàn),他的核心方法是wait,在前面已經(jīng)提到wait方法會(huì)釋放鎖,說(shuō)明join方法也會(huì)釋放鎖,但是sleep是不會(huì)釋放鎖的。

  • join方法是非靜態(tài)的,而sleep是靜態(tài)的

ThreadLocal

  • 解決變量在各個(gè)線程的隔離性,每個(gè)線程綁定自己的值

public void run() {        try {            for (int i = 0; i < 100; i++) {                if (Tools.tl.get() == null) {
                    Tools.tl.set("ThreadA" + (i + 1));
                } else {
                    System.out.println("ThreadA get Value=" + Tools.tl.get());
                }
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }public void run() {        try {            for (int i = 0; i < 100; i++) {                if (Tools.tl.get() == null) {
                    Tools.tl.set("ThreadB" + (i + 1));
                } else {
                    System.out.println("ThreadB get Value=" + Tools.tl.get());
                }
                Thread.sleep(200);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }public class Tools {    public static ThreadLocal tl = new ThreadLocal();

}
  • 每個(gè)線程都設(shè)置了值,但是得到的值卻是自己的,互相隔離

  • 如果不開(kāi)始不設(shè)置值,那么得到的值都是null,可以通過(guò)繼承ThreadLocal,重載initalValue方法,設(shè)置初始值

    public class ThreadLocalExt extends ThreadLocal {@Overrideprotected Object initialValue() {    return new Date().getTime();
    }
    }
  • InheritableThreadLocal,子線程可以繼承父線程的值

    public class InheritableThreadLocalExt extends InheritableThreadLocal {@Overrideprotected Object initialValue() {    return new Date().getTime();
    }
    }
    public static void main(String[] args) {    try {        for (int i = 0; i < 10; i++) {            System.out.println("       在Main線程中取值=" + Tools.tl.get());
  • http://www.cnblogs.com/-new/p/7217844.html