一、進(jìn)程與線程
1.進(jìn)程
我們電腦的應(yīng)用程序,都是進(jìn)程,假設(shè)我們用的電腦是單核的,cpu同時(shí)只能執(zhí)行一個(gè)進(jìn)程。當(dāng)程序出于I/O阻塞的時(shí)候,CPU如果和程序一起等待,那就太浪費(fèi)了,cpu會(huì)去執(zhí)行其他的程序,此時(shí)就涉及到切換,切換前要保存上一個(gè)程序運(yùn)行的狀態(tài),才能恢復(fù),所以就需要有個(gè)東西來記錄這個(gè)東西,就可以引出進(jìn)程的概念了。
進(jìn)程就是一個(gè)程序在一個(gè)數(shù)據(jù)集上的一次動(dòng)態(tài)執(zhí)行過程。進(jìn)程由程序,數(shù)據(jù)集,進(jìn)程控制塊三部分組成。程序用來描述進(jìn)程哪些功能以及如何完成;數(shù)據(jù)集是程序執(zhí)行過程中所使用的資源;進(jìn)程控制塊用來保存程序運(yùn)行的狀態(tài)
2.線程
一個(gè)進(jìn)程中可以開多個(gè)線程,為什么要有進(jìn)程,而不做成線程呢?因?yàn)橐粋€(gè)程序中,線程共享一套數(shù)據(jù),如果都做成進(jìn)程,每個(gè)進(jìn)程獨(dú)占一塊內(nèi)存,那這套數(shù)據(jù)就要復(fù)制好幾份給每個(gè)程序,不合理,所以有了線程。
線程又叫輕量級(jí)進(jìn)程,是一個(gè)基本的cpu執(zhí)行單元,也是程序執(zhí)行過程中的最小單元。一個(gè)進(jìn)程最少也會(huì)有一個(gè)主線程,在主線程中通過threading模塊,在開子線程
3.進(jìn)程線程的關(guān)系
(1)一個(gè)線程只能屬于一個(gè)進(jìn)程,而一個(gè)進(jìn)程可以有多個(gè)線程,但至少有一個(gè)線程
(2)資源分配給進(jìn)程,進(jìn)程是程序的主體,同一進(jìn)程的所有線程共享該進(jìn)程的所有資源
(3)cpu分配給線程,即真正在cpu上運(yùn)行的是線程
(4)線程是最小的執(zhí)行單元,進(jìn)程是最小的資源管理單元
4.并行和并發(fā)
并行處理是指計(jì)算機(jī)系統(tǒng)中能同時(shí)執(zhí)行兩個(gè)或多個(gè)任務(wù)的計(jì)算方法,并行處理可同時(shí)工作于同一程序的不同方面
并發(fā)處理是同一時(shí)間段內(nèi)有幾個(gè)程序都在一個(gè)cpu中處于運(yùn)行狀態(tài),但任一時(shí)刻只有一個(gè)程序在cpu上運(yùn)行。
并發(fā)的重點(diǎn)在于有處理多個(gè)任務(wù)的能力,不一定要同時(shí);而并行的重點(diǎn)在于就是有同時(shí)處理多個(gè)任務(wù)的能力。并行是并發(fā)的子集
以上所說的是相對(duì)于所有語言來說的,Python的特殊之處在于Python有一把GIL鎖,這把鎖限制了同一時(shí)間內(nèi)一個(gè)進(jìn)程只能有一個(gè)線程能使用cpu
二、threading模塊
這個(gè)模塊的功能就是創(chuàng)建新的線程,有兩種創(chuàng)建線程的方法:
1.直接創(chuàng)建
import threadingimport timedef foo(n): print('>>>>>>>>>>>>>>>%s'%n) time.sleep(3) print('tread 1') t1=threading.Thread(target=foo,args=(2,))#arg后面一定是元組,t1就是創(chuàng)建的子線程對(duì)象t1.start()#把子進(jìn)程運(yùn)行起來print('ending')
上面的代碼就是在主線程中創(chuàng)建了一個(gè)子線程
運(yùn)行結(jié)果是:先打印>>>>>>>>>>>>>2,在打印ending,然后等待3秒后打印thread 1
2.另一種方式是通過繼承類創(chuàng)建線程對(duì)象
import threadingimport timeclass MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): print('ok') time.sleep(2) print('end') t1=MyThread()#創(chuàng)建線程對(duì)象t1.start()#激活線程對(duì)象print('end again')
3.join()方法
這個(gè)方法的作用是:在子線程完成運(yùn)行之前,這個(gè)子線程的父線程將一直等待子線程運(yùn)行完再運(yùn)行
import threadingimport timedef foo(n): print('>>>>>>>>>>>>>>>%s'%n) time.sleep(n) print('tread 1')def bar(n): print('>>>>>>>>>>>>>>>>%s'%n) time.sleep(n) print('thread 2') s=time.time() t1=threading.Thread(target=foo,args=(2,)) t1.start()#把子進(jìn)程運(yùn)行起來t2=threading.Thread(target=bar,args=(5,)) t2.start() t1.join() #只是會(huì)阻擋主線程運(yùn)行,跟t2沒關(guān)系t2.join()print(time.time()-s)print('ending')'''運(yùn)行結(jié)果: >>>>>>>>>>>>>>>2 >>>>>>>>>>>>>>>>5 tread 1 thread 2 5.001286268234253 ending'''
4.setDaemon()方法
這個(gè)方法的作用是把線程聲明為守護(hù)線程,必須在start()方法調(diào)用之前設(shè)置。
默認(rèn)情況下,主線程運(yùn)行完會(huì)檢查子線程是否完成,如果未完成,那么主線程會(huì)等待子線程完成后再退出。但是如果主線程完成后不用管子線程是否運(yùn)行完都退出,就要設(shè)置setDaemon(True)
import threadingimport timeclass MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): print('ok') time.sleep(2) print('end') t1=MyThread()#創(chuàng)建線程對(duì)象t1.setDaemon(True) t1.start()#激活線程對(duì)象print('end again')#運(yùn)行結(jié)果是馬上打印ok和 end again #然后程序終止,不會(huì)打印end
主線程默認(rèn)是非守護(hù)線程,子線程都是繼承的主線程,所以默認(rèn)也都是非守護(hù)線程
5.其他方法
isAlive(): 返回線程是否處于活動(dòng)中
getName(): 返回線程名
setName(): 設(shè)置線程名
threading.currentThread():返回當(dāng)前的線程變量
threading.enumerate():返回一個(gè)包含正在運(yùn)行的線程的列表
threading.activeCount():返回正在運(yùn)行的線程數(shù)量
三、各種鎖
1.同步鎖(用戶鎖,互斥鎖)
先來看一個(gè)例子:
需求是有一個(gè)全局變量的值是100,我們開100個(gè)線程,每個(gè)線程執(zhí)行的操作是對(duì)這個(gè)全局變量減一,最后import threading
import threadingimport timedef sub(): global num temp=num num=temp-1 time.sleep(2) num=100l=[]for i in range(100): t=threading.Thread(target=sub,args=()) t.start() l.append(t)for i in l: i.join()print(num)
好像一切正常,現(xiàn)在我們改動(dòng)一下,在sub函數(shù)的temp=num,和num=temp-1 中間,加一個(gè)time.sleep(0.1),會(huì)發(fā)現(xiàn)出問題了,結(jié)果變成兩秒后打印99了,改成time.sleep(0.0001)呢,結(jié)果不確定了,但都是90幾,這是怎么回事呢?
這就要說到Python里的那把GIL鎖了,我們來捋一捋:
首次定義一個(gè)全局變量num=100,然后開辟了100個(gè)子線程,但是Python的那把GIL鎖限制了同一時(shí)刻只能有一個(gè)線程使用cpu,所以這100個(gè)線程是處于搶這把鎖的狀態(tài),誰搶到了,誰就可以運(yùn)行自己的代碼。在最開始的情況下,每個(gè)線程搶到cpu,馬上執(zhí)行了對(duì)全局變量減一的操作,所以不會(huì)出現(xiàn)問題。但是我們改動(dòng)后,在全局變量減一之前,讓他睡了0.1秒,程序睡著了,cpu可不能一直等著這個(gè)線程,當(dāng)這個(gè)線程處于I/O阻塞的時(shí)候,其他線程就又可以搶cpu了,所以其他線程搶到了,開始執(zhí)行代碼,要知道0.1秒對(duì)于cpu的運(yùn)行來說已經(jīng)很長(zhǎng)時(shí)間了,這段時(shí)間足夠讓第一個(gè)線程還沒睡醒的時(shí)候,其他線程都搶到過cpu一次了。他們拿到的num都是100,等他們醒來后,執(zhí)行的操作都是100-1,所以最后結(jié)果是99.同樣的道理,如果睡的時(shí)間短一點(diǎn),變成0.001,可能情況就是當(dāng)?shù)?1個(gè)線程第一次搶到cpu的時(shí)候,第一個(gè)線程已經(jīng)睡醒了,并修改了全局變量。所以這第91個(gè)線程拿到的全局變量就是99,然后第二個(gè)第三個(gè)線程陸續(xù)醒過來,分別修改了全局變量,所以最后結(jié)果就是一個(gè)不可知的數(shù)了。一張圖看懂這個(gè)過程
這就是線程安全問題,只要涉及到線程,都會(huì)有這個(gè)問題。解決辦法就是加鎖
我們?cè)谌旨右话焰i,用鎖把涉及到數(shù)據(jù)運(yùn)算的操作鎖起來,就把這段代碼變成串行的了,上代碼:
import threadingimport timedef sub(): global num lock.acquire()#獲取鎖 temp=num time.sleep(0.001) num=temp-1 lock.release()#釋放鎖 time.sleep(2) num=100l=[] lock=threading.Lock()for i in range(100): t=threading.Thread(target=sub,args=()) t.start() l.append(t)for i in l: i.join()print(num)
獲取這把鎖之后,必須釋放掉才能再次被獲取。這把鎖就叫用戶鎖
2.死鎖與遞歸鎖
死鎖就是兩個(gè)及以上進(jìn)程或線程在執(zhí)行過程中,因相互制約造成的一種互相等待的現(xiàn)象,若無外力作用,他們將永遠(yuǎn)卡在那里。舉個(gè)例子:
死鎖示例
上面這個(gè)例子中,線程2在等待線程1釋放B鎖,線程1在等待線程2釋放A鎖,互相制約
我們?cè)谟没コ怄i的時(shí)候,一旦用的鎖多了,很容易就出現(xiàn)這種問題
在Python中,為了解決這個(gè)問題,Python提供了一個(gè)叫可重用鎖(RLock)的概念,這個(gè)鎖內(nèi)部維護(hù)著一個(gè)lock和一個(gè)counter變量,counter記錄了acquire的次數(shù),每次acquire,counter就加1,每次release,counter就減1,只有counter的值為0的時(shí)候,其他線程才能獲得資源,下面用RLock替換Lock,在運(yùn)行就不會(huì)卡住了:
遞歸鎖示例
這把鎖又叫遞歸鎖
3.Semaphore(信號(hào)量)
這也是一把鎖,可以指定有幾個(gè)線程可以同時(shí)獲得這把鎖,最多是5個(gè)(前面說的互斥鎖只能有一個(gè)線程獲得)
import threadingimport time semaphore=threading.Semaphore(5)def foo(): semaphore.acquire() time.sleep(2) print('ok') semaphore.release()for i in range(10): t=threading.Thread(target=foo,args=()) t.start()
運(yùn)行結(jié)果是每隔兩秒就打印5個(gè)ok
4.Event對(duì)象
線程的運(yùn)行是獨(dú)立的,如果線程間需要通信,或者說某個(gè)線程需要根據(jù)一個(gè)線程的狀態(tài)來執(zhí)行下一步的操作,就需要用到Event對(duì)象。可以把Event對(duì)象看作是一個(gè)標(biāo)志位,默認(rèn)值為假,如果一個(gè)線程等待Event對(duì)象,而此時(shí)Event對(duì)象中的標(biāo)志位為假,那么這個(gè)線程就會(huì)一直等待,直至標(biāo)志位為真,為真以后,所有等待Event對(duì)象的線程將被喚醒
event.isSet():返回event的狀態(tài)值; event.wait():如果 event.isSet()==False將阻塞線程; event.set(): 設(shè)置event的狀態(tài)值為True,所有阻塞池的線程激活進(jìn)入就緒狀態(tài), 等待操作系統(tǒng)調(diào)度;設(shè)置對(duì)象的時(shí)候,默認(rèn)是False的 event.clear():恢復(fù)event的狀態(tài)值為False。
用一個(gè)例子來演示Event對(duì)象的用法:
import threading,time event=threading.Event() #創(chuàng)建一個(gè)event對(duì)象def foo(): print('wait.......') event.wait() #event.wait(1)#if event 對(duì)象內(nèi)的標(biāo)志位為Flase,則阻塞 #wait()里面的參數(shù)的意思是:只等待1秒,如果1秒后還沒有把標(biāo)志位改過來,就不等了,繼續(xù)執(zhí)行下面的代碼 print('connect to redis server')print('attempt to start redis sever)') time.sleep(3) event.set()for i in range(5): t=threading.Thread(target=foo,args=()) t.start()#3秒之后,主線程結(jié)束,但子線程并不是守護(hù)線程,子線程還沒結(jié)束,所以,程序并沒有結(jié)束,應(yīng)該是在3秒之后,把標(biāo)志位設(shè)為true,即event.set()
5.隊(duì)列
官方文檔說隊(duì)列在多線程中保證數(shù)據(jù)安全是非常有用的
隊(duì)列可以理解為是一種數(shù)據(jù)結(jié)構(gòu),可以存儲(chǔ)數(shù)據(jù),讀寫數(shù)據(jù)。就類似列表里面加了一把鎖
5.1get和put方法
q=queue.Queue()#如果設(shè)置參數(shù)為20,第21次put的時(shí)候,程序就會(huì)阻塞住,直到有空位置,也就是有數(shù)據(jù)被get走11)q.put(3.14(q.get())(q.get())(q.get())(q.get())
get方法中有個(gè)默認(rèn)參數(shù)block=True,把這個(gè)參數(shù)改成False,取不到值的時(shí)候就會(huì)報(bào)錯(cuò)queue.Empty
這樣寫就等同于寫成q.get_nowait())
5.2join和task_done方法
join是用來阻塞進(jìn)程,與task_done配合使用才有意義??梢杂肊vent對(duì)象來理解,沒次put(),join里面的計(jì)數(shù)器加1,沒次task_done(),計(jì)數(shù)器減1,計(jì)數(shù)器為0的時(shí)候,才能進(jìn)行下次put()
注意要在每個(gè)get()后面都加task_done才行
import queueimport threading#隊(duì)列里只有put和get兩個(gè)方法,列表的那些方法都沒有q=queue.Queue()#def foo():#存數(shù)據(jù) # while True: q.put(111) q.put(222) q.put(333) q.join() print('ok')#有個(gè)join,程序就停在這里def bar(): print(q.get()) q.task_done() print(q.get()) q.task_done() print(q.get()) q.task_done()#要在每個(gè)get()語句后面都加上t1=threading.Thread(target=foo,args=()) t1.start() t2=threading.Thread(target=bar,args=()) t2.start()#t1,t2誰先誰后無所謂,因?yàn)闀?huì)阻塞住,等待信號(hào)
5.3 其他方法
q.qsize() 返回隊(duì)列的大小
q.empty() 如果隊(duì)列為空,返回True,反之False
q.full() 如果隊(duì)列滿了,返回True,反之False
q.full 與 maxsize 大小對(duì)應(yīng)
5.4其他模式
前面說的隊(duì)列都是先進(jìn)先出(FIFO)模式,另外還有先進(jìn)后出(LIFO)模式和優(yōu)先級(jí)隊(duì)列
先進(jìn)后出模式創(chuàng)建隊(duì)列的方式是:class queue.LifoQueue(maxsize)
優(yōu)先級(jí)隊(duì)列的寫法是:class queue.Priorityueue(maxsize)
q=queue.PriorityQueue()
q.put([5,100])#這個(gè)方括號(hào)只是代表一個(gè)序列類型,元組列表都行,但是都必須所有的一樣
q.put([7,200])
q.put([3,"hello"])
q.put([4,{"name":"alex"}])
中括號(hào)里面第一個(gè)位置就是優(yōu)先級(jí)
5.5 生產(chǎn)者消費(fèi)者模型
生產(chǎn)者就相當(dāng)于產(chǎn)生數(shù)據(jù)的線程,消費(fèi)者就相當(dāng)于取數(shù)據(jù)的線程。我們?cè)诰帉懗绦虻臅r(shí)候,一定要考慮生產(chǎn)數(shù)據(jù)的能力和消費(fèi)數(shù)據(jù)的能力是否匹配,如果不匹配,那肯定要有一方需要等待,所以引入了生產(chǎn)者和消費(fèi)者模型。
這個(gè)模型是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者之間的 強(qiáng)耦合問題。有了這個(gè)容器,他們不用直接通信,而是通過這個(gè)容器,這個(gè)容器就是一個(gè)阻塞隊(duì)列,相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的能力。我們寫程序時(shí)用的目錄結(jié)構(gòu),不也是為了解耦和嗎
除了解決強(qiáng)耦合問題,生產(chǎn)者消費(fèi)者模型還能實(shí)現(xiàn)并發(fā)
當(dāng)生產(chǎn)者消費(fèi)者能力不匹配的時(shí)候,就考慮加限制,類似if q.qsize()<20,這種
四、多進(jìn)程
python 中有一把全局鎖(GIL)使得多線程無法使用多核,但是如果是多進(jìn)程,這把鎖就限制不了了。如何開多個(gè)進(jìn)程呢,需要導(dǎo)入一個(gè)multiprocessing模塊
import multiprocessingimport timedef foo(): print('ok') time.sleep(2)if __name__ == '__main__':#必須是這個(gè)格式 p=multiprocessing.Process(target=foo,args=()) p.start() print('ending')
雖然可以開多進(jìn)程,但是一定注意不能開太多,因?yàn)檫M(jìn)程間切換非常消耗系統(tǒng)資源,如果開上千個(gè)子進(jìn)程,系統(tǒng)會(huì)崩潰的,而且進(jìn)程間的通信也是個(gè)問題。所以,進(jìn)程能不用就不用,能少用就少用
1.進(jìn)程間的通信
進(jìn)程間通信有兩種方式,隊(duì)列和管道
1.1進(jìn)程間的隊(duì)列
每個(gè)進(jìn)程在內(nèi)存中都是獨(dú)立的一塊空間,不項(xiàng)線程那樣可以共享數(shù)據(jù),所以只能由父進(jìn)程通過傳參的方式把隊(duì)列傳給子進(jìn)程
import multiprocessingimport threadingdef foo(q): q.put([12,'hello',True])if __name__ =='__main__': q=multiprocessing.Queue()#創(chuàng)建進(jìn)程隊(duì)列 #創(chuàng)建一個(gè)子線程 p=multiprocessing.Process(target=foo,args=(q,)) #通過傳參的方式把這個(gè)隊(duì)列對(duì)象傳給父進(jìn)程 p.start() print(q.get())
1.2管道
之前學(xué)過的socket其實(shí)就是管道,客戶端 的sock和服務(wù)端的conn是管道 的兩端,在進(jìn)程中也是這個(gè)玩法,也要有管道的兩頭
from multiprocessing import Pipe,Processdef foo(sk): sk.send('hello')#主進(jìn)程發(fā)消息 print(sk.recv())#主進(jìn)程收消息sock,conn=Pipe()#創(chuàng)建了管道的兩頭if __name__ == '__main__': p=Process(target=foo,args=(sock,)) p.start() print(conn.recv())#子進(jìn)程接收消息 conn.send('hi son')#子進(jìn)程發(fā)消息
2.進(jìn)程間的數(shù)據(jù)共享
我們已經(jīng)通過進(jìn)程隊(duì)列和管道兩種方式實(shí)現(xiàn)了進(jìn)程間的通信,但是還沒有實(shí)現(xiàn)數(shù)據(jù)共享
進(jìn)程間的數(shù)據(jù)共享需要引用一個(gè)manager對(duì)象實(shí)現(xiàn),使用的所有的數(shù)據(jù)類型都要通過manager點(diǎn)的方式去創(chuàng)建
from multiprocessing import Processfrom multiprocessing import Managerdef foo(l,i): l.append(i*i)if __name__ == '__main__': manager = Manager() Mlist = manager.list([11,22,33])#創(chuàng)建一個(gè)共享的列表 l=[] for i in range(5): #開辟5個(gè)子進(jìn)程 p = Process(target=foo, args=(Mlist,i)) p.start() l.append(p) for i in l: i.join()#join 方法是等待進(jìn)程結(jié)束后再執(zhí)行下一個(gè) print(Mlist)
3.進(jìn)程池
進(jìn)程池的作用是維護(hù)一個(gè)最大的進(jìn)程量,如果超出設(shè)置的最大值,程序就會(huì)阻塞,知道有可用的進(jìn)程為止
from multiprocessing import Poolimport timedef foo(n): print(n) time.sleep(2)if __name__ == '__main__': pool_obj=Pool(5)#創(chuàng)建進(jìn)程池 #通過進(jìn)程池創(chuàng)建進(jìn)程 for i in range(5): p=pool_obj.apply_async(func=foo,args=(i,)) #p是創(chuàng)建的池對(duì)象 # pool 的使用是先close(),在join(),記住就行了 pool_obj.close() pool_obj.join() print('ending')
進(jìn)程池中有以下幾個(gè)方法:
1.apply:從進(jìn)程池里取一個(gè)進(jìn)程并執(zhí)行2.apply_async:apply的異步版本3.terminate:立刻關(guān)閉線程池4.join:主進(jìn)程等待所有子進(jìn)程執(zhí)行完畢,必須在close或terminate之后5.close:等待所有進(jìn)程結(jié)束后,才關(guān)閉線程池
五、協(xié)程
協(xié)程在手,天下我有,說走就走。知道了協(xié)程,前面說的進(jìn)程線程就都忘記吧
協(xié)程可以開很多很多,沒有上限,切換之間的消耗可以忽略不計(jì)
1.yield
先來回想一下yield這個(gè)詞,熟悉不,對(duì),就是生成器那用的那個(gè)。yield是個(gè)挺神奇的東西,這是Python的一個(gè)特點(diǎn)。
http://www.cnblogs.com/zhang-can/p/7215506.html