正文
本文推薦到簡書閱讀:http://www.jianshu.com/p/0cd258eecf60
為什么要學(xué) RxJava?
提升開發(fā)效率,降低維護(hù)成本一直是開發(fā)團(tuán)隊(duì)永恒不變的宗旨。近兩年來國內(nèi)的技術(shù)圈子中越來越多的開始提及 RxJava ,越來越多的應(yīng)用和面試中都會(huì)有 RxJava ,而就目前的情況,Android 的網(wǎng)絡(luò)庫基本被 Retrofit + OkHttp 一統(tǒng)天下了,而配合上響應(yīng)式編程RxJava 可謂如魚得水。想必大家肯定被近期的 Kotlin 炸開了鍋,筆者也在閑暇之時(shí)去了解了一番(作為一個(gè)與時(shí)俱進(jìn)的有理想的青年怎么可能不與時(shí)俱進(jìn)?),發(fā)現(xiàn)其中有個(gè)非常好的優(yōu)點(diǎn)就是簡潔,支持函數(shù)式編程。是的, RxJava 最大的優(yōu)點(diǎn)也是簡潔,但它不止是簡潔,而且是** 隨著程序邏輯變得越來越復(fù)雜,它依然能夠保持簡潔 **(這貨潔身自好呀有木有)。
咳咳,要例子,猛戳這里:給 Android 開發(fā)者的 RxJava 詳解
什么是響應(yīng)式編程
上面我們提及了響應(yīng)式編程,不少新司機(jī)對(duì)它可謂一臉懵逼,那什么是響應(yīng)式編程呢?響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。數(shù)據(jù)流就像一條河:它可以被觀測(cè),被過濾,被操作,或者為新的消費(fèi)者與另外一條流合并為一條新的流。
響應(yīng)式編程的一個(gè)關(guān)鍵概念是事件。事件可以被等待,可以觸發(fā)過程,也可以觸發(fā)其它事件。事件是唯一的以合適的方式將我們的現(xiàn)實(shí)世界映射到我們的軟件中:如果屋里太熱了我們就打開一扇窗戶。同樣的,當(dāng)我們的天氣app從服務(wù)端獲取到新的天氣數(shù)據(jù)后,我們需要更新app上展示天氣信息的UI;汽車上的車道偏移系統(tǒng)探測(cè)到車輛偏移了正常路線就會(huì)提醒駕駛者糾正,就是是響應(yīng)事件。
今天,響應(yīng)式編程最通用的一個(gè)場(chǎng)景是UI:我們的移動(dòng)App必須做出對(duì)網(wǎng)絡(luò)調(diào)用、用戶觸摸輸入和系統(tǒng)彈框的響應(yīng)。在這個(gè)世界上,軟件之所以是事件驅(qū)動(dòng)并響應(yīng)的是因?yàn)楝F(xiàn)實(shí)生活也是如此。
為什么出了一個(gè)系列后還有完結(jié)版?
RxJava 這些年可謂越來越流行,而在去年的晚些時(shí)候發(fā)布了2.0正式版。大半年已過,雖然網(wǎng)上已經(jīng)出現(xiàn)了大部分的 RxJava 教程(其實(shí)細(xì)心的你還是會(huì)發(fā)現(xiàn) 1.x 的超級(jí)多),前些日子,筆者花了大約兩周的閑暇之時(shí)寫了 RxJava 2.x 系列教程,也得到了不少反饋,其中就有不少讀者覺得每一篇的教程太短,抑或是希望更多的側(cè)重適用場(chǎng)景的介紹,在這樣的大前提下,這篇完結(jié)版教程就此誕生,僅供各位新司機(jī)采納。
開始
RxJava 2.x 已經(jīng)按照 Reactive-Streams specification 規(guī)范完全的重寫了,maven也被放在了io.reactivex.rxjava2:rxjava:2.x.y
下,所以 RxJava 2.x 獨(dú)立于 RxJava 1.x 而存在,而隨后官方宣布的將在一段時(shí)間后終止對(duì) RxJava 1.x 的維護(hù),所以對(duì)于熟悉 RxJava 1.x 的老司機(jī)自然可以直接看一下 2.x 的文檔和異同就能輕松上手了,而對(duì)于不熟悉的年輕司機(jī),不要慌,本醬帶你裝逼帶你飛,馬上就發(fā)車,坐穩(wěn)了:https://github.com/nanchen2251/RxJava2Examples
你只需要在 build.gradle 中加上:compile 'io.reactivex.rxjava2:rxjava:2.1.1'
(2.1.1為寫此文章時(shí)的最新版本)
接口變化
RxJava 2.x 擁有了新的特性,其依賴于4個(gè)基礎(chǔ)接口,它們分別是
Publisher
Subscriber
Subscription
Processor
其中最核心的莫過于 Publisher
和 Subscriber
。Publisher
可以發(fā)出一系列的事件,而 Subscriber
負(fù)責(zé)和處理這些事件。
其中用的比較多的自然是 Publisher
的 Flowable
,它支持背壓,有興趣的可以看一下官方對(duì)于背壓的講解
可以明顯地發(fā)現(xiàn),RxJava 2.x 最大的改動(dòng)就是對(duì)于 backpressure
的處理,為此將原來的 Observable
拆分成了新的Observable
和Flowable
,同時(shí)其他相關(guān)部分也同時(shí)進(jìn)行了拆分,但令人慶幸的是,是它,是它,還是它,還是我們最熟悉和最喜歡的 RxJava。
Observable
在 RxJava 1.x 中,我們最熟悉的莫過于 Observable
這個(gè)類了,筆者在剛剛使用 RxJava 2.x 的時(shí)候,創(chuàng)建了 一個(gè) Observable
,瞬間一臉懵逼有木有,居然連我們最最熟悉的 Subscriber
都沒了,取而代之的是 ObservableEmmiter
,俗稱發(fā)射器。此外,由于沒有了Subscriber
的蹤影,我們創(chuàng)建觀察者時(shí)需使用 Observer
。而 Observer
也不是我們熟悉的那個(gè) Observer
,又出現(xiàn)了一個(gè) Disposable
參數(shù)帶你裝逼帶你飛。
廢話不多說,從會(huì)用開始,還記得 RxJava 的三部曲嗎?
** 第一步:初始化 Observable
第二步:初始化 Observer
第三步:建立訂閱關(guān)系 **
Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { Log.e(TAG, "Observable emit 1" + "\n"); e.onNext(1); Log.e(TAG, "Observable emit 2" + "\n"); e.onNext(2); Log.e(TAG, "Observable emit 3" + "\n"); e.onNext(3); e.onComplete(); Log.e(TAG, "Observable emit 4" + "\n" ); e.onNext(4); } }).subscribe(new Observer<Integer>() { // 第三步:訂閱 // 第二步:初始化Observer private int i; private Disposable mDisposable; @Override public void onSubscribe(@NonNull Disposable d) { mDisposable = d; } @Override public void onNext(@NonNull Integer integer) { i++; if (i == 2) { // 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作,讓Observer觀察者不再接收上游事件 mDisposable.dispose(); } } @Override public void onError(@NonNull Throwable e) { Log.e(TAG, "onError : value : " + e.getMessage() + "\n" ); } @Override public void onComplete() { Log.e(TAG, "onComplete" + "\n" ); } });
不難看出,RxJava 2.x 與 1.x 還是存在著一些區(qū)別的。首先,創(chuàng)建 Observable
時(shí),回調(diào)的是 ObservableEmitter
,字面意思即發(fā)射器,并且直接 throws Exception。其次,在創(chuàng)建的 Observer 中,也多了一個(gè)回調(diào)方法:onSubscribe
,傳遞參數(shù)為Disposable
,Disposable
相當(dāng)于 RxJava 1.x 中的 Subscription
, 用于解除訂閱??梢钥吹绞纠a中,在 i 自增到 2 的時(shí)候,訂閱關(guān)系被切斷。
07-03 14:24:11.663 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onSubscribe : false07-03 14:24:11.664 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 107-03 14:24:11.665 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 107-03 14:24:11.666 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 207-03 14:24:11.667 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 207-03 14:24:11.668 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : isDisposable : true07-03 14:24:11.669 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 307-03 14:24:11.670 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 4
當(dāng)然,我們的 RxJava 2.x 也為我們保留了簡化訂閱方法,我們可以根據(jù)需求,進(jìn)行相應(yīng)的簡化訂閱,只不過傳入對(duì)象改為了 Consumer
。
Consumer
即消費(fèi)者,用于接收單個(gè)值,BiConsumer
則是接收兩個(gè)值,Function
用于變換對(duì)象,Predicate
用于判斷。這些接口命名大多參照了 Java 8 ,熟悉 Java 8 新特性的應(yīng)該都知道意思,這里也不再贅述。
線程調(diào)度
關(guān)于線程切換這點(diǎn),RxJava 1.x 和 RxJava 2.x 的實(shí)現(xiàn)思路是一樣的。這里簡單的說一下,以便于我們的新司機(jī)入手。
subScribeOn
同 RxJava 1.x 一樣,subscribeOn
用于指定subscribe()
時(shí)所發(fā)生的線程,從源碼角度可以看出,內(nèi)部線程調(diào)度是通過 ObservableSubscribeOn
來實(shí)現(xiàn)的。
@SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
ObservableSubscribeOn
的核心源碼在 subscribeActual
方法中,通過代理的方式使用 SubscribeOnObserver
包裝 Observer
后,設(shè)置 Disposable
來將 subscribe
切換到 Scheduler
線程中。
observeOn
observeOn
方法用于指定下游 Observer
回調(diào)發(fā)生的線程。
@SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
線程切換需要注意的
RxJava 內(nèi)置的線程調(diào)度器的確可以讓我們的線程切換得心應(yīng)手,但其中也有些需要注意的地方。
簡單地說,
subscribeOn()
指定的就是發(fā)射事件的線程,observerOn
指定的就是訂閱者接收事件的線程。多次指定發(fā)射事件的線程只有第一次指定的有效,也就是說多次調(diào)用
subscribeOn()
只有第一次的有效,其余的會(huì)被忽略。但多次指定訂閱者接收線程是可以的,也就是說每調(diào)用一次
observerOn()
,下游的線程就會(huì)切換一次。
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName()); e.onNext(1); e.onComplete(); } }).subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName()); } }) .observeOn(Schedulers.io()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName()); } });
輸出:
07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-107-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2
實(shí)例代碼中,分別用 Schedulers.newThread()
和 Schedulers.io()
對(duì)發(fā)射線程進(jìn)行切換,并采用 observeOn(AndroidSchedulers.mainThread()
和 Schedulers.io()
進(jìn)行了接收線程的切換。可以看到輸出中發(fā)射線程僅僅響應(yīng)了第一個(gè) newThread
,但每調(diào)用一次 observeOn()
,線程便會(huì)切換一次,因此如果我們有類似的需求時(shí),便知道如何處理了。
RxJava 中,已經(jīng)內(nèi)置了很多線程選項(xiàng)供我們選擇,例如有:
Schedulers.io()
代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作;Schedulers.computation()
代表CPU計(jì)算密集型的操作, 例如需要大量計(jì)算的操作;Schedulers.newThread()
代表一個(gè)常規(guī)的新線程;AndroidSchedulers.mainThread()
代表Android的主線程
這些內(nèi)置的 Scheduler
已經(jīng)足夠滿足我們開發(fā)的需求,因此我們應(yīng)該使用內(nèi)置的這些選項(xiàng),而 RxJava 內(nèi)部使用的是線程池來維護(hù)這些線程,所以效率也比較高。
操作符
關(guān)于操作符,在官方文檔中已經(jīng)做了非常完善的講解,并且筆者前面的系列教程中也著重講解了絕大多數(shù)的操作符作用,這里受于篇幅限制,就不多做贅述,只挑選幾個(gè)進(jìn)行實(shí)際情景的講解。
map
map
操作符可以將一個(gè) Observable
對(duì)象通過某種關(guān)系轉(zhuǎn)換為另一個(gè)Observable
對(duì)象。在 2.x 中和 1.x 中作用幾乎一致,不同點(diǎn)在于:2.x 將 1.x 中的 Func1
和 Func2
改為了 Function
和 BiFunction
。
采用 map 操作符進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)解析
想必大家都知道,很多時(shí)候我們?cè)谑褂?RxJava 的時(shí)候總是和 Retrofit 進(jìn)行結(jié)合使用,而為了方便演示,這里我們就暫且采用 OkHttp3 進(jìn)行演示,
作 者: 南 塵
出 處: http://www.cnblogs.com/liushilin/
關(guān)于作者:專注于移動(dòng)前端的項(xiàng)目開發(fā)。如有問題或建議,請(qǐng)多多賜教!歡迎加入Android交流群:118116509
版權(quán)聲明:本文版權(quán)歸作者和博客園共有,歡迎轉(zhuǎn)載,但未經(jīng)作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。
特此聲明:所有評(píng)論和私信都會(huì)在第一時(shí)間回復(fù)。也歡迎園子的大大們指正錯(cuò)誤,共同進(jìn)步?;蛘?a style="margin: 0px; padding: 0px; color: rgb(0, 0, 0); text-decoration-line: none;">直接私信我
聲援博主:如果您覺得文章對(duì)您有幫助,可以點(diǎn)擊文章下部【推薦】或側(cè)邊【關(guān)注】。您的鼓勵(lì)是作者堅(jiān)持原創(chuàng)和持續(xù)寫作的最大動(dòng)力!
http://www.cnblogs.com/liushilin/p/7113417.html