RxJava 是 ReactiveX 在 Java 上的開源的實現,一個用於通過使用可觀察序列來進行異步編程和基於事件的程序的庫,這是官網的介紹,主要關注點是異步編程和鏈式調用以及事件序列。
- 引入 RxJava
- 概念
- 基本實現
- Just 操作符
- from 操作符
- defer 操作符
- empty 操作符
- never 操作符
- timer 操作符
- interval 操作符
- range 操作符
- 總結
引入 RxJava#
implementation "io.reactivex.rxjava2:rxjava:2.2.3"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
概念#
RxJava 中的幾個重要概念是:觀察者 (Observer) 、被觀察者 (Observable) 和事件序列,事件序列完全由被觀察者者自己控制,那麼被觀察者如果在需要時通知觀察者呢,這就需要被觀察者與觀察者之間建立訂閱關係。建立訂閱關係後,當被觀察者發生變化,觀察者就能在第一時間接收被觀察者的變化。
在 RxJava2 中觀察者 (Observer) 的事件回調方法有四個:
- onSubscribe:用於解除訂閱關係
- onNext:發送事件時觀察者回調該方法接收發送過來的事件序列
- onError:發送事件時觀察者回調該方法表示發送事件序列異常,將不再允許發送事件
- onComplete:發送事件時觀察者回調該方法表示事件序列發送完畢,允許發送事件
注意:
- onError 調用後不允許繼續發送事件,onComplete 調用後允許發送事件,無論是否可以繼續發送事件,兩者被調用觀察者都不會接收消息;
- onError 和 onComplete 互斥只允許調用其中一個,如果你在 onComplete 之後調用 onError 程序必然會崩潰,但是 onError 之後調用 onComplete 不崩潰是因為 onError 之後不允許發送事件,自然不會出錯;
- 四個回調方法中,觀察者和被觀察者一旦建立訂閱關係 onSubscribe 方法就會被回調,onNext、onError、onComplete 方法的回調完全由被觀察者決定是否觸發,這裡容易產生誤解。
基本實現#
- 創建觀察者 Observer ,觀察者決定時間發生的時候該如何處理,具體參考如下:
//觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//解除訂閱
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
//發送事件時觀察者回調
Log.i(TAG, "onNext--->"+s);
}
@Override
public void onError(Throwable e) {
//發送事件時觀察者回調(事件序列發生異常)
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
//發送事件時觀察者回調(事件序列發送完畢)
Log.i(TAG, "onComplete--->");
}
};
- 創建被觀察者 Observable,被觀察者決定什麼時出觸發事件以及觸發何種事件,具體參考如下:
//被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Event1");
emitter.onNext("Event2");
emitter.onComplete();
emitter.onNext("Event3");
}
});
- 建立觀察者與被觀察之間的訂閱關係,具體參考如下:
//建立觀察者與被觀察者之間的訂閱關係
observable.subscribe(observer);
上述代碼的輸出結果參考如下:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onComplete--->
顯然,由於在 發送完 Event2 之後就調用了 onComplete 方法,之後發送的事件 Event3 將不會被觀察者收到。
上面代碼還可以這樣寫,結果是相同的,具體參考如下:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Event1");
emitter.onNext("Event2");
emitter.onComplete();
emitter.onNext("Event3");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->"+s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
上面代碼中使用了 Observable 的 create 方法來創建 Observable,並以此來進行相關事件的發送,為幫助理解來看一下官方的關於 create 操作符的示意圖:
Observable 中還提供了很多的靜態方法來創建 Observable,下文將會介紹這些常用方法。
Just 操作符#
使用 just 可以創建一個發送指定事件的 Observable,just 發送事件的上限 10,即最多發送 10 個事件,相較 create 在一定程度上簡化了處理流程,just 重載的方法如下:
public static <T> Observable<T> just(T item)
public static <T> Observable<T> just(T item1, T item2)
public static <T> Observable<T> just(T item1, T item2, T item3)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
下面是 just 操作符的簡單使用:
//just操作符的簡單使用
Observable.just("Event1", "Event2", "Event3")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
上述代碼的輸出結果如下:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
來看一下官方的關於 just 操作符的示意圖,下面是 just 發送四個事件的示意圖,具體如下:
from 操作符#
使用 from 相關的操作符可以創建發送數組 (array)、集合 (Iterable) 以及異步任務 (future) 的 Observable,可將 from 相關的操作符分為如下幾類:
//數組
public static <T> Observable<T> fromArray(T... items)
//集合
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
//異步任務
public static <T> Observable<T> fromFuture(Future<? extends T> future)
//異步任務+超時時間
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
//異步任務+超時時間+線程調度器
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
//異步任務+線程調度器
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)
//Reactive Streams中的發布者,使用方式類似create操作符,事件的發送由發布者(被觀察者)自行決定
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
fromArray/fromIterable#
下面是 fromArray 的使用方式,具體如下:
//fromArray操作符的簡單使用
String[] events = {"Event1", "Event2", "Event3"};
Observable.fromArray(events).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
看一下 fromArray 的官方示意圖,具體如下:
下面是 fromIterable 的使用方式,具體如下:
//fromIterable操作符的簡單使用
List<String> list = new ArrayList<>();
list.add("Event1");
list.add("Event2");
list.add("Event3");
Observable.fromIterable(list).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
看一下 fromIterable 的官方示意圖,具體如下:
上述代碼的輸出參考如下:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
fromCallable#
Callable 位於 java.util.concurrent 包下,和 Runnable 類似,但是帶有返回值,使用 fromCallable 發出的事件是從主線程發出的,如果不訂閱則不會執行 call 裡面的操作,使用 fromCallable 要注意以下幾點:
- 涉及耗時任務要使用 subscribeOn 切換訂閱線程;
- 執行耗時任務是接收 Observable 的發射值要使用 observeOn 切換到 Main 線程接收;
- 為避免內存泄漏等問題,在相應的 onDestroy 方法中取消訂閱。
下面是 fromCallable 的簡單使用,參考如下:
//fromCallable操作符的簡單使用
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
//其他操作...
return "call";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s+Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
上述到執行結果如下:
onSubscribe--->
onNext--->call
onComplete--->
看一下 fromCallable 的官方示意圖,具體如下:
fromFuture#
從上面可知 fromFuture 有四個重載方法,參數中可以指定異步任務、任務超時時間、線程調度器等,先來認識一下 Future 接口,Future 接口位於 java.util.concurrent 包下,主要作用是對 Runnable 和 Callable 的異步任務執行進行任務是否執行的判斷、任務結果的獲取、具體任務的取消等,而 Runnable 和 Callable 伴隨著線程的執行,這就意味著使用 fromFuture 發出的事件是從非 Main 線程發出,如果執行耗時任務要記得使用 subscribeOn 切換訂閱線程,下面以 FutureTask 為例來說明 fromFuture 的使用方式。
創建一個 Callable 用來執行異步任務,參考如下:
//異步任務
private class MCallable implements Callable<String> {
@Override
public String call() throws Exception {
Log.i(TAG, "任務執行開始--->");
Thread.sleep(5000);
Log.i(TAG, "任務執行結束--->");
return "MCallable";
}
}
然後,創建一個 FutureTask ,參考如下:
//創建FutureTask
MCallable mCallable = new MCallable();
FutureTask<String> mFutureTask = new FutureTask<>(mCallable);
然後,使用 Thread 執行上面創建的 Future,參考如下:
//執行FutureTask
new Thread(mFutureTask).start();
最後,使用 fromFuture 創建與之對應的 Observeable 並訂閱,參考如下:
//fromFuture
Observable.fromFuture(mFutureTask)
.subscribeOn(Schedulers.io()) //切換訂閱線程
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s+Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
上述代碼的只想結果如下:
任務執行開始--->
onSubscribe--->
任務執行結束--->
onNext--->MCallable
onComplete--->
看一下 fromFuture 的官方示意圖,下面的示意圖是 fromFuture 方法攜帶一個參數 Future 的示意圖,具體如下:
上面的異步任務延遲 5 秒,如果使用 fromFuture 的重載方法指定超時時間為 4 秒,參考如下:
//指定超時時間為4s
Observable.fromFuture(mFutureTask,4, TimeUnit.SECONDS,Schedulers.io())
//...
此時,由於異步任務不能在 4 秒內完成,Observer 會相應的被觸發 onError 方法,執行結果參考如下:
任務執行開始--->
onSubscribe--->
onError--->java.util.concurrent.TimeoutException
任務執行結束--->
那麼如何取消這個異步任務呢,這也正是 Future 的優點所在,可以隨意的取消這個任務,具體參考如下:
//異步任務的取消
public void cancelTask(View view) {
if (mFutureTask.isDone()) {
Log.i(TAG, "任務已經完成--->");
} else {
Log.i(TAG, "任務正在執行--->");
boolean cancel = mFutureTask.cancel(true);
Log.i(TAG, "任務取消是否成功--cancel->" + cancel);
Log.i(TAG, "任務取消是否成功--isCancelled->" + mFutureTask.isCancelled());
}
}
下面是在任務執行過程中取消任務的執行結果,參考如下:
任務執行開始--->
onSubscribe--->
任務正在執行--->
任務取消是否成功--cancel->true
任務取消是否成功--isCancelled->true
onError--->java.util.concurrent.CancellationException
這樣就取消了正在執行的異步任務,這部分內容更多的是關於 Java Future 相關的知識。
defer 操作符#
使用 defer 創建 Observable 時,只有在訂閱時去才會創建 Observable 並發送相關的事件,下面是 defer 操作符的使用,參考如下:
//defer
defer = "old";
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.just(defer);
}
});
defer = "new";
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->"+s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
上述代碼的執行結果如下:
onSubscribe--->
onNext--->new
onComplete--->
顯然,最終在訂閱之前 Observable 工廠又創建了最新的 Observable,onNext 中接收的數據也是最新的,為了理解 defer 操作符,來看一下官方 defer 操作符的示意圖:
empty 操作符#
使用 empty 操作符可以創建一個不發生任何數據但正常終止的 Observable,參考如下:
//empty
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Object o) {
Log.i(TAG, "onNext--->"+o);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
上述代碼的輸出結果如下:
onSubscribe--->
onComplete--->
為了方便理解 empty 操作符的使用,來看一些 empty 操作符的官方示意圖:
never 操作符#
使用 never 操作符可以創建一個不發生任何數據也不終止的 Observable,參考如下:
//never
Observable.never().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Object o) {
Log.i(TAG, "onNext--->"+o);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
上述代碼的輸出結果如下:
onSubscribe--->
為了方便理解 never 操作符的使用,來看一些 never 操作符的官方示意圖:
timer 操作符#
timer 操作符可以創建一個帶延時的發送固定數值 0 的 Observable,還可以指定線程調度器,timer 重載方法如下:
//延時
public static Observable<Long> timer(long delay, TimeUnit unit)
//延時+線程調度器
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)
下面是 timer 的使用方式:
//timer
Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Long s) {
Log.i(TAG, "onNext--->"+s);
Log.i(TAG, "當前線程--->"+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
上述代碼的執行結果如下:
onSubscribe--->
//延時3秒收到數據
onNext--->0
當前線程--->RxCachedThreadScheduler-1
onComplete--->
為了方便理解 timer 操作符的使用,來看一些 timer 操作符的官方示意圖,下面以 timer 指定延時器和線程調度器的方式為例,具體如下:
interval 操作符#
使用 interval 操作符可以創建一個可以以固定時間間隔發送整數值的一個 Observable,interval 可以指定初始延遲時間、時間間隔、線程調度器等,interval 重載方法如下:
//初始延時+時間間隔
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
//初始延時+時間間隔+線程調度器
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler
scheduler)
//時間間隔
public static Observable<Long> interval(long period, TimeUnit unit)
//時間間隔+線程調度器
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
下面是 interval 的使用方式:
//interval
Observable.interval(3,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Long aLong) {
Log.i(TAG, "onNext--->"+aLong);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
上述代碼執行後就會以每個 3 秒持續發送值為整數的事件,執行結果如下:
onSubscribe--->
onNext--->0
onNext--->1
onNext--->2
...
為了方便理解 interval 操作符的使用,來看一些 interval 操作符的官方示意圖,下面以 interval 指定間隔時間和時間單位的方式為例,具體如下:
range 操作符#
使用 range 操作符可以創建一個可以發送指定整數範圍值的一個 Observable,range 相關的方法有兩個,只是數值的範圍表示不同,兩個方法聲明如下:
// int
public static Observable<Integer> range(final int start, final int count)
// long
public static Observable<Long> rangeLong(long start, long count)
下面是 range 的使用方式,具體參考如下:
//range
Observable.range(1,5).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext--->"+integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
上述代碼的執行結果如下:
onSubscribe--->
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onComplete--->
為了方便理解 range 操作符的使用,來看一些 range 操作符的官方示意圖:
總結#
這篇文章主要介紹了 RxJava2 相關基礎知識以及 RxJava2 中創建型操作符的理解和使用。