banner
jzman

jzman

Coding、思考、自觉。
github

RxJava2の作成型オペレーター

RxJava は、ReactiveX の Java におけるオープンソース実装であり、観測可能なシーケンスを使用して非同期プログラミングおよびイベントベースのプログラムを行うためのライブラリです。これは公式サイトの紹介であり、主な焦点は非同期プログラミング、チェーン呼び出し、およびイベントシーケンスです。

  1. RxJava の導入
  2. 概念
  3. 基本的な実装
  4. Just オペレーター
  5. from オペレーター
  6. defer オペレーター
  7. empty オペレーター
  8. never オペレーター
  9. timer オペレーター
  10. interval オペレーター
  11. range オペレーター
  12. まとめ

RxJava の導入#

implementation "io.reactivex.rxjava2:rxjava:2.2.3"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

概念#

RxJava におけるいくつかの重要な概念は、オブザーバー(Observer)、オブザーバブル(Observable)、およびイベントシーケンスです。イベントシーケンスは完全にオブザーバブル自身によって制御されます。では、オブザーバブルが必要なときにオブザーバーに通知するにはどうすればよいのでしょうか。これには、オブザーバブルとオブザーバーの間に購読関係を確立する必要があります。購読関係を確立した後、オブザーバブルが変化すると、オブザーバーは最初にオブザーバブルの変化を受け取ることができます。

RxJava2 におけるオブザーバー(Observer)のイベントコールバックメソッドは 4 つあります:

  • onSubscribe:購読関係を解除するために使用されます
  • onNext:イベントを送信する際にオブザーバーがこのメソッドをコールバックして送信されたイベントシーケンスを受け取ります
  • onError:イベントを送信する際にオブザーバーがこのメソッドをコールバックしてイベントシーケンスに異常が発生したことを示し、以降のイベント送信を許可しません
  • onComplete:イベントを送信する際にオブザーバーがこのメソッドをコールバックしてイベントシーケンスの送信が完了したことを示し、イベントの送信を許可します

注意

  1. onError が呼び出された後はイベントの送信を続けることはできず、onComplete が呼び出された後はイベントの送信を許可します。どちらが呼び出されてもオブザーバーはメッセージを受け取ることはありません。
  2. onError と onComplete は排他的であり、どちらか一方のみを呼び出すことが許可されています。onComplete の後に onError を呼び出すとプログラムは必ずクラッシュしますが、onError の後に onComplete を呼び出してもクラッシュしないのは、onError の後はイベントの送信が許可されないため、自然にエラーが発生しないからです。
  3. 4 つのコールバックメソッドの中で、オブザーバーとオブザーバブルが一度購読関係を確立すると onSubscribe メソッドがコールバックされ、onNext、onError、onComplete メソッドのコールバックは完全にオブザーバブルによってトリガーされるかどうかが決まります。ここには誤解を招く可能性があります。

基本的な実装#

  1. オブザーバー(Observer)を作成します。オブザーバーは、イベントが発生したときにどのように処理するかを決定します。具体的には以下を参照してください:
//オブザーバー
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        //購読解除
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        //イベントを送信する際にオブザーバーがコールバック
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        //イベントを送信する際にオブザーバーがコールバック(イベントシーケンスに異常が発生)
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        //イベントを送信する際にオブザーバーがコールバック(イベントシーケンスの送信が完了)
        Log.i(TAG, "onComplete--->");
    }
};
  1. オブザーバブル(Observable)を作成します。オブザーバブルは、いつイベントをトリガーし、どのようなイベントをトリガーするかを決定します。具体的には以下を参照してください:
//オブザーバブル
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");
    }
});
  1. オブザーバーとオブザーバブルの間に購読関係を確立します。具体的には以下を参照してください:
//オブザーバーとオブザーバブルの間に購読関係を確立
observable.subscribe(observer);

上記のコードの出力結果は以下の通りです:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onComplete--->

明らかに、Event2 を送信した後に onComplete メソッドが呼び出されたため、その後送信されたイベント Event3 はオブザーバーには届きません。

上記のコードはこのように書くこともでき、結果は同じです。具体的には以下を参照してください:

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードでは、Observable の create メソッドを使用して Observable を作成し、それを使用して関連するイベントを送信します。理解を助けるために、公式の create オペレーターの図を見てみましょう:

create

Observable は、Observable を作成するための多くの静的メソッドも提供しています。以下では、これらの一般的なメソッドについて説明します。

Just オペレーター#

just を使用すると、指定されたイベントを送信する Observable を作成できます。just のイベント送信の上限は 10 であり、最大で 10 個のイベントを送信できます。create に比べて処理フローをある程度簡素化しています。just のオーバーロードメソッドは以下の通りです:

public static <T> Observable<T> just(T item) 
public static <T> Observable<T> just(T item1, T item2)
public static <T> Observable<T> just(T item1, T item2, T item3)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

以下は just オペレーターの簡単な使用例です:

//justオペレーターの簡単な使用
Observable.just("Event1", "Event2", "Event3")
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe--->");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext--->" + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError--->");
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete--->");
            }
        });

上記のコードの出力結果は以下の通りです:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->

公式の just オペレーターの図を見てみましょう。以下は just が 4 つのイベントを送信する際の図です:

image

from オペレーター#

from 関連のオペレーターを使用すると、配列(array)、コレクション(Iterable)、および非同期タスク(future)を送信する Observable を作成できます。from 関連のオペレーターは以下のように分類できます:

//配列
public static <T> Observable<T> fromArray(T... items)
//コレクション
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
//非同期タスク
public static <T> Observable<T> fromFuture(Future<? extends T> future)
//非同期タスク+タイムアウト
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
//非同期タスク+タイムアウト+スレッドスケジューラー
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
//非同期タスク+スレッドスケジューラー
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)
//Reactive Streamsのパブリッシャー、使用方法はcreateオペレーターに似ており、イベントの送信はパブリッシャー(オブザーバブル)によって自ら決定されます
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)

fromArray/fromIterable#

以下は fromArray の使用方法です:

//fromArrayオペレーターの簡単な使用
String[] events = {"Event1", "Event2", "Event3"};
Observable.fromArray(events).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

fromArray の公式図を見てみましょう:

image

以下は fromIterable の使用方法です:

//fromIterableオペレーターの簡単な使用
List<String> list = new ArrayList<>();
list.add("Event1");
list.add("Event2");
list.add("Event3");
Observable.fromIterable(list).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

fromIterable の公式図を見てみましょう:

image

上記のコードの出力結果は以下の通りです:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->

fromCallable#

Callable は java.util.concurrent パッケージにあり、Runnable に似ていますが、戻り値を持っています。fromCallable から発信されるイベントはメインスレッドから発信されます。購読しなければ call 内の操作は実行されません。fromCallable を使用する際には以下の点に注意してください:

  1. 時間のかかるタスクには subscribeOn を使用して購読スレッドを切り替えます。
  2. 時間のかかるタスクの Observable から発信された値を受け取るには observeOn を使用してメインスレッドに切り替えます。
  3. メモリリークなどの問題を避けるために、対応する onDestroy メソッドで購読をキャンセルします。

以下は fromCallable の簡単な使用例です:

//fromCallableオペレーターの簡単な使用
Observable.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        //他の操作...
        return "call";
    }
}).subscribe(new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s + Thread.currentThread());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記の実行結果は以下の通りです:

onSubscribe--->
onNext--->call
onComplete--->

fromCallable の公式図を見てみましょう:

image

fromFuture#

上記からわかるように、fromFuture には 4 つのオーバーロードメソッドがあり、パラメータに非同期タスク、タスクのタイムアウト、スレッドスケジューラーなどを指定できます。まず、Future インターフェースを理解しましょう。Future インターフェースは java.util.concurrent パッケージにあり、Runnable および Callable の非同期タスクの実行に対するタスクの実行状況の判断、タスク結果の取得、特定のタスクのキャンセルなどを行います。Runnable および Callable はスレッドの実行に伴うものであり、これは fromFuture から発信されるイベントがメインスレッド以外から発信されることを意味します。時間のかかるタスクを実行する場合は、subscribeOn を使用して購読スレッドを切り替えることを忘れないでください。以下は FutureTask を例にして fromFuture の使用方法を説明します。

非同期タスクを実行するための Callable を作成します。以下を参照してください:

//非同期タスク
private class MCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        Log.i(TAG, "タスク実行開始--->");
        Thread.sleep(5000);
        Log.i(TAG, "タスク実行終了--->");
        return "MCallable";
    }
}

次に、FutureTask を作成します。以下を参照してください:

//FutureTaskの作成
MCallable mCallable = new MCallable();
FutureTask<String> mFutureTask = new FutureTask<>(mCallable);

次に、Thread を使用して上記で作成した Future を実行します。以下を参照してください:

//FutureTaskの実行
new Thread(mFutureTask).start();

最後に、fromFuture を使用して対応する Observable を作成し、購読します。以下を参照してください:

//fromFuture
Observable.fromFuture(mFutureTask)
        .subscribeOn(Schedulers.io()) //購読スレッドを切り替え
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe--->");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext--->" + s + Thread.currentThread());
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError--->" + e);
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete--->");
            }
        });

上記のコードの出力結果は以下の通りです:

タスク実行開始--->
onSubscribe--->
タスク実行終了--->
onNext--->MCallable
onComplete--->

fromFuture の公式図を見てみましょう。以下の図は fromFuture メソッドが 1 つのパラメータ Future を持つ場合の図です:

image

上記の非同期タスクは 5 秒の遅延があります。もし fromFuture のオーバーロードメソッドでタイムアウトを 4 秒に指定した場合、以下のようになります:

//タイムアウトを4秒に指定
Observable.fromFuture(mFutureTask, 4, TimeUnit.SECONDS, Schedulers.io())
//...

この場合、非同期タスクが 4 秒以内に完了できないため、Observer は onError メソッドがトリガーされ、実行結果は以下の通りです:

タスク実行開始--->
onSubscribe---> 
onError--->java.util.concurrent.TimeoutException
タスク実行終了--->

では、この非同期タスクをキャンセルするにはどうすればよいのでしょうか。これが Future の利点であり、タスクを自由にキャンセルできます。具体的には以下を参照してください:

//非同期タスクのキャンセル
public void cancelTask(View view) {
    if (mFutureTask.isDone()) {
        Log.i(TAG, "タスクはすでに完了しています--->");
    } else {
        Log.i(TAG, "タスクは実行中です--->");
        boolean cancel = mFutureTask.cancel(true);
        Log.i(TAG, "タスクキャンセルは成功しましたか--cancel->" + cancel);
        Log.i(TAG, "タスクキャンセルは成功しましたか--isCancelled->" + mFutureTask.isCancelled());
    }
}

以下はタスク実行中にタスクをキャンセルした場合の実行結果です:

タスク実行開始--->
onSubscribe--->
タスクは実行中です--->
タスクキャンセルは成功しましたか--cancel->true
タスクキャンセルは成功しましたか--isCancelled->true
onError--->java.util.concurrent.CancellationException

これにより、実行中の非同期タスクがキャンセルされました。この部分は Java Future に関連する知識についての内容が多いです。

defer オペレーター#

defer を使用して Observable を作成する際、購読時にのみ Observable が作成され、関連するイベントが送信されます。以下は defer オペレーターの使用例です:

//defer
defer = "old";
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
    @Override
    public ObservableSource<String> call() throws Exception {
        return Observable.just(defer);
    }
});

defer = "new";
observable.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードの実行結果は以下の通りです:

onSubscribe--->
onNext--->new
onComplete--->

明らかに、最終的に購読前に Observable ファクトリーが最新の Observable を作成し、onNext で受け取るデータも最新のものです。defer オペレーターを理解するために、公式の defer オペレーターの図を見てみましょう:

image

empty オペレーター#

empty オペレーターを使用すると、データを発生させずに正常に終了する Observable を作成できます。以下を参照してください:

//empty
Observable.empty().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext--->"+o);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードの出力結果は以下の通りです:

onSubscribe--->
onComplete--->

empty オペレーターの使用を理解しやすくするために、いくつかの empty オペレーターの公式図を見てみましょう:

image

never オペレーター#

never オペレーターを使用すると、データを発生させず、終了もしない Observable を作成できます。以下を参照してください:

//never
Observable.never().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext--->"+o);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードの出力結果は以下の通りです:

onSubscribe--->

never オペレーターの使用を理解しやすくするために、いくつかの never オペレーターの公式図を見てみましょう:

image

timer オペレーター#

timer オペレーターを使用すると、指定した遅延で固定値 0 を送信する Observable を作成できます。また、スレッドスケジューラーを指定することもできます。timer のオーバーロードメソッドは以下の通りです:

//遅延
public static Observable<Long> timer(long delay, TimeUnit unit)
//遅延+スレッドスケジューラー
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) 

以下は timer の使用方法です:

//timer
Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Long s) {
        Log.i(TAG, "onNext--->"+s);
        Log.i(TAG, "現在のスレッド--->"+Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードの実行結果は以下の通りです:

onSubscribe--->
//3秒遅延してデータを受信
onNext--->0
現在のスレッド--->RxCachedThreadScheduler-1
onComplete--->

timer オペレーターの使用を理解しやすくするために、いくつかの timer オペレーターの公式図を見てみましょう。以下は timer が遅延器とスレッドスケジューラーを指定した場合の例です:

image

interval オペレーター#

interval オペレーターを使用すると、固定の時間間隔で整数値を送信する Observable を作成できます。interval は初期遅延時間、時間間隔、スレッドスケジューラーなどを指定できます。interval のオーバーロードメソッドは以下の通りです:

//初期遅延+時間間隔
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) 
//初期遅延+時間間隔+スレッドスケジューラー
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
//時間間隔
public static Observable<Long> interval(long period, TimeUnit unit)
//時間間隔+スレッドスケジューラー
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler) 

以下は interval の使用方法です:

//interval
Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Long aLong) {
        Log.i(TAG, "onNext--->"+aLong);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードを実行すると、3 秒ごとに整数値のイベントが送信されます。実行結果は以下の通りです:

onSubscribe--->
onNext--->0
onNext--->1
onNext--->2
...

interval オペレーターの使用を理解しやすくするために、いくつかの interval オペレーターの公式図を見てみましょう。以下は interval が間隔時間と時間単位を指定した場合の例です:

image

range オペレーター#

range オペレーターを使用すると、指定された整数範囲の値を送信する Observable を作成できます。range に関連するメソッドは 2 つあり、数値の範囲の表現が異なります。2 つのメソッドの宣言は以下の通りです:

// int
public static Observable<Integer> range(final int start, final int count)
// long
public static Observable<Long> rangeLong(long start, long count)

以下は range の使用方法です。具体的には以下を参照してください:

//range
Observable.range(1, 5).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Integer integer) {
        Log.i(TAG, "onNext--->"+integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードの実行結果は以下の通りです:

onSubscribe--->
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onComplete--->

range オペレーターの使用を理解しやすくするために、いくつかの range オペレーターの公式図を見てみましょう:

image

まとめ#

この記事では、主に RxJava2 に関連する基礎知識と RxJava2 における作成型オペレーターの理解と使用について説明しました。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。