banner
jzman

jzman

Coding、思考、自觉。
github

RxJava2轉換型操作符

上篇文章介紹了創建型操作符的使用,今天來看一下 RxJava 中轉換型操作符的使用,常用的轉換型操作符如下:

  1. buffer 操作符
  2. window 操作符
  3. map 操作符
  4. groupBy 操作符
  5. cast 操作符
  6. scan 操作符
  7. To 操作符

buffer 操作符#

buffer 操作符重載方法比較多,這裡選取典型的幾個來說明 buffer 操作符的使用,buffer 操作符的使用可以分為如下三類,具體如下:

//第一類
public final Observable<List<T>> buffer(int count) 
public final Observable<List<T>> buffer(int count, int skip) 
//第二類
public final Observable<List<T>> buffer(long timespan, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, long timeskip, TimeUnit unit) 
//第三類
public final <B> Observable<List<T>> buffer(ObservableSource<B> boundary)
public final <TOpening, TClosing> Observable<List<T>> buffer(
            ObservableSource<? extends TOpening> openingIndicator,
            Function<? super TOpening, ? extends ObservableSource<? extends TClosing>> closingIndicator)

buffer(int count)#

buffer 操作符將一個 Observable 轉換為一個 Observable, 這個 Observable 用於收集原來發送的數據,然後發送這些緩存的數據集合,buffer 將發送的單個事件轉換成元素集合,下面是針對此種情況的官方示意圖:

image

如下面的事件的發送過程,如果不設置 buffer 則需要發送四次,如果使用如下 buffer 進行轉換,則只需發送兩次,測試代碼如下:

count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4")
        .buffer(2)
        .subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(List<String> strings) throws Exception {
                count++;
                Log.i(TAG, "第" + count + "次接收...");
                Log.i(TAG, "accept--->" + strings.size());
                Log.i(TAG, "接收的數據...");
                for (String str : strings) {
                    Log.i(TAG, "accept--->" + strings.size() + "---" + str);
                }
            }
        });

上述代碼的執行結果如下:

第1次接收...
accept--->2
接收的數據...
accept--->2---Event1
accept--->2---Event2
第2次接收...
accept--->2
接收的數據...
accept--->2---Event3
accept--->2---Event4

buffer(int count, int skip)#

相較 buffer (int count), skip 可以指定下一次由源 Observable 轉換的 Observable 收集事件的位置,如果 count 等於 skip, 則 buffer (int count,int skip) 等價於 buffer (int count),官方示意圖如下:

image

如下面的事件發送過程,相當於每 3 個事件一組進行發送,但每次收集數據的位置參數 skip 為 2,則每次收集的數據中會有數據重複,測試代碼如下:

count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4", "Event5")
        .buffer(3, 2)
        .subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(List<String> strings) throws Exception {
                count++;
                Log.i(TAG, "第" + count + "次接收...");
                Log.i(TAG, "accept--->" + strings.size());
                Log.i(TAG, "接收的數據...");
                for (String str : strings) {
                    Log.i(TAG, "accept--->" + strings.size() + "---" + str);
                }
            }
        });

上述代碼的執行結果如下:

第1次接收...
accept--->3
接收的數據...
accept--->3---Event1
accept--->3---Event2
accept--->3---Event3
第2次接收...
accept--->3
接收的數據...
accept--->3---Event3
accept--->3---Event4
accept--->3---Event5
第3次接收...
accept--->1
接收的數據...
accept--->1---Event5

buffer(long timespan, TimeUnit unit)#

buffer 操作符會將一個 Observable 轉換為一個新的 Observable,timespan 決定新的的 Observsable 在發出緩存的數據的時間間隔,官方示意圖如下:

image

如下面的事件發送過程,源 Observable 每隔 2 秒發送事件,而 buffer 新生成的 Obsrevable 則以每隔 1 秒的間隔發送緩存的事件集合,當然,這樣會在間隔的時間段收集不到數據導致丟失數據,測試代碼如下:

Observable.intervalRange(1,8,0,2, TimeUnit.SECONDS)
        .buffer(1,TimeUnit.SECONDS)
        .subscribe(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> longs) throws Exception {
                Log.i(TAG, "accept--->" + String.valueOf(longs));
            }
        });

上述代碼的執行結果如下:

accept--->[1]
accept--->[]
accept--->[2]
accept--->[]
accept--->[3]
accept--->[]
accept--->[4]
accept--->[]
accept--->[5]

buffer(long timespan, long timeskip, TimeUnit unit)#

buffer 操作符會將一個 Observable 轉換為一個 Observable,timeskip 決定讓新生成的 Observable 定期啟動一個新的緩衝區,然後新的 Observable 會發出在 timespan 時間間隔內收集的事件集合,官方示意圖如下:

image

如下面的事件發送過程,源 Observable 會每隔 1 秒發送 1 到 12 的整數,buffer 新生成的 Observable 會每隔 5 秒接收源 Observable 發送的事件,測試代碼如下:

Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
        .buffer(1,5, TimeUnit.SECONDS)
        .subscribe(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> longs) throws Exception {
                Log.i(TAG, "accept--->" + String.valueOf(longs));
            }
        });

上述代碼的執行結果如下:

accept--->[1]
accept--->[6]
accept--->[11]

buffer(ObservableSource boundary)#

buffer (boundary) 會監視一個名叫 boundary 的 Observable,每當這個 Observable 發射了一個事件,它就創建一個新的 List 開始收集來自原始 Observable 的發送的事件並發送收集到的數據,官方示意圖如下:

image

如下面事件的發送過程,收集到的原事件會因為時間間隔的不同最終發送的收集到的事件的個數也不同,測試代碼如下:

Observable.intervalRange(1,10,0,2, TimeUnit.SECONDS)
        .buffer(Observable.interval(3, TimeUnit.SECONDS))
        .subscribe(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> longs) throws Exception {
                Log.i(TAG, "accept--->" + String.valueOf(longs));
            }
        });

上述代碼的執行結果如下:

accept--->[1, 2]
accept--->[3]
accept--->[4, 5]
accept--->[6]
accept--->[7, 8]
accept--->[9]
accept--->[10]

buffer(openingIndicator, closingIndicator)#

buffer (openingIndicator, closingIndicator) 會監視一個名叫 openingIndicator 的 Observable,這個 Observable 每發射一個事件,它就創建一個 List 收集原始 Observable 發送的數據,並將收集的數據給 closingIndicator,closingIndicator 會返回一個 Observable, 這個 buffer 會監視 closingIndicator 返回的 Observable, 檢測到這個 Observable 的數據時,就會關閉這個 List 發射剛才從 openingIndicator 獲得數據,也就是名為 openingIndicator 的 Observable 收集的數據,下面是針對此種情況的官方示意圖:

image

如下面時間發送過程,原始的 Observable 以每個 1 秒的間隔發送 1 到 12 之間的整數,名為 openingIndicator 的 Observable 會每隔 3 秒創建一個 List 手機發送的事件,然後將收集的數據給 closingIndicator,closingIndicator 會延時 1 秒發送從名為 openingIndicator 的 Observable 拿到的數據,下面是測試代碼:

Observable openingIndicator = Observable.interval(3, TimeUnit.SECONDS);
Observable closingIndicator = Observable.timer(1,TimeUnit.SECONDS);
Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
        .buffer(openingIndicator, new Function<Object, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Object o) throws Exception {
                return closingIndicator;
            }
        })
        .subscribe(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> longs) throws Exception {
                Log.i(TAG, "accept--->" + String.valueOf(longs));
            }
        });

上述代碼的執行結果如下:

accept--->[4, 5]
accept--->[7]
accept--->[10]

window 操作符#

這裡就以 window (long count) 為例來介紹 window 操作符的使用,window 操作符的使用和 buffer 使用類似,不同之處是經 buffer 轉換成的 Observable 發送的時源 Observable 發送事件的事件集合,而經 window 操作符轉換成的 Observable 會依次發送 count 個源 Observable 發送的事件,該操作符官方示意圖如下:

image

測試代碼如下:

Observable.just("Event1", "Event2", "Event3", "Event4")
        .window(2)
        .subscribe(new Consumer<Observable<String>>() {
            @Override
            public void accept(Observable<String> stringObservable) throws Exception {
                Log.i(TAG, "accept--Observable->");
                stringObservable.subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.i(TAG, "accept--->" + s);
                    }
                });
            }
        });

上述代碼的執行結果如下:

accept--Observable->
accept--->Event1
accept--->Event2
accept--Observable->
accept--->Event3
accept--->Event4

map 操作符#

map(mapper)#

map 操作符可對發送的數據進行相應的類型轉化,map 操作的官方示意圖如下:

image

如下面的事件發送過程,經過 map 操作符轉換,可對源 Observable 發送的事件進行進一步的加工和轉換,測試代碼如下:

Observable.just("Event1", "Event2", "Event3", "Event4")
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return "this is " + s;
            }
        }).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.i(TAG, "accept--->" + s);
    }
});

上述代碼的執行結果如下:

accept--->this is Event1
accept--->this is Event2
accept--->this is Event3
accept--->this is Event4

flatMap(mapper)#

flatMap 操作符使用時,當源 Observable 發出事件會相應的轉換為可以發送多個事件的 Observable,這些 Observable 最終匯入同一個 Observable,然後這個 Observable 將這些事件統一發送出去,這裡決定不再想上文中一樣,每個重載方法都進行說明,這裡已常用的 flatMap (mapper) 為例,其官方示意圖如下:

image

如下面的事件發送過程,使用了 flatMap 操作符之後,源 Observable 發送事件時,相應的生成對應的 Observable,最終發送的事件都匯入同一個 Observable, 然後將事件結果回調給觀察者,測試代碼如下:

final Observable observable = Observable.just("Event5", "Event6");
Observable.just("Event1", "Event2", "Event3", "Event4")
        .flatMap(new Function<String, Observable<String>>() {
            @Override
            public Observable<String> apply(String s) throws Exception {
                return observable;
            }
        }).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.i(TAG, "accept--->" + s);
    }
});

上述代碼的執行結果如下:

accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6

concatMap(mapper)#

concatMap 的使用與 flatMap 的使用大致類似,相較 flatMap 能夠保證事件接收的順序,而 flatMap 不能保證事件接收的順序,concatMap 操作符的官方示意圖如下:

image

如下面的事件發送過程,我們在源 Observable 發送整數 1 時延時 3 秒,然後繼續發送其他事件,下面是測試代碼:

Observable.intervalRange(1, 2, 0, 1, TimeUnit.SECONDS)
        .concatMap(new Function<Long, ObservableSource<Long>>() {
            @Override
            public ObservableSource<Long> apply(Long aLong) throws Exception {
                int delay = 0;
                if (aLong == 1) {
                    delay = 3;
                }
                return Observable.intervalRange(4, 4, delay, 1, TimeUnit.SECONDS);
            }
        }).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.i(TAG, "accept--->" + aLong);
    }
});

使用 concatMap 操作符上述代碼的執行結果如下:

accept--->4
accept--->5
accept--->6
accept--->7
accept--->4
accept--->5
accept--->6
accept--->7

使用 flatMap 操作符上述代碼的執行結果如下:

accept--->4
accept--->5
accept--->6
accept--->4
accept--->7
accept--->5
accept--->6
accept--->7

可見,concatMap 相較 flatMap 能夠保證事件接收的順序。

switchMap(mapper)#

當源 Observable 發送事件時會相應的轉換為可以發送多個事件的 Observable,switchMap 操作符只關心當前這個 Observable,也就是說,源 Observable 每當發送一個新的事件時,就會丟棄前面一個發送多個事件的 Observable,官方示意圖如下:

image

如下面的事件發送過程,源 Observable 每個 2 秒發送 1 和 2,轉換成的可以發送多個事件的 Observable 每個 1 秒發送從 4 開始的整數,使用 switchMap 操作符時,源 Observable 發送一個整數 1 時,這個新的可以發送多個事件的 Observable 只發送兩個整數,也就是 4 和 5 之後就停止發送了,因為此時源 Observable 又開始發送事件了,此時會丟棄前一個可發送多個時間的 Observable, 開始下一次源 Observable 發送事件的監聽,測試代碼如下:

Observable.intervalRange(1, 2, 0, 2, TimeUnit.SECONDS)
        .switchMap(new Function<Long, ObservableSource<Long>>() {
            @Override
            public ObservableSource<Long> apply(Long aLong) throws Exception {
                Log.i(TAG, "accept-aLong-->" + aLong);
                return Observable.intervalRange(4, 4, 0, 1, TimeUnit.SECONDS);
            }
        }).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.i(TAG, "accept--->" + aLong);
    }
});

上述代碼執行結果如下:

accept-aLong-->1
accept--->4
accept--->5
accept-aLong-->2
accept--->4
accept--->5
accept--->6
accept--->7

此外,還有與之相關的操作符:concatMapDelayError、concatMapEager、concatMapEagerDelayError concatMapIterable、flatMapIterable 、switchMapDelayError,都是上述操作符的擴展,這裡就不再介紹了。

groupBy 操作符#

groupBy 操作符會對接收的數據按照指定的規則進行分類,然後再被 GroupedObservable 等訂閱輸出,官方示意圖如下:

image

如下面的事件發送過程,我們會按照成績進行分組輸出,具體如下:

List<DataBean> beanList = new ArrayList<>();
beanList.add(new DataBean("成績是95分", 95));
beanList.add(new DataBean("成績是70分", 70));
beanList.add(new DataBean("成績是56分", 56));
beanList.add(new DataBean("成績是69分", 69));
beanList.add(new DataBean("成績是90分", 90));
beanList.add(new DataBean("成績是46分", 46));
beanList.add(new DataBean("成績是85分", 85));

Observable.fromIterable(beanList)
        .groupBy(new Function<DataBean, String>() {
            @Override
            public String apply(DataBean dataBean) throws Exception {
                int score = dataBean.getScore();
                if (score >= 80) {
                    return "A";
                }

                if (score >= 60 && score < 80) {
                    return "B";
                }

                if (score < 60) {
                    return "C";
                }
                return null;
            }
        })
        .subscribe(new Consumer<GroupedObservable<String, DataBean>>() {
            @Override
            public void accept(final GroupedObservable<String, DataBean> groupedObservable) throws Exception {
                groupedObservable.subscribe(new Consumer<DataBean>() {
                    @Override
                    public void accept(DataBean dataBean) throws Exception {
                        Log.i(TAG, "accept--->"+ groupedObservable.getKey() + "組--->"+dataBean.getDesc());
                    }
                });
            }
        });

上述代碼的執行結果如下:

accept--->A組--->成績是95分 
accept--->B組--->成績是70分
accept--->C組--->成績是56分
accept--->B組--->成績是69分
accept--->A組--->成績是90分
accept--->C組--->成績是46分
accept--->A組--->成績是85分

cast 操作符#

cast 操作符用於類型轉化,cast 操作符官方示意圖如下:

image

測試代碼如下:

Observable.just(1,2,3,4,5)
        .cast(String.class)
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String String) throws Exception {
                Log.i(TAG, "accept--->" + String);
            }
        });

測試會出現如下異常:

java.lang.ClassCastException: Cannot cast java.lang.Integer to java.lang.String

從結果可知,發現不同類型之間轉化會出現類型轉化異常,cast 操作符並不能進行不同類型之間的轉化,但是可以使用 cast 操作來校驗發送的事件數據類型是不是指定的類型。

scan 操作符#

scan 操作符會依次掃描每兩個元素,第一个元素沒有上一个元素,則第一個元素的上一个元素會被忽略,當掃描第二個元素時,會獲取到第一個元素,之後 apply 方法的返回值會作為上一个元素的值參與計算,最終返回轉化後的結果,scan 官方示意圖如下:

image

看一下下面的事件發送過程,第一次掃描時,第一個元素是 1,這裡相當於 last,第二個元素是 2 ,這裡相當於 item,此時 apply 方法返回的結果是 2,這個 2 會作為 last 的值參與下一次掃描計算,則下一次返回的值肯定是 2 * 3,也就是 6,測試代碼如下:

Observable.just(1, 2, 3, 4, 5)
        .scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer last, Integer item) throws Exception {
                Log.i(TAG, "accept--last->" + last);
                Log.i(TAG, "accept--item->" + item);
                return last * item;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "accept--->" + integer);
            }
        });

上述代碼的執行結果如下:

accept--->1
accept--last->1
accept--item->2
accept--->2
accept--last->2
accept--item->3
accept--->6
accept--last->6
accept--item->4
accept--->24
accept--last->24
accept--item->5
accept--->120

To 操作符#

toList()#

toList 操作符會將發送的一系列數據轉換成 List,然後一次性發送出去,toList 的官方示意圖如下:

image

測試代碼如下:

Observable.just(1, 2, 3, 4)
        .toList()
        .subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                Log.i(TAG, "accept--->" + integers);
            }
        });

上述代碼的執行結果如下:

accept--->[1, 2, 3, 4]

toMap(keySelector)#

toMap 操作符會將要發送的事件按照指定的規則轉化為 Map 形式,然後一次性發送出去,toMap 操作符官方示意圖如下:

image

測試代碼如下:

Observable.just(1, 2, 3, 4)
        .toMap(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "key"+integer;
            }
        })
        .subscribe(new Consumer<Map<String, Integer>>() {
            @Override
            public void accept(Map<String, Integer> map) throws Exception {
                Log.i(TAG, "accept--->" + map);
            }
        });

上述代碼的執行結果如下:

accept--->{key2=2, key4=4, key1=1, key3=3}

RxJava 中轉換型操作符基本如上,具體使用還要集合到實際需求中。

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。