banner
jzman

jzman

Coding、思考、自觉。
github

RxJava2创建型操作符

RxJava 是 ReactiveX 在 Java 上的开源的实现,一个用于通过使用可观察序列来进行异步编程和基于事件的程序的库,这是官网的介绍,主要关注点是异步编程和链式调用以及事件序列。

  1. 引入 RxJava
  2. 概念
  3. 基本实现
  4. Just 操作符
  5. from 操作符
  6. defer 操作符
  7. empty 操作符
  8. never 操作符
  9. timer 操作符
  10. interval 操作符
  11. range 操作符
  12. 总结

引入 RxJava#

implementation "io.reactivex.rxjava2:rxjava:2.2.3"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

概念#

RxJava 中的几个重要概念是:观察者 (Observer) 、被观察者 (Observable) 和事件序列,事件序列完全由被观察者者自己控制,那么被观察者如果在需要时通知观察者呢,这就需要被观察者与观察者之间建立订阅关系。建立订阅关系后,当被观察者发生变化,观察者就能在第一时间接收被观察者的变化。

在 RxJava2 中观察者 (Observer) 的事件回调方法有四个:

  • onSubscribe:用于解除订阅关系
  • onNext:发送事件时观察者回调该方法接收发送过来的事件序列
  • onError:发送事件时观察者回调该方法表示发送事件序列异常,将不再允许发送事件
  • onComplete:发送事件时观察者回调该方法表示事件序列发送完毕,允许发送事件

注意

  1. onError 调用后不允许继续发送事件,onComplete 调用后允许发送事件,无论是否可以继续发送事件,两者被调用观察者都不会接收消息;
  2. onError 和 onComplete 互斥只允许调用其中一个,如果你在 onComplete 之后调用 onError 程序必然会崩溃,但是 onError 之后调用 onComplete 不崩溃是因为 onError 之后不允许发送事件,自然不会出错;
  3. 四个回调方法中,观察者和被观察者一旦建立订阅关系 onSubscribe 方法就会被回调,onNext、onError、onComplete 方法的回调完全由被观察者决定是否触发,这里容易产生误解。

基本实现#

  1. 创建观察者 Observer ,观察者决定时间发生的时候该如何处理,具体参考如下:
//观察者
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        //解除订阅
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        //发送事件时观察者回调
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        //发送事件时观察者回调(事件序列发生异常)
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        //发送事件时观察者回调(事件序列发送完毕)
        Log.i(TAG, "onComplete--->");
    }
};
  1. 创建被观察者 Observable,被观察者决定什么时出触发事件以及触发何种事件,具体参考如下:
//被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");
    }
});
  1. 建立观察者与被观察之间的订阅关系,具体参考如下:
//建立观察者与被观察者之间的订阅关系
observable.subscribe(observer);

上述代码的输出结果参考如下:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onComplete--->

显然,由于在 发送完 Event2 之后就调用了 onComplete 方法,之后发送的事件 Event3 将不会被观察者收到。

上面代码还可以这样写,结果是一样的,具体参考如下:

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上面代码中使用了 Observable 的 create 方法来创建 Observable,并以此来进行相关事件的发送,为帮助理解来看一下官方的关于 create 操作符的示意图:

create

Observable 中还提供了很多的静态方法来创建 Observable,下文将会介绍这些常用方法。

Just 操作符#

使用 just 可以创建一个发送指定事件的 Observable,just 发送事件的上限 10,即最多发送 10 个事件,相较 create 在一定程度上简化了处理流程,just 重载的方法如下:

public static <T> Observable<T> just(T item) 
public static <T> Observable<T> just(T item1, T item2)
public static <T> Observable<T> just(T item1, T item2, T item3)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

下面是 just 操作符的简单使用:

//just操作符的简单使用
Observable.just("Event1", "Event2", "Event3")
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe--->");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext--->" + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError--->");
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete--->");
            }
        });

上述代码的输出结果如下:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->

来看一下官方的关于 just 操作符的示意图,下面是 just 发送四个事件的示意图,具体如下:

image

from 操作符#

使用 from 相关的操作符可以创建发送数组 (array)、集合 (Iterable) 以及异步任务 (future) 的 Observable,可将 from 相关的操作符分为如下几类:

//数组
public static <T> Observable<T> fromArray(T... items)
//集合
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
//异步任务
public static <T> Observable<T> fromFuture(Future<? extends T> future)
//异步任务+超时时间
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
//异步任务+超时时间+线程调度器
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
//异步任务+线程调度器
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)
//Reactive Streams中的发布者,使用方式类似create操作符,事件的发送由发布者(被观察者)自行决定
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)

fromArray/fromIterable#

下面是 fromArray 的使用方式,具体如下:

//fromArray操作符的简单使用
String[] events = {"Event1", "Event2", "Event3"};
Observable.fromArray(events).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

看一下 fromArray 的官方示意图,具体如下:

image

下面是 fromIterable 的使用方式,具体如下:

//fromIterable操作符的简单使用
List<String> list = new ArrayList<>();
list.add("Event1");
list.add("Event2");
list.add("Event3");
Observable.fromIterable(list).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

看一下 fromIterable 的官方示意图,具体如下:

image

上述代码的输出参考如下:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->

fromCallable#

Callable 位于 java.util.concurrent 包下,和 Runnable 类似,但是带有返回值,使用 fromCallable 发出的事件是从主线程发出的,如果不订阅则不会执行 call 里面的操作,使用 fromCallable 要注意以下几点:

  1. 涉及耗时任务要使用 subscribeOn 切换订阅线程;
  2. 执行耗时任务是接收 Observable 的发射值要使用 observeOn 切换到 Main 线程接收;
  3. 为避免内存泄漏等问题,在相应的 onDestroy 方法中取消订阅。

下面是 fromCallable 的简单使用,参考如下:

//fromCallable操作符的简单使用
Observable.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        //其他操作...
        return "call";
    }
}).subscribe(new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s+Thread.currentThread());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上述到执行结果如下:

onSubscribe--->
onNext--->call
onComplete--->

看一下 fromCallable 的官方示意图,具体如下:

image

fromFuture#

从上面可知 fromFuture 有四个重载方法,参数中可以指定异步任务、任务超时时间、线程调度器等,先来认识一下 Future 接口,Future 接口位于 java.util.concurrent 包下,主要作用是对 Runnable 和 Callable 的异步任务执行进行任务是否执行的判断、任务结果的获取、具体任务的取消等,而 Runnable 和 Callable 伴随着线程的执行,这就意味着使用 fromFuture 发出的事件是从非 Main 线程发出,如果执行耗时任务要记得使用 subscribeOn 切换订阅线程,下面以 FutureTask 为例来说明 fromFuture 的使用方式。

创建一个 Callable 用来执行异步任务,参考如下:

//异步任务
private class MCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        Log.i(TAG, "任务执行开始--->");
        Thread.sleep(5000);
        Log.i(TAG, "任务执行结束--->");
        return "MCallable";
    }
}

然后,创建一个 FutureTask ,参考如下:

//创建FutureTask
MCallable mCallable = new MCallable();
FutureTask<String> mFutureTask = new FutureTask<>(mCallable);

然后,使用 Thread 执行上面创建的 Future,参考如下:

//执行FutureTask
new Thread(mFutureTask).start();

最后,使用 fromFuture 创建与之对应的 Observeable 并订阅,参考如下:

//fromFuture
Observable.fromFuture(mFutureTask)
        .subscribeOn(Schedulers.io()) //切换订阅线程
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe--->");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext--->" + s+Thread.currentThread());
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError--->" + e);
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete--->");
            }
        });

上述代码的只想结果如下:

任务执行开始--->
onSubscribe--->
任务执行结束--->
onNext--->MCallable
onComplete--->

看一下 fromFuture 的官方示意图,下面的示意图是 fromFuture 方法携带一个参数 Future 的示意图,具体如下:

image

上面的异步任务延时 5 秒,如果使用 fromFuture 的重载方法指定超时时间为 4 秒,参考如下:

//指定超时时间为4s
Observable.fromFuture(mFutureTask,4, TimeUnit.SECONDS,Schedulers.io())
//...

此时,由于异步任务不能在 4 秒内完成,Observer 会相应的被触发 onError 方法,执行结果参考如下:

任务执行开始--->
onSubscribe---> 
onError--->java.util.concurrent.TimeoutException
任务执行结束--->

那么如何取消这个异步任务呢,这也正是 Future 的优点所在,可以随意的取消这个任务,具体参考如下:

//异步任务的取消
public void cancelTask(View view) {
    if (mFutureTask.isDone()) {
        Log.i(TAG, "任务已经完成--->");
    } else {
        Log.i(TAG, "任务正在执行--->");
        boolean cancel = mFutureTask.cancel(true);
        Log.i(TAG, "任务取消是否成功--cancel->" + cancel);
        Log.i(TAG, "任务取消是否成功--isCancelled->" + mFutureTask.isCancelled());
    }
}

下面是在任务执行过程中取消任务的执行结果,参考如下:

任务执行开始--->
onSubscribe--->
任务正在执行--->
任务取消是否成功--cancel->true
任务取消是否成功--isCancelled->true
onError--->java.util.concurrent.CancellationException

这样就取消了正在执行的异步任务,这部分内容更多的是关于 Java Future 相关的知识。

defer 操作符#

使用 defer 创建 Observable 时,只有在订阅时去才会创建 Observable 并发送相关的事件,下面是 defer 操作符的使用,参考如下:

//defer
defer = "old";
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
    @Override
    public ObservableSource<String> call() throws Exception {
        return Observable.just(defer);
    }
});

defer = "new";
observable.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上述代码的执行结果如下:

onSubscribe--->
onNext--->new
onComplete--->

显然,最终在订阅之前 Observable 工厂又创建了最新的 Observable,onNext 中接收的数据也是最新的,为了理解 defer 操作符,来看一下官方 defer 操作符的示意图:

image

empty 操作符#

使用 empty 操作符可以创建一个不发生任何数据但正常终止的 Observable,参考如下:

//empty
Observable.empty().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext--->"+o);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上述代码的输出结果如下:

onSubscribe--->
onComplete--->

为了方便理解 empty 操作符的使用,来看一些 empty 操作符的官方示意图:

image

never 操作符#

使用 never 操作符可以创建一个不发生任何数据也不终止的 Observable,参考如下:

//never
Observable.never().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext--->"+o);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上述代码的输出结果如下:

onSubscribe--->

为了方便理解 never 操作符的使用,来看一些 never 操作符的官方示意图:

image

timer 操作符#

timer 操作符可以创建一个带延时的发送固定数值 0 的 Observable,还可以指定线程调度器,timer 重载方法如下:

//延时
public static Observable<Long> timer(long delay, TimeUnit unit)
//延时+线程调度器
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) 

下面是 timer 的使用方式:

//timer
Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Long s) {
        Log.i(TAG, "onNext--->"+s);
        Log.i(TAG, "当前线程--->"+Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上述代码的执行结果如下:

onSubscribe--->
//延时3秒收到数据
onNext--->0
当前线程--->RxCachedThreadScheduler-1
onComplete--->

为了方便理解 timer 操作符的使用,来看一些 timer 操作符的官方示意图,下面以 timer 指定延时器和线程调度器的方式为例,具体如下:

image

interval 操作符#

使用 interval 操作符可以创建一个可以以固定时间间隔发送整数值的一个 Observable,interval 可以指定初始延时时间、时间间隔、线程调度器等,interval 重载方法如下:

//初始延时+时间间隔
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) 
//初始延时+时间间隔+线程调度器
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler
scheduler)
//时间间隔
public static Observable<Long> interval(long period, TimeUnit unit)
//时间间隔+线程调度器
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler) 

下面是 interval 的使用方式:

//interval
Observable.interval(3,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Long aLong) {
        Log.i(TAG, "onNext--->"+aLong);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上述代码执行后就会以每个 3 秒持续发送值为整数的事件,执行结果如下:

onSubscribe--->
onNext--->0
onNext--->1
onNext--->2
...

为了方便理解 interval 操作符的使用,来看一些 interval 操作符的官方示意图,下面以 interval 指定间隔时间和时间单位的方式为例,具体如下:

image

range 操作符#

使用 range 操作符可以创建一个可以发送指定整数范围值的一个 Observable,range 相关的方法有两个,只是数值的范围表示不同,两个方法声明如下:

// int
public static Observable<Integer> range(final int start, final int count)
// long
public static Observable<Long> rangeLong(long start, long count)

下面是 range 的使用方式,具体参考如下:

//range
Observable.range(1,5).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Integer integer) {
        Log.i(TAG, "onNext--->"+integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上述代码的执行结果如下:

onSubscribe--->
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onComplete--->

为了方便理解 range 操作符的使用,来看一些 range 操作符的官方示意图:

image

总结#

这篇文章主要介绍了 RxJava2 相关基础知识以及 RxJava2 中创建型操作符的理解和使用。

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。