banner
jzman

jzman

Coding、思考、自觉。
github

RxJava2変換型オペレーター

上篇文章では、作成型オペレーターの使用について紹介しました。今日は、RxJava における変換型オペレーターの使用について見ていきましょう。一般的な変換型オペレーターは以下の通りです:

  1. buffer オペレーター
  2. window オペレーター
  3. map オペレーター
  4. groupBy オペレーター
  5. cast オペレーター
  6. scan オペレーター
  7. To オペレーター

buffer オペレーター#

buffer オペレーターは多くのオーバーロードメソッドを持っていますが、ここでは典型的なものをいくつか選んで、buffer オペレーターの使用方法を説明します。buffer オペレーターの使用は以下の 3 つのカテゴリに分けられます。具体的には以下の通りです:

//第一類
public final Observable<List<T>> buffer(int count) 
public final Observable<List<T>> buffer(int count, int skip) 
//第二類
public final Observable<List<T>> buffer(long timespan, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, long timeskip, TimeUnit unit) 
//第三類
public final <B> Observable<List<T>> buffer(ObservableSource<B> boundary)
public final <TOpening, TClosing> Observable<List<T>> buffer(
            ObservableSource<? extends TOpening> openingIndicator,
            Function<? super TOpening, ? extends ObservableSource<? extends TClosing>> closingIndicator)

buffer(int count)#

buffer オペレーターは、Observable を Observable に変換します。この Observable は、元のデータを収集し、そのキャッシュされたデータの集合を送信します。buffer は、送信される単一のイベントを要素の集合に変換します。以下はこの場合の公式の示意図です:

image

以下のイベントの送信プロセスでは、buffer を設定しない場合は 4 回送信する必要がありますが、以下の buffer を使用して変換すると、2 回の送信で済みます。テストコードは以下の通りです:

count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4")
        .buffer(2)
        .subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(List<String> strings) throws Exception {
                count++;
                Log.i(TAG, "第" + count + "次接収...");
                Log.i(TAG, "accept--->" + strings.size());
                Log.i(TAG, "受信したデータ...");
                for (String str : strings) {
                    Log.i(TAG, "accept--->" + strings.size() + "---" + str);
                }
            }
        });

上記のコードの実行結果は以下の通りです:

第1次接収...
accept--->2
受信したデータ...
accept--->2---Event1
accept--->2---Event2
第2次接収...
accept--->2
受信したデータ...
accept--->2---Event3
accept--->2---Event4

buffer(int count, int skip)#

buffer (int count) と比較して、skip は次にソース Observable から変換される Observable がイベントを収集する位置を指定できます。count が skip と等しい場合、buffer (int count, int skip) は buffer (int count) と同等です。公式の示意図は以下の通りです:

image

以下のイベント送信プロセスでは、3 つのイベントごとに 1 つのグループとして送信されますが、毎回データを収集する位置パラメータ skip が 2 であるため、毎回収集されるデータには重複があります。テストコードは以下の通りです:

count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4", "Event5")
        .buffer(3, 2)
        .subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(List<String> strings) throws Exception {
                count++;
                Log.i(TAG, "第" + count + "次接収...");
                Log.i(TAG, "accept--->" + strings.size());
                Log.i(TAG, "受信したデータ...");
                for (String str : strings) {
                    Log.i(TAG, "accept--->" + strings.size() + "---" + str);
                }
            }
        });

上記のコードの実行結果は以下の通りです:

第1次接収...
accept--->3
受信したデータ...
accept--->3---Event1
accept--->3---Event2
accept--->3---Event3
第2次接収...
accept--->3
受信したデータ...
accept--->3---Event3
accept--->3---Event4
accept--->3---Event5
第3次接収...
accept--->1
受信したデータ...
accept--->1---Event5

buffer(long timespan, TimeUnit unit)#

buffer オペレーターは、Observable を新しい Observable に変換します。timespan は新しい Observable がキャッシュされたデータを発信する時間間隔を決定します。公式の示意図は以下の通りです:

image

以下のイベント送信プロセスでは、ソース Observable が 2 秒ごとにイベントを送信し、buffer によって新しく生成された Observable は 1 秒ごとにキャッシュされたイベントの集合を送信します。もちろん、こうすることで間隔の時間帯にデータを収集できず、データが失われる可能性があります。テストコードは以下の通りです:

Observable.intervalRange(1,8,0,2, TimeUnit.SECONDS)
        .buffer(1,TimeUnit.SECONDS)
        .subscribe(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> longs) throws Exception {
                Log.i(TAG, "accept--->" + String.valueOf(longs));
            }
        });

上記のコードの実行結果は以下の通りです:

accept--->[1]
accept--->[]
accept--->[2]
accept--->[]
accept--->[3]
accept--->[]
accept--->[4]
accept--->[]
accept--->[5]

buffer(long timespan, long timeskip, TimeUnit unit)#

buffer オペレーターは、Observable を Observable に変換します。timeskip は新しく生成された Observable が定期的に新しいバッファを開始することを決定し、新しい Observable は timespan の時間間隔内に収集されたイベントの集合を発信します。公式の示意図は以下の通りです:

image

以下のイベント送信プロセスでは、ソース Observable が 1 秒ごとに 1 から 12 の整数を送信し、buffer によって新しく生成された Observable は 5 秒ごとにソース Observable が送信したイベントを受信します。テストコードは以下の通りです:

Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
        .buffer(1,5, TimeUnit.SECONDS)
        .subscribe(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> longs) throws Exception {
                Log.i(TAG, "accept--->" + String.valueOf(longs));
            }
        });

上記のコードの実行結果は以下の通りです:

accept--->[1]
accept--->[6]
accept--->[11]

buffer(ObservableSource boundary)#

buffer (boundary) は、boundary という Observable を監視します。この Observable がイベントを発信するたびに、新しい List を作成し、元の Observable から送信されたイベントを収集し、収集したデータを送信します。公式の示意図は以下の通りです:

image

以下のイベント送信プロセスでは、収集された元のイベントは時間間隔の違いにより、最終的に送信される収集されたイベントの数も異なります。テストコードは以下の通りです:

Observable.intervalRange(1,10,0,2, TimeUnit.SECONDS)
        .buffer(Observable.interval(3, TimeUnit.SECONDS))
        .subscribe(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> longs) throws Exception {
                Log.i(TAG, "accept--->" + String.valueOf(longs));
            }
        });

上記のコードの実行結果は以下の通りです:

accept--->[1, 2]
accept--->[3]
accept--->[4, 5]
accept--->[6]
accept--->[7, 8]
accept--->[9]
accept--->[10]

buffer(openingIndicator, closingIndicator)#

buffer (openingIndicator, closingIndicator) は、openingIndicator という Observable を監視します。この Observable がイベントを発信するたびに、List を作成し、元の Observable から送信されたデータを収集します。そして収集したデータを closingIndicator に渡します。closingIndicator は Observable を返し、この buffer は closingIndicator が返す Observable を監視し、その Observable のデータを検出すると、先ほど openingIndicator から得たデータを発信します。以下はこの場合の公式の示意図です:

image

以下の時間送信プロセスでは、元の Observable が 1 秒ごとに 1 から 12 の整数を送信し、openingIndicator という Observable が 3 秒ごとに List を作成して送信されたイベントを収集し、その収集したデータを closingIndicator に渡します。closingIndicator は 1 秒遅れて openingIndicator から取得したデータを送信します。テストコードは以下の通りです:

Observable openingIndicator = Observable.interval(3, TimeUnit.SECONDS);
Observable closingIndicator = Observable.timer(1,TimeUnit.SECONDS);
Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
        .buffer(openingIndicator, new Function<Object, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Object o) throws Exception {
                return closingIndicator;
            }
        })
        .subscribe(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> longs) throws Exception {
                Log.i(TAG, "accept--->" + String.valueOf(longs));
            }
        });

上記のコードの実行結果は以下の通りです:

accept--->[4, 5]
accept--->[7]
accept--->[10]

window オペレーター#

ここでは、window (long count) を例にして window オペレーターの使用を紹介します。window オペレーターの使用は buffer の使用に似ていますが、異なる点は、buffer によって変換された Observable がソース Observable の送信イベントの集合を送信するのに対し、window オペレーターによって変換された Observable は、ソース Observable の送信イベントを count 個ずつ順次送信します。このオペレーターの公式の示意図は以下の通りです:

image

テストコードは以下の通りです:

Observable.just("Event1", "Event2", "Event3", "Event4")
        .window(2)
        .subscribe(new Consumer<Observable<String>>() {
            @Override
            public void accept(Observable<String> stringObservable) throws Exception {
                Log.i(TAG, "accept--Observable->");
                stringObservable.subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.i(TAG, "accept--->" + s);
                    }
                });
            }
        });

上記のコードの実行結果は以下の通りです:

accept--Observable->
accept--->Event1
accept--->Event2
accept--Observable->
accept--->Event3
accept--->Event4

map オペレーター#

map(mapper)#

map オペレーターは、送信されるデータの型を変換することができます。map オペレーションの公式の示意図は以下の通りです:

image

以下のイベント送信プロセスでは、map オペレーターを通じて、ソース Observable が送信するイベントをさらに加工・変換できます。テストコードは以下の通りです:

Observable.just("Event1", "Event2", "Event3", "Event4")
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return "this is " + s;
            }
        }).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.i(TAG, "accept--->" + s);
    }
});

上記のコードの実行結果は以下の通りです:

accept--->this is Event1
accept--->this is Event2
accept--->this is Event3
accept--->this is Event4

flatMap(mapper)#

flatMap オペレーターを使用すると、ソース Observable がイベントを発信する際に、複数のイベントを発信できる Observable に変換されます。これらの Observable は最終的に同じ Observable に統合され、その Observable がこれらのイベントを一括で発信します。ここでは、一般的に使用される flatMap (mapper) を例にして、公式の示意図は以下の通りです:

image

以下のイベント送信プロセスでは、flatMap オペレーターを使用した後、ソース Observable がイベントを送信する際に、対応する Observable が生成され、最終的に送信されるイベントは同じ Observable に統合され、その結果が観察者にコールバックされます。テストコードは以下の通りです:

final Observable observable = Observable.just("Event5", "Event6");
Observable.just("Event1", "Event2", "Event3", "Event4")
        .flatMap(new Function<String, Observable<String>>() {
            @Override
            public Observable<String> apply(String s) throws Exception {
                return observable;
            }
        }).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.i(TAG, "accept--->" + s);
    }
});

上記のコードの実行結果は以下の通りです:

accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6

concatMap(mapper)#

concatMap の使用は flatMap の使用に似ていますが、concatMap はイベント受信の順序を保証することができ、flatMap は順序を保証できません。concatMap オペレーターの公式の示意図は以下の通りです:

image

以下のイベント送信プロセスでは、ソース Observable が整数 1 を送信する際に 3 秒遅延し、その後他のイベントを送信します。テストコードは以下の通りです:

Observable.intervalRange(1, 2, 0, 1, TimeUnit.SECONDS)
        .concatMap(new Function<Long, ObservableSource<Long>>() {
            @Override
            public ObservableSource<Long> apply(Long aLong) throws Exception {
                int delay = 0;
                if (aLong == 1) {
                    delay = 3;
                }
                return Observable.intervalRange(4, 4, delay, 1, TimeUnit.SECONDS);
            }
        }).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.i(TAG, "accept--->" + aLong);
    }
});

concatMap オペレーターを使用した上記のコードの実行結果は以下の通りです:

accept--->4
accept--->5
accept--->6
accept--->7
accept--->4
accept--->5
accept--->6
accept--->7

flatMap オペレーターを使用した上記のコードの実行結果は以下の通りです:

accept--->4
accept--->5
accept--->6
accept--->4
accept--->7
accept--->5
accept--->6
accept--->7

このように、concatMap は flatMap に比べてイベント受信の順序を保証できます。

switchMap(mapper)#

ソース Observable がイベントを送信すると、複数のイベントを送信できる Observable に変換されます。switchMap オペレーターは現在の Observable のみを考慮します。つまり、ソース Observable が新しいイベントを送信するたびに、前の複数のイベントを送信する Observable を破棄します。公式の示意図は以下の通りです:

image

以下のイベント送信プロセスでは、ソース Observable が 2 秒ごとに 1 と 2 を送信し、変換された複数のイベントを送信できる Observable が 1 秒ごとに 4 から始まる整数を送信します。switchMap オペレーターを使用すると、ソース Observable が整数 1 を送信したとき、この新しい複数のイベントを送信できる Observable は 2 つの整数、つまり 4 と 5 を送信した後、送信を停止します。なぜなら、その時ソース Observable が再びイベントを送信し始め、前の複数のイベントを送信できる Observable が破棄され、新しいソース Observable のイベント送信の監視が始まるからです。テストコードは以下の通りです:

Observable.intervalRange(1, 2, 0, 2, TimeUnit.SECONDS)
        .switchMap(new Function<Long, ObservableSource<Long>>() {
            @Override
            public ObservableSource<Long> apply(Long aLong) throws Exception {
                Log.i(TAG, "accept-aLong-->" + aLong);
                return Observable.intervalRange(4, 4, 0, 1, TimeUnit.SECONDS);
            }
        }).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.i(TAG, "accept--->" + aLong);
    }
});

上記のコードの実行結果は以下の通りです:

accept-aLong-->1
accept--->4
accept--->5
accept-aLong-->2
accept--->4
accept--->5
accept--->6
accept--->7

また、concatMapDelayError、concatMapEager、concatMapEagerDelayError、concatMapIterable、flatMapIterable、switchMapDelayError など、これらのオペレーターに関連するオペレーターもありますが、ここでは紹介しません。

groupBy オペレーター#

groupBy オペレーターは、受信したデータを指定されたルールに従って分類し、その後 GroupedObservable などが出力を購読します。公式の示意図は以下の通りです:

image

以下のイベント送信プロセスでは、成績に基づいてグループ化して出力します。具体的には以下の通りです:

List<DataBean> beanList = new ArrayList<>();
beanList.add(new DataBean("成績は95点です", 95));
beanList.add(new DataBean("成績は70点です", 70));
beanList.add(new DataBean("成績は56点です", 56));
beanList.add(new DataBean("成績は69点です", 69));
beanList.add(new DataBean("成績は90点です", 90));
beanList.add(new DataBean("成績は46点です", 46));
beanList.add(new DataBean("成績は85点です", 85));

Observable.fromIterable(beanList)
        .groupBy(new Function<DataBean, String>() {
            @Override
            public String apply(DataBean dataBean) throws Exception {
                int score = dataBean.getScore();
                if (score >= 80) {
                    return "A";
                }

                if (score >= 60 && score < 80) {
                    return "B";
                }

                if (score < 60) {
                    return "C";
                }
                return null;
            }
        })
        .subscribe(new Consumer<GroupedObservable<String, DataBean>>() {
            @Override
            public void accept(final GroupedObservable<String, DataBean> groupedObservable) throws Exception {
                groupedObservable.subscribe(new Consumer<DataBean>() {
                    @Override
                    public void accept(DataBean dataBean) throws Exception {
                        Log.i(TAG, "accept--->"+ groupedObservable.getKey() + "グループ--->"+dataBean.getDesc());
                    }
                });
            }
        });

上記のコードの実行結果は以下の通りです:

accept--->Aグループ--->成績は95点です 
accept--->Bグループ--->成績は70点です
accept--->Cグループ--->成績は56点です
accept--->Bグループ--->成績は69点です
accept--->Aグループ--->成績は90点です
accept--->Cグループ--->成績は46点です
accept--->Aグループ--->成績は85点です

cast オペレーター#

cast オペレーターは型変換に使用されます。cast オペレーターの公式の示意図は以下の通りです:

image

テストコードは以下の通りです:

Observable.just(1,2,3,4,5)
        .cast(String.class)
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String String) throws Exception {
                Log.i(TAG, "accept--->" + String);
            }
        });

テストでは以下のような例外が発生します:

java.lang.ClassCastException: Cannot cast java.lang.Integer to java.lang.String

結果からわかるように、異なる型間の変換では型変換例外が発生します。cast オペレーターは異なる型間の変換を行うことはできませんが、送信されるイベントデータの型が指定された型であるかどうかを検証するために使用できます。

scan オペレーター#

scan オペレーターは、順に 2 つの要素をスキャンします。最初の要素には前の要素がない場合、最初の要素の前の要素は無視されます。2 番目の要素をスキャンする際に、最初の要素を取得し、その後 apply メソッドの戻り値が前の要素の値として計算に参加し、最終的に変換された結果を返します。scan の公式の示意図は以下の通りです:

image

以下のイベント送信プロセスを見てみましょう。最初のスキャンでは、最初の要素は 1 で、ここでは last に相当します。2 番目の要素は 2 で、ここでは item に相当します。この時 apply メソッドの戻り値は 2 で、この 2 は次のスキャン計算に参加する last の値となります。次回の戻り値は必ず 2 * 3、つまり 6 になります。テストコードは以下の通りです:

Observable.just(1, 2, 3, 4, 5)
        .scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer last, Integer item) throws Exception {
                Log.i(TAG, "accept--last->" + last);
                Log.i(TAG, "accept--item->" + item);
                return last * item;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "accept--->" + integer);
            }
        });

上記のコードの実行結果は以下の通りです:

accept--->1
accept--last->1
accept--item->2
accept--->2
accept--last->2
accept--item->3
accept--->6
accept--last->6
accept--item->4
accept--->24
accept--last->24
accept--item->5
accept--->120

To オペレーター#

toList()#

toList オペレーターは、送信された一連のデータを List に変換し、一度に送信します。toList の公式の示意図は以下の通りです:

image

テストコードは以下の通りです:

Observable.just(1, 2, 3, 4)
        .toList()
        .subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                Log.i(TAG, "accept--->" + integers);
            }
        });

上記のコードの実行結果は以下の通りです:

accept--->[1, 2, 3, 4]

toMap(keySelector)#

toMap オペレーターは、送信されるイベントを指定されたルールに従って Map 形式に変換し、一度に送信します。toMap オペレーターの公式の示意図は以下の通りです:

image

テストコードは以下の通りです:

Observable.just(1, 2, 3, 4)
        .toMap(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "key"+integer;
            }
        })
        .subscribe(new Consumer<Map<String, Integer>>() {
            @Override
            public void accept(Map<String, Integer> map) throws Exception {
                Log.i(TAG, "accept--->" + map);
            }
        });

上記のコードの実行結果は以下の通りです:

accept--->{key2=2, key4=4, key1=1, key3=3}

RxJava における変換型オペレーターは基本的に以上の通りです。具体的な使用は実際のニーズに応じて行う必要があります。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。