正文

回到頂部

本文推薦到簡書閱讀:http://www.jianshu.com/p/0cd258eecf60

回到頂部

為什么要學(xué) RxJava?

提升開發(fā)效率,降低維護(hù)成本一直是開發(fā)團(tuán)隊(duì)永恒不變的宗旨。近兩年來國內(nèi)的技術(shù)圈子中越來越多的開始提及 RxJava ,越來越多的應(yīng)用和面試中都會(huì)有 RxJava ,而就目前的情況,Android 的網(wǎng)絡(luò)庫基本被 Retrofit + OkHttp 一統(tǒng)天下了,而配合上響應(yīng)式編程RxJava 可謂如魚得水。想必大家肯定被近期的 Kotlin 炸開了鍋,筆者也在閑暇之時(shí)去了解了一番(作為一個(gè)與時(shí)俱進(jìn)的有理想的青年怎么可能不與時(shí)俱進(jìn)?),發(fā)現(xiàn)其中有個(gè)非常好的優(yōu)點(diǎn)就是簡潔,支持函數(shù)式編程。是的, RxJava 最大的優(yōu)點(diǎn)也是簡潔,但它不止是簡潔,而且是** 隨著程序邏輯變得越來越復(fù)雜,它依然能夠保持簡潔 **(這貨潔身自好呀有木有)。

咳咳,要例子,猛戳這里:給 Android 開發(fā)者的 RxJava 詳解

回到頂部

什么是響應(yīng)式編程

上面我們提及了響應(yīng)式編程,不少新司機(jī)對(duì)它可謂一臉懵逼,那什么是響應(yīng)式編程呢?響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。數(shù)據(jù)流就像一條河:它可以被觀測(cè),被過濾,被操作,或者為新的消費(fèi)者與另外一條流合并為一條新的流。

響應(yīng)式編程的一個(gè)關(guān)鍵概念是事件。事件可以被等待,可以觸發(fā)過程,也可以觸發(fā)其它事件。事件是唯一的以合適的方式將我們的現(xiàn)實(shí)世界映射到我們的軟件中:如果屋里太熱了我們就打開一扇窗戶。同樣的,當(dāng)我們的天氣app從服務(wù)端獲取到新的天氣數(shù)據(jù)后,我們需要更新app上展示天氣信息的UI;汽車上的車道偏移系統(tǒng)探測(cè)到車輛偏移了正常路線就會(huì)提醒駕駛者糾正,就是是響應(yīng)事件。

今天,響應(yīng)式編程最通用的一個(gè)場(chǎng)景是UI:我們的移動(dòng)App必須做出對(duì)網(wǎng)絡(luò)調(diào)用、用戶觸摸輸入和系統(tǒng)彈框的響應(yīng)。在這個(gè)世界上,軟件之所以是事件驅(qū)動(dòng)并響應(yīng)的是因?yàn)楝F(xiàn)實(shí)生活也是如此。

回到頂部

為什么出了一個(gè)系列后還有完結(jié)版?

RxJava 這些年可謂越來越流行,而在去年的晚些時(shí)候發(fā)布了2.0正式版。大半年已過,雖然網(wǎng)上已經(jīng)出現(xiàn)了大部分的 RxJava 教程(其實(shí)細(xì)心的你還是會(huì)發(fā)現(xiàn) 1.x 的超級(jí)多),前些日子,筆者花了大約兩周的閑暇之時(shí)寫了 RxJava 2.x 系列教程,也得到了不少反饋,其中就有不少讀者覺得每一篇的教程太短,抑或是希望更多的側(cè)重適用場(chǎng)景的介紹,在這樣的大前提下,這篇完結(jié)版教程就此誕生,僅供各位新司機(jī)采納。

回到頂部

開始

RxJava 2.x 已經(jīng)按照 Reactive-Streams specification 規(guī)范完全的重寫了,maven也被放在了io.reactivex.rxjava2:rxjava:2.x.y 下,所以 RxJava 2.x 獨(dú)立于 RxJava 1.x 而存在,而隨后官方宣布的將在一段時(shí)間后終止對(duì) RxJava 1.x 的維護(hù),所以對(duì)于熟悉 RxJava 1.x 的老司機(jī)自然可以直接看一下 2.x 的文檔和異同就能輕松上手了,而對(duì)于不熟悉的年輕司機(jī),不要慌,本醬帶你裝逼帶你飛,馬上就發(fā)車,坐穩(wěn)了:https://github.com/nanchen2251/RxJava2Examples

你只需要在 build.gradle 中加上:compile 'io.reactivex.rxjava2:rxjava:2.1.1'(2.1.1為寫此文章時(shí)的最新版本)

回到頂部

接口變化

RxJava 2.x 擁有了新的特性,其依賴于4個(gè)基礎(chǔ)接口,它們分別是

  • Publisher

  • Subscriber

  • Subscription

  • Processor

其中最核心的莫過于 Publisher 和 Subscriber。Publisher 可以發(fā)出一系列的事件,而 Subscriber 負(fù)責(zé)和處理這些事件。

其中用的比較多的自然是 Publisher 的 Flowable,它支持背壓,有興趣的可以看一下官方對(duì)于背壓的講解

可以明顯地發(fā)現(xiàn),RxJava 2.x 最大的改動(dòng)就是對(duì)于 backpressure 的處理,為此將原來的 Observable 拆分成了新的Observable 和Flowable,同時(shí)其他相關(guān)部分也同時(shí)進(jìn)行了拆分,但令人慶幸的是,是它,是它,還是它,還是我們最熟悉和最喜歡的 RxJava。

回到頂部

Observable

在 RxJava 1.x 中,我們最熟悉的莫過于 Observable 這個(gè)類了,筆者在剛剛使用 RxJava 2.x 的時(shí)候,創(chuàng)建了 一個(gè) Observable,瞬間一臉懵逼有木有,居然連我們最最熟悉的 Subscriber 都沒了,取而代之的是 ObservableEmmiter,俗稱發(fā)射器。此外,由于沒有了Subscriber的蹤影,我們創(chuàng)建觀察者時(shí)需使用 Observer。而 Observer 也不是我們熟悉的那個(gè) Observer,又出現(xiàn)了一個(gè) Disposable 參數(shù)帶你裝逼帶你飛。

廢話不多說,從會(huì)用開始,還記得 RxJava 的三部曲嗎?

** 第一步:初始化 Observable 
第二步:初始化 Observer 
第三步:建立訂閱關(guān)系 **

Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable emit 1" + "\n");
                e.onNext(1);
                Log.e(TAG, "Observable emit 2" + "\n");
                e.onNext(2);
                Log.e(TAG, "Observable emit 3" + "\n");
                e.onNext(3);
                e.onComplete();
                Log.e(TAG, "Observable emit 4" + "\n" );
                e.onNext(4);
            }
        }).subscribe(new Observer<Integer>() { // 第三步:訂閱

            // 第二步:初始化Observer
            private int i;            private Disposable mDisposable;            @Override
            public void onSubscribe(@NonNull Disposable d) {      
                mDisposable = d;
            }            @Override
            public void onNext(@NonNull Integer integer) {
                i++;                if (i == 2) {                    // 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作,讓Observer觀察者不再接收上游事件
                    mDisposable.dispose();
                }
            }            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
            }            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete" + "\n" );
            }
        });

不難看出,RxJava 2.x 與 1.x 還是存在著一些區(qū)別的。首先,創(chuàng)建 Observable時(shí),回調(diào)的是 ObservableEmitter ,字面意思即發(fā)射器,并且直接 throws Exception。其次,在創(chuàng)建的 Observer 中,也多了一個(gè)回調(diào)方法:onSubscribe,傳遞參數(shù)為Disposable,Disposable 相當(dāng)于 RxJava 1.x 中的 Subscription, 用于解除訂閱??梢钥吹绞纠a中,在 i 自增到 2 的時(shí)候,訂閱關(guān)系被切斷。

07-03 14:24:11.663 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onSubscribe : false07-03 14:24:11.664 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 107-03 14:24:11.665 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 107-03 14:24:11.666 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 207-03 14:24:11.667 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 207-03 14:24:11.668 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : isDisposable : true07-03 14:24:11.669 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 307-03 14:24:11.670 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 4

當(dāng)然,我們的 RxJava 2.x 也為我們保留了簡化訂閱方法,我們可以根據(jù)需求,進(jìn)行相應(yīng)的簡化訂閱,只不過傳入對(duì)象改為了 Consumer。

Consumer 即消費(fèi)者,用于接收單個(gè)值,BiConsumer 則是接收兩個(gè)值,Function 用于變換對(duì)象,Predicate 用于判斷。這些接口命名大多參照了 Java 8 ,熟悉 Java 8 新特性的應(yīng)該都知道意思,這里也不再贅述。

回到頂部

線程調(diào)度

關(guān)于線程切換這點(diǎn),RxJava 1.x 和 RxJava 2.x 的實(shí)現(xiàn)思路是一樣的。這里簡單的說一下,以便于我們的新司機(jī)入手。

subScribeOn

同 RxJava 1.x 一樣,subscribeOn 用于指定subscribe()時(shí)所發(fā)生的線程,從源碼角度可以看出,內(nèi)部線程調(diào)度是通過 ObservableSubscribeOn來實(shí)現(xiàn)的。

    @SchedulerSupport(SchedulerSupport.CUSTOM)    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

ObservableSubscribeOn 的核心源碼在 subscribeActual 方法中,通過代理的方式使用 SubscribeOnObserver 包裝 Observer 后,設(shè)置 Disposable 來將 subscribe 切換到 Scheduler 線程中。

observeOn

observeOn 方法用于指定下游 Observer 回調(diào)發(fā)生的線程。

   @SchedulerSupport(SchedulerSupport.CUSTOM)    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

線程切換需要注意的

RxJava 內(nèi)置的線程調(diào)度器的確可以讓我們的線程切換得心應(yīng)手,但其中也有些需要注意的地方。

  • 簡單地說,subscribeOn() 指定的就是發(fā)射事件的線程,observerOn 指定的就是訂閱者接收事件的線程。

  • 多次指定發(fā)射事件的線程只有第一次指定的有效,也就是說多次調(diào)用 subscribeOn() 只有第一次的有效,其余的會(huì)被忽略。

  • 但多次指定訂閱者接收線程是可以的,也就是說每調(diào)用一次 observerOn(),下游的線程就會(huì)切換一次。

Observable.create(new ObservableOnSubscribe<Integer>() {            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
                e.onNext(1);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
                    }
                });

輸出:

07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-107-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2

實(shí)例代碼中,分別用 Schedulers.newThread() 和 Schedulers.io() 對(duì)發(fā)射線程進(jìn)行切換,并采用 observeOn(AndroidSchedulers.mainThread() 和 Schedulers.io() 進(jìn)行了接收線程的切換。可以看到輸出中發(fā)射線程僅僅響應(yīng)了第一個(gè) newThread,但每調(diào)用一次 observeOn() ,線程便會(huì)切換一次,因此如果我們有類似的需求時(shí),便知道如何處理了。

RxJava 中,已經(jīng)內(nèi)置了很多線程選項(xiàng)供我們選擇,例如有:

  • Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作;

  • Schedulers.computation() 代表CPU計(jì)算密集型的操作, 例如需要大量計(jì)算的操作;

  • Schedulers.newThread() 代表一個(gè)常規(guī)的新線程;

  • AndroidSchedulers.mainThread() 代表Android的主線程

這些內(nèi)置的 Scheduler 已經(jīng)足夠滿足我們開發(fā)的需求,因此我們應(yīng)該使用內(nèi)置的這些選項(xiàng),而 RxJava 內(nèi)部使用的是線程池來維護(hù)這些線程,所以效率也比較高。

回到頂部

操作符

關(guān)于操作符,在官方文檔中已經(jīng)做了非常完善的講解,并且筆者前面的系列教程中也著重講解了絕大多數(shù)的操作符作用,這里受于篇幅限制,就不多做贅述,只挑選幾個(gè)進(jìn)行實(shí)際情景的講解。

map

map 操作符可以將一個(gè) Observable 對(duì)象通過某種關(guān)系轉(zhuǎn)換為另一個(gè)Observable 對(duì)象。在 2.x 中和 1.x 中作用幾乎一致,不同點(diǎn)在于:2.x 將 1.x 中的 Func1 和 Func2 改為了 Function 和 BiFunction。

采用 map 操作符進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)解析

想必大家都知道,很多時(shí)候我們?cè)谑褂?RxJava 的時(shí)候總是和 Retrofit 進(jìn)行結(jié)合使用,而為了方便演示,這里我們就暫且采用 OkHttp3 進(jìn)行演示,

作  者: 南 塵   seo優(yōu)化培訓(xùn),網(wǎng)絡(luò)推廣培訓(xùn),網(wǎng)絡(luò)營銷培訓(xùn),SEM培訓(xùn),網(wǎng)絡(luò)優(yōu)化,在線營銷培訓(xùn) 
出  處: http://www.cnblogs.com/liushilin/ 
關(guān)于作者:專注于移動(dòng)前端的項(xiàng)目開發(fā)。如有問題或建議,請(qǐng)多多賜教!歡迎加入Android交流群:118116509 
版權(quán)聲明:本文版權(quán)歸作者和博客園共有,歡迎轉(zhuǎn)載,但未經(jīng)作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。 
特此聲明:所有評(píng)論和私信都會(huì)在第一時(shí)間回復(fù)。也歡迎園子的大大們指正錯(cuò)誤,共同進(jìn)步?;蛘?a style="margin: 0px; padding: 0px; color: rgb(0, 0, 0); text-decoration-line: none;">直接私信我 
聲援博主:如果您覺得文章對(duì)您有幫助,可以點(diǎn)擊文章下部推薦或側(cè)邊關(guān)注。您的鼓勵(lì)是作者堅(jiān)持原創(chuàng)和持續(xù)寫作的最大動(dòng)力! 

http://www.cnblogs.com/liushilin/p/7113417.html