RxJava is the open-source Java implementation of ReactiveX, a library for composing asynchronous and event-based programs using observable sequences. That is how the official website introduces it; the key ideas are asynchronous programming, chained calls, and event sequences.
- Introduction to RxJava
- Concepts
- Basic Implementation
- Just Operator
- From Operator
- Defer Operator
- Empty Operator
- Never Operator
- Timer Operator
- Interval Operator
- Range Operator
- Summary
Introduction to RxJava#
To use RxJava2 on Android, add the following dependencies to the module's Gradle build file:
implementation 'io.reactivex.rxjava2:rxjava:2.2.3'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
Concepts#
RxJava has three key concepts: the Observer, the Observable, and the event sequence. The Observable fully controls the event sequence. For the Observable to notify the Observer, a subscription must be established between them; once subscribed, the Observer receives each event as soon as the Observable emits it.
In RxJava2, the Observer has four event callback methods:
- onSubscribe: Called once when the subscription is established; the Disposable it receives can be used to unsubscribe
- onNext: Called for each event the Observable emits
- onError: Called when the event sequence fails; it ends the sequence, and the Observer receives no further events
- onComplete: Called when the event sequence has finished normally; the Observer receives no further events
Note:
- onError and onComplete both end the sequence. With Observable.create, the emitter can still be called afterwards, but the Observer receives nothing more;
- onError and onComplete are mutually exclusive: only one of them reaches the Observer. Calling onComplete after onError is silently ignored. Calling onError after onComplete cannot be delivered, so RxJava2 passes the error to RxJavaPlugins.onError, which by default hands it to the thread's uncaught exception handler and typically crashes the program;
- onSubscribe is called as soon as the subscription is established. Whether and when onNext, onError, and onComplete are called is decided entirely by the Observable, not by the Observer.
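The terminal-event rule in the notes above can be modelled with a small JDK-only sketch. `RecordingEmitter` is a hypothetical class written for illustration, not RxJava's implementation. In particular, this sketch silently drops a late error, whereas RxJava2 forwards it to RxJavaPlugins.onError.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JDK-only model of the terminal-event rule; not RxJava code.
final class TerminalRuleSketch {

    /** Records what a downstream observer would be allowed to see. */
    static final class RecordingEmitter {
        private final List<String> received = new ArrayList<>();
        private boolean terminated;

        void onNext(String value) {
            if (terminated) {
                return; // dropped: the sequence has already ended
            }
            received.add("onNext:" + value);
        }

        void onComplete() {
            if (terminated) {
                return;
            }
            terminated = true;
            received.add("onComplete");
        }

        void onError(Throwable error) {
            if (terminated) {
                return; // assumption of this sketch: late errors are dropped silently
            }
            terminated = true;
            received.add("onError:" + error.getMessage());
        }

        List<String> received() {
            return received;
        }
    }

    /** Replays the Event1/Event2/onComplete/Event3 sequence from the article. */
    static List<String> demo() {
        RecordingEmitter emitter = new RecordingEmitter();
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");                           // ignored
        emitter.onError(new IllegalStateException("late")); // ignored in this sketch
        return emitter.received();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the first terminal call has any visible effect; everything after it is invisible to the observer.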
Basic Implementation#
- Create an Observer, which decides how to handle events when they occur, as shown below:
//Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//Called when the subscription is established; d.dispose() unsubscribes
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
//Called by the Observer when an event is emitted
Log.i(TAG, "onNext--->"+s);
}
@Override
public void onError(Throwable e) {
//Called by the Observer (event sequence error)
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
//Called by the Observer (event sequence completed)
Log.i(TAG, "onComplete--->");
}
};
- Create an Observable, which decides when to trigger events and what kind of events to trigger, as shown below:
//Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Event1");
emitter.onNext("Event2");
emitter.onComplete();
emitter.onNext("Event3");
}
});
- Establish a subscription relationship between the Observer and the Observable, as shown below:
//Establish subscription relationship between Observer and Observable
observable.subscribe(observer);
The output of the above code is as follows:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onComplete--->
Clearly, since the onComplete method is called after sending Event2, the subsequent event Event3 will not be received by the Observer.
The above code can also be written like this, with the same result, as shown below:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Event1");
emitter.onNext("Event2");
emitter.onComplete();
emitter.onNext("Event3");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->"+s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
In the above code, the create method of Observable is used to create the Observable and send related events. To help understand, here is the official diagram of the create operator:
Observable also provides many static methods to create Observables, which will be introduced in the following sections.
Just Operator#
The just operator creates an Observable that emits the items passed to it, up to 10 items, and then completes. Compared with create, it simplifies the code. The overloads of just are as follows:
public static <T> Observable<T> just(T item)
public static <T> Observable<T> just(T item1, T item2)
public static <T> Observable<T> just(T item1, T item2, T item3)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
Here is a simple usage of the just operator:
//Simple usage of just operator
Observable.just("Event1", "Event2", "Event3")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The output of the above code is as follows:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
Let's look at the official diagram of the just operator. Below is a diagram showing just sending four events:
From Operator#
The from-related operators create Observables from arrays, collections (Iterable), asynchronous tasks (Future), and Reactive Streams Publishers. Their signatures fall into the following categories:
//Array
public static <T> Observable<T> fromArray(T... items)
//Collection
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
//Asynchronous task
public static <T> Observable<T> fromFuture(Future<? extends T> future)
//Asynchronous task + timeout
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
//Asynchronous task + timeout + scheduler
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
//Asynchronous task + scheduler
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)
//Publisher in Reactive Streams, similar to create operator, event emission is determined by the publisher (Observable)
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
fromArray/fromIterable#
Here is the usage of fromArray:
//Simple usage of fromArray operator
String[] events = {"Event1", "Event2", "Event3"};
Observable.fromArray(events).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
Let's look at the official diagram of fromArray:
Here is the usage of fromIterable:
//Simple usage of fromIterable operator
List<String> list = new ArrayList<>();
list.add("Event1");
list.add("Event2");
list.add("Event3");
Observable.fromIterable(list).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
Let's look at the official diagram of fromIterable:
The output of the above code is as follows:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
fromCallable#
Callable is located in the java.util.concurrent package; it is similar to Runnable but returns a value. fromCallable creates an Observable that invokes call() when subscribed to and emits its return value; if there is no subscription, the code inside call() is never executed. No scheduler is involved by default, so call() runs and the value is emitted on the thread that subscribes, which is the main thread when subscribing from it. When using fromCallable, pay attention to the following points:
- For time-consuming tasks, use subscribeOn to move the subscription (and therefore call()) off the main thread;
- To handle the emitted value on the main thread after a time-consuming task, use observeOn to switch back to the main thread;
- To avoid memory leaks and other issues, cancel the subscription in the corresponding onDestroy method.
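The first two points come down to which thread invokes call(). The following JDK-only sketch shows that a Callable is lazy and runs on whichever thread invokes it. It uses a plain ExecutorService rather than RxJava schedulers, and the class name and the thread name "sketch-worker" are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only illustration; RxJava's subscribeOn is not used here.
final class CallableThreadSketch {

    // Nothing runs when the Callable is created; it reports the thread that calls it.
    static final Callable<String> WHO_RAN_ME = () -> Thread.currentThread().getName();

    /** Invokes the Callable directly, so it runs on the caller's thread. */
    static String runOnCaller() {
        try {
            return WHO_RAN_ME.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Hands the Callable to a worker thread, so the caller is not the one doing the work. */
    static String runOnWorker() {
        ExecutorService worker = Executors.newSingleThreadExecutor(
                runnable -> new Thread(runnable, "sketch-worker"));
        try {
            return worker.submit(WHO_RAN_ME).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct call ran on: " + runOnCaller());
        System.out.println("submitted call ran on: " + runOnWorker());
    }
}
```

In RxJava terms, subscribing without subscribeOn corresponds to the direct call, and subscribeOn plays the role of the executor.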
Here is a simple usage of fromCallable:
//Simple usage of fromCallable operator
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
//Other operations...
return "call";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The execution result is as follows:
onSubscribe--->
onNext--->call
onComplete--->
Let's look at the official diagram of fromCallable:
fromFuture#
From the above, we can see that fromFuture has four overloads, whose parameters specify the Future, a timeout, and a scheduler. First, let's understand the Future interface, which is located in the java.util.concurrent package. It is used to check whether an asynchronous task (a Runnable or Callable) has completed, to retrieve the task's result, and to cancel the task. Note that the overloads without a Scheduler do not operate on any particular scheduler: subscribing blocks the subscribing thread until the Future completes, and the result is then emitted on that same thread. For a time-consuming task, therefore, use subscribeOn to move the subscription off the main thread. Below is an example that uses FutureTask to illustrate fromFuture.
Create a Callable to execute an asynchronous task, as shown below:
//Asynchronous task
private class MCallable implements Callable<String> {
@Override
public String call() throws Exception {
Log.i(TAG, "Task execution starts--->");
Thread.sleep(5000);
Log.i(TAG, "Task execution ends--->");
return "MCallable";
}
}
Then, create a FutureTask, as shown below:
//Create FutureTask
MCallable mCallable = new MCallable();
FutureTask<String> mFutureTask = new FutureTask<>(mCallable);
Next, run the FutureTask on a new Thread, as shown below:
//Execute FutureTask
new Thread(mFutureTask).start();
Finally, use fromFuture to create the corresponding Observable and subscribe, as shown below:
//fromFuture
Observable.fromFuture(mFutureTask)
.subscribeOn(Schedulers.io()) //Switch subscription thread
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The expected output of the above code is as follows:
Task execution starts--->
onSubscribe--->
Task execution ends--->
onNext--->MCallable
onComplete--->
Let's look at the official diagram of fromFuture. The following diagram shows the fromFuture method with a parameter Future:
The asynchronous task takes 5 seconds. We can use an overload of fromFuture to specify a timeout of 4 seconds, as shown below:
//Specify timeout of 4s
Observable.fromFuture(mFutureTask, 4, TimeUnit.SECONDS, Schedulers.io())
//...
Since the asynchronous task cannot finish within 4 seconds, the Observer's onError method is called with a TimeoutException, while the task itself keeps running to the end. The execution result is as follows:
Task execution starts--->
onSubscribe--->
onError--->java.util.concurrent.TimeoutException
Task execution ends--->
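The timeout behaviour above comes from Future itself and can be reproduced without RxJava. The sketch below waits on a FutureTask with Future.get(timeout, unit); the class and method names are hypothetical, and the durations are shortened to milliseconds so the example runs quickly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only sketch of waiting for a Future with a timeout.
final class FutureTimeoutSketch {

    /** Starts a task lasting taskMillis and waits at most timeoutMillis for its result. */
    static String awaitResult(long taskMillis, long timeoutMillis) {
        Callable<String> slowCall = () -> {
            Thread.sleep(taskMillis);
            return "done";
        };
        FutureTask<String> task = new FutureTask<>(slowCall);
        Thread worker = new Thread(task, "sketch-task");
        worker.setDaemon(true); // do not keep the JVM alive for the unfinished task
        worker.start();
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout"; // waiting gave up; the task itself is still running
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed";
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitResult(400, 50));  // task is slower than the timeout
        System.out.println(awaitResult(10, 2000)); // task finishes in time
    }
}
```

A timeout only ends the waiting. It does not stop the task, which is why "Task execution ends--->" still appears after the error in the log above.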
So how do we cancel this asynchronous task? This is also one of the advantages of Future, allowing the task to be canceled at will, as shown below:
//Cancel asynchronous task
public void cancelTask(View view) {
if (mFutureTask.isDone()) {
Log.i(TAG, "Task has been completed--->");
} else {
Log.i(TAG, "Task is executing--->");
boolean cancel = mFutureTask.cancel(true);
Log.i(TAG, "Task cancellation successful--cancel->" + cancel);
Log.i(TAG, "Task cancellation successful--isCancelled->" + mFutureTask.isCancelled());
}
}
Below is the execution result when canceling the task during execution:
Task execution starts--->
onSubscribe--->
Task is executing--->
Task cancellation successful--cancel->true
Task cancellation successful--isCancelled->true
onError--->java.util.concurrent.CancellationException
Thus, the executing asynchronous task has been canceled. This part of the content is more about Java Future-related knowledge.
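The cancellation flow can likewise be sketched with the JDK alone. The names below are hypothetical. cancel(true) interrupts the worker if the task is already running, and a later get() throws CancellationException, matching the exception type seen in onError above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// JDK-only sketch of cancelling a FutureTask before it finishes.
final class FutureCancelSketch {

    static String cancelWhileRunning() {
        Callable<String> slowCall = () -> {
            Thread.sleep(5000);
            return "done";
        };
        FutureTask<String> task = new FutureTask<>(slowCall);
        Thread worker = new Thread(task, "sketch-task");
        worker.setDaemon(true);
        worker.start();

        // true = interrupt the worker thread if the task has already started
        boolean cancelled = task.cancel(true);
        try {
            task.get();
            return "completed";
        } catch (CancellationException e) {
            return "cancel=" + cancelled + ", isCancelled=" + task.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed";
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelWhileRunning());
    }
}
```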
Defer Operator#
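The defer operator creates the Observable lazily: the Observable is created, and its events are sent, only when an Observer subscribes, and each subscription gets a fresh Observable. Here is the usage of the defer operator:
//defer (the variable defer is a String field of the enclosing class)
defer = "old";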
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.just(defer);
}
});
defer = "new";
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->"+s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The execution result of the above code is as follows:
onSubscribe--->
onNext--->new
onComplete--->
Clearly, the Observable was created only at the moment of subscription, after defer had been set to "new", so the data received in onNext is the latest value. The official marble diagram of the defer operator illustrates this behaviour.
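The same eager-versus-lazy distinction can be modelled without RxJava. In this hypothetical JDK-only sketch, reading the field immediately stands in for Observable.just(defer), and a Supplier stands in for the factory passed to Observable.defer. It illustrates the idea only and does not show how RxJava implements defer.

```java
import java.util.function.Supplier;

// JDK-only model of eager capture versus deferred evaluation.
final class DeferSketch {

    private static String value;

    /** Eager: the value is read when the "source" is built. */
    static String eagerResult() {
        value = "old";
        String captured = value; // read now
        value = "new";
        return captured;
    }

    /** Lazy: the value is read only when the "subscriber" asks for it. */
    static String lazyResult() {
        value = "old";
        Supplier<String> deferred = () -> value; // read later, on get()
        value = "new";
        return deferred.get();
    }

    public static void main(String[] args) {
        System.out.println("eager: " + eagerResult());
        System.out.println("lazy: " + lazyResult());
    }
}
```

The eager variant returns "old" and the lazy variant returns "new", which mirrors the onNext--->new output above.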
Empty Operator#
The empty operator creates an Observable that emits no items and terminates normally, as shown below:
//empty
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Object o) {
Log.i(TAG, "onNext--->"+o);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The output of the above code is as follows:
onSubscribe--->
onComplete--->
To facilitate understanding of the empty operator's usage, let's look at some official diagrams of the empty operator:
Never Operator#
The never operator creates an Observable that emits no items and never terminates, as shown below:
//never
Observable.never().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Object o) {
Log.i(TAG, "onNext--->"+o);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The output of the above code is as follows:
onSubscribe--->
To facilitate understanding of the never operator's usage, let's look at some official diagrams of the never operator:
Timer Operator#
The timer operator creates an Observable that emits a single value, 0L, after the given delay and then completes. A scheduler can also be specified; the overload without one uses the computation scheduler. The overloads of timer are as follows:
//Delay
public static Observable<Long> timer(long delay, TimeUnit unit)
//Delay + scheduler
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)
Here is the usage of the timer operator:
//timer
Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Long s) {
Log.i(TAG, "onNext--->"+s);
Log.i(TAG, "Current thread--->"+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The execution result of the above code is as follows:
onSubscribe--->
//Received data after a delay of 3 seconds
onNext--->0
Current thread--->RxCachedThreadScheduler-1
onComplete--->
To facilitate understanding of the timer operator's usage, let's look at some official diagrams of the timer operator. Below is an example of the timer specifying the delay and scheduler:
Interval Operator#
The interval operator creates an Observable that emits increasing Long values, starting from 0, at a fixed time interval. You can specify the initial delay, the interval, and the scheduler. The overloads of interval are as follows:
//Initial delay + time interval
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
//Initial delay + time interval + scheduler
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
//Time interval
public static Observable<Long> interval(long period, TimeUnit unit)
//Time interval + scheduler
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
Here is the usage of the interval operator:
//interval
Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Long aLong) {
Log.i(TAG, "onNext--->"+aLong);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
With the above code, the Observable emits an increasing Long value every 3 seconds, with the first value arriving 3 seconds after subscription, and it never completes on its own. The execution result is as follows:
onSubscribe--->
onNext--->0
onNext--->1
onNext--->2
...
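For comparison, a periodic counter like this can be built with the JDK's ScheduledExecutorService. This is a hypothetical analogue, not how RxJava implements interval: it collects the first few ticks and then shuts the scheduler down, which roughly corresponds to disposing the subscription. The period is in milliseconds so the example finishes quickly.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// JDK-only analogue of a periodic counter; names are illustrative.
final class IntervalSketch {

    /** Collects the first `count` ticks of a counter that fires every periodMillis. */
    static List<Long> firstTicks(int count, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Long> ticks = new CopyOnWriteArrayList<>();
        AtomicLong counter = new AtomicLong();
        CountDownLatch done = new CountDownLatch(count);

        // Initial delay equals the period, like interval(period, unit).
        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {
                ticks.add(counter.getAndIncrement());
                done.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow(); // stop ticking, similar to disposing
        }
        return ticks;
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3, 20));
    }
}
```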
To facilitate understanding of the interval operator's usage, let's look at some official diagrams of the interval operator. Below is an example of the interval specifying the interval time and time unit:
Range Operator#
The range operator creates an Observable that emits a sequence of consecutive integers: count values, beginning at start. There are two range methods, which differ only in the numeric type of the values. Their declarations are as follows:
// int
public static Observable<Integer> range(final int start, final int count)
// long
public static Observable<Long> rangeLong(long start, long count)
Here is the usage of the range operator:
//range
Observable.range(1, 5).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext--->"+integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The execution result of the above code is as follows:
onSubscribe--->
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onComplete--->
To facilitate understanding of the range operator's usage, let's look at some official diagrams of the range operator:
Summary#
This article mainly introduces the basic knowledge related to RxJava2 and the understanding and usage of creation operators in RxJava2.