The previous article here introduced the use of creation operators. Today, let's take a look at the usage of transformation operators in RxJava. The commonly used transformation operators are as follows:
- buffer operator
- window operator
- map operator
- groupBy operator
- cast operator
- scan operator
- To operator
buffer operator#
The buffer operator has many overloaded methods. Here, we will select a few typical ones to illustrate the use of the buffer operator. The usage of the buffer operator can be divided into the following three categories:
// First category
public final Observable<List<T>> buffer(int count)
public final Observable<List<T>> buffer(int count, int skip)
// Second category
public final Observable<List<T>> buffer(long timespan, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, long timeskip, TimeUnit unit)
// Third category
public final <B> Observable<List<T>> buffer(ObservableSource<B> boundary)
public final <TOpening, TClosing> Observable<List<T>> buffer(
ObservableSource<? extends TOpening> openingIndicator,
Function<? super TOpening, ? extends ObservableSource<? extends TClosing>> closingIndicator)
buffer(int count)#
The buffer operator converts an Observable into another Observable that collects the originally emitted data and then emits these cached data collections. The buffer converts individual emitted events into collections of elements. Below is the official diagram for this situation:
In the event sending process below, if buffer is not set, it requires four emissions. If the following buffer is used for conversion, only two emissions are needed. The test code is as follows:
count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4")
.buffer(2)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
count++;
Log.i(TAG, "Received for the " + count + " time...");
Log.i(TAG, "accept--->" + strings.size());
Log.i(TAG, "Received data...");
for (String str : strings) {
Log.i(TAG, "accept--->" + strings.size() + "---" + str);
}
}
});
The execution result of the above code is as follows:
Received for the 1 time...
accept--->2
Received data...
accept--->2---Event1
accept--->2---Event2
Received for the 2 time...
accept--->2
Received data...
accept--->2---Event3
accept--->2---Event4
buffer(int count, int skip)#
Compared to buffer(int count), skip can specify the position of the next collection of events converted from the source Observable. If count equals skip, then buffer(int count, int skip) is equivalent to buffer(int count). The official diagram is as follows:
In the event sending process below, it is equivalent to sending a group of every 3 events, but the position parameter for collecting data each time is skip = 2, so there will be data duplication in the collected data each time. The test code is as follows:
count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4", "Event5")
.buffer(3, 2)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
count++;
Log.i(TAG, "Received for the " + count + " time...");
Log.i(TAG, "accept--->" + strings.size());
Log.i(TAG, "Received data...");
for (String str : strings) {
Log.i(TAG, "accept--->" + strings.size() + "---" + str);
}
}
});
The execution result of the above code is as follows:
Received for the 1 time...
accept--->3
Received data...
accept--->3---Event1
accept--->3---Event2
accept--->3---Event3
Received for the 2 time...
accept--->3
Received data...
accept--->3---Event3
accept--->3---Event4
accept--->3---Event5
Received for the 3 time...
accept--->1
Received data...
accept--->1---Event5
buffer(long timespan, TimeUnit unit)#
The buffer operator converts an Observable into a new Observable. The timespan determines the time interval at which the new Observable emits cached data. The official diagram is as follows:
In the event sending process below, the source Observable emits events every 2 seconds, while the newly generated Observable from buffer emits cached event collections every 1 second. Of course, this may lead to data loss due to the inability to collect data during the interval. The test code is as follows:
Observable.intervalRange(1,8,0,2, TimeUnit.SECONDS)
.buffer(1,TimeUnit.SECONDS)
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
The execution result of the above code is as follows:
accept--->[1]
accept--->[]
accept--->[2]
accept--->[]
accept--->[3]
accept--->[]
accept--->[4]
accept--->[]
accept--->[5]
buffer(long timespan, long timeskip, TimeUnit unit)#
The buffer operator converts an Observable into another Observable. The timeskip determines how often the newly generated Observable starts a new buffer, and then the new Observable emits the collection of events collected within the timespan time interval. The official diagram is as follows:
In the event sending process below, the source Observable emits integers from 1 to 12 every second, while the newly generated Observable from buffer receives events sent by the source Observable every 5 seconds. The test code is as follows:
Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
.buffer(1,5, TimeUnit.SECONDS)
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
The execution result of the above code is as follows:
accept--->[1]
accept--->[6]
accept--->[11]
buffer(ObservableSource boundary)#
buffer(boundary) monitors an Observable named boundary. Whenever this Observable emits an event, it creates a new List to start collecting events emitted from the original Observable and sends the collected data. The official diagram is as follows:
In the event sending process below, the number of collected original events sent will vary due to different time intervals. The test code is as follows:
Observable.intervalRange(1,10,0,2, TimeUnit.SECONDS)
.buffer(Observable.interval(3, TimeUnit.SECONDS))
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
The execution result of the above code is as follows:
accept--->[1, 2]
accept--->[3]
accept--->[4, 5]
accept--->[6]
accept--->[7, 8]
accept--->[9]
accept--->[10]
buffer(openingIndicator, closingIndicator)#
buffer(openingIndicator, closingIndicator) monitors an Observable named openingIndicator. Each time this Observable emits an event, it creates a List to collect data sent from the original Observable and gives the collected data to closingIndicator. The closingIndicator returns an Observable that this buffer monitors. When this Observable emits data, it closes the List and emits the data collected from the openingIndicator. Below is the official diagram for this situation:
In the event sending process below, the original Observable emits integers from 1 to 12 at 1-second intervals, while the Observable named openingIndicator creates a List every 3 seconds to collect emitted events and gives the collected data to closingIndicator, which will emit the data received from the openingIndicator after a 1-second delay. The test code is as follows:
Observable openingIndicator = Observable.interval(3, TimeUnit.SECONDS);
Observable closingIndicator = Observable.timer(1,TimeUnit.SECONDS);
Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
.buffer(openingIndicator, new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
return closingIndicator;
}
})
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
The execution result of the above code is as follows:
accept--->[4, 5]
accept--->[7]
accept--->[10]
window operator#
Here, we will introduce the use of the window operator using window(long count) as an example. The usage of the window operator is similar to that of the buffer operator. The difference is that the Observable emitted by the buffer sends collections of events from the source Observable, while the Observable transformed by the window operator sends the events emitted by the source Observable in groups of count. The official diagram for this operator is as follows:
The test code is as follows:
Observable.just("Event1", "Event2", "Event3", "Event4")
.window(2)
.subscribe(new Consumer<Observable<String>>() {
@Override
public void accept(Observable<String> stringObservable) throws Exception {
Log.i(TAG, "accept--Observable->");
stringObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
}
});
The execution result of the above code is as follows:
accept--Observable->
accept--->Event1
accept--->Event2
accept--Observable->
accept--->Event3
accept--->Event4
map operator#
map(mapper)#
The map operator can perform type conversion on the emitted data. The official diagram for the map operation is as follows:
In the event sending process below, the map operator can further process and convert the events emitted by the source Observable. The test code is as follows:
Observable.just("Event1", "Event2", "Event3", "Event4")
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return "this is " + s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
The execution result of the above code is as follows:
accept--->this is Event1
accept--->this is Event2
accept--->this is Event3
accept--->this is Event4
flatMap(mapper)#
The flatMap operator, when used, converts the emitted events from the source Observable into Observables that can emit multiple events. These Observables are finally merged into a single Observable, which then emits all these events. Here, we will take the commonly used flatMap(mapper) as an example, and its official diagram is as follows:
In the event sending process below, after using the flatMap operator, when the source Observable emits events, corresponding Observables are generated, and the final emitted events are merged into a single Observable, which then calls back the event results to the observer. The test code is as follows:
final Observable observable = Observable.just("Event5", "Event6");
Observable.just("Event1", "Event2", "Event3", "Event4")
.flatMap(new Function<String, Observable<String>>() {
@Override
public Observable<String> apply(String s) throws Exception {
return observable;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
The execution result of the above code is as follows:
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
concatMap(mapper)#
The usage of concatMap is similar to that of flatMap, but concatMap guarantees the order of event reception, while flatMap does not guarantee the order. The official diagram for the concatMap operator is as follows:
In the event sending process below, we delay 3 seconds when the source Observable emits the integer 1, and then continue to emit other events. The test code is as follows:
Observable.intervalRange(1, 2, 0, 1, TimeUnit.SECONDS)
.concatMap(new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long aLong) throws Exception {
int delay = 0;
if (aLong == 1) {
delay = 3;
}
return Observable.intervalRange(4, 4, delay, 1, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept--->" + aLong);
}
});
The execution result of the above code using concatMap is as follows:
accept--->4
accept--->5
accept--->6
accept--->7
accept--->4
accept--->5
accept--->6
accept--->7
The execution result of the above code using flatMap is as follows:
accept--->4
accept--->5
accept--->6
accept--->4
accept--->7
accept--->5
accept--->6
accept--->7
As can be seen, concatMap guarantees the order of event reception compared to flatMap.
switchMap(mapper)#
When the source Observable emits events, it is converted into Observables that can emit multiple events. The switchMap operator only cares about the current Observable. This means that whenever the source Observable emits a new event, it will discard the previously emitted Observable that could emit multiple events. The official diagram is as follows:
In the event sending process below, the source Observable emits 1 and 2 every 2 seconds, while the Observable that can emit multiple events emits integers starting from 4 every second. When the source Observable emits the integer 1, this new Observable that can emit multiple events only sends two integers, which are 4 and 5, and then stops sending because the source Observable starts emitting events again. At this point, the previous Observable that could emit multiple events is discarded, and the listening for the next event from the source Observable begins. The test code is as follows:
Observable.intervalRange(1, 2, 0, 2, TimeUnit.SECONDS)
.switchMap(new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long aLong) throws Exception {
Log.i(TAG, "accept-aLong-->" + aLong);
return Observable.intervalRange(4, 4, 0, 1, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept--->" + aLong);
}
});
The execution result of the above code is as follows:
accept-aLong-->1
accept--->4
accept--->5
accept-aLong-->2
accept--->4
accept--->5
accept--->6
accept--->7
In addition, there are related operators: concatMapDelayError, concatMapEager, concatMapEagerDelayError, concatMapIterable, flatMapIterable, switchMapDelayError, which are extensions of the above operators and will not be introduced here.
groupBy operator#
The groupBy operator categorizes the received data according to specified rules, which can then be output by GroupedObservable and others. The official diagram is as follows:
In the event sending process below, we will group the output according to scores, as follows:
List<DataBean> beanList = new ArrayList<>();
beanList.add(new DataBean("Score is 95", 95));
beanList.add(new DataBean("Score is 70", 70));
beanList.add(new DataBean("Score is 56", 56));
beanList.add(new DataBean("Score is 69", 69));
beanList.add(new DataBean("Score is 90", 90));
beanList.add(new DataBean("Score is 46", 46));
beanList.add(new DataBean("Score is 85", 85));
Observable.fromIterable(beanList)
.groupBy(new Function<DataBean, String>() {
@Override
public String apply(DataBean dataBean) throws Exception {
int score = dataBean.getScore();
if (score >= 80) {
return "A";
}
if (score >= 60 && score < 80) {
return "B";
}
if (score < 60) {
return "C";
}
return null;
}
})
.subscribe(new Consumer<GroupedObservable<String, DataBean>>() {
@Override
public void accept(final GroupedObservable<String, DataBean> groupedObservable) throws Exception {
groupedObservable.subscribe(new Consumer<DataBean>() {
@Override
public void accept(DataBean dataBean) throws Exception {
Log.i(TAG, "accept--->"+ groupedObservable.getKey() + " group--->"+dataBean.getDesc());
}
});
}
});
The execution result of the above code is as follows:
accept--->A group--->Score is 95
accept--->B group--->Score is 70
accept--->C group--->Score is 56
accept--->B group--->Score is 69
accept--->A group--->Score is 90
accept--->C group--->Score is 46
accept--->A group--->Score is 85
cast operator#
The cast operator is used for type conversion. The official diagram for the cast operator is as follows:
The test code is as follows:
Observable.just(1,2,3,4,5)
.cast(String.class)
.subscribe(new Consumer<String>() {
@Override
public void accept(String String) throws Exception {
Log.i(TAG, "accept--->" + String);
}
});
The test will produce the following exception:
java.lang.ClassCastException: Cannot cast java.lang.Integer to java.lang.String
From the result, it can be seen that type conversion between different types will result in a ClassCastException. The cast operator cannot perform type conversion between different types, but it can be used to verify whether the emitted event data type is of the specified type.
scan operator#
The scan operator scans each pair of elements in sequence. The first element does not have a previous element, so the first element's previous element will be ignored. When scanning the second element, the first element will be obtained, and the return value of the apply method will be used as the previous element's value for calculation, ultimately returning the transformed result. The official diagram for the scan operator is as follows:
Let's look at the event sending process below. In the first scan, the first element is 1, which is equivalent to last. The second element is 2, which is equivalent to item. At this point, the return value of the apply method is 2, which will be used as the value of last for the next scan calculation. The next return value will definitely be 2 * 3, which is 6. The test code is as follows:
Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer last, Integer item) throws Exception {
Log.i(TAG, "accept--last->" + last);
Log.i(TAG, "accept--item->" + item);
return last * item;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept--->" + integer);
}
});
The execution result of the above code is as follows:
accept--->1
accept--last->1
accept--item->2
accept--->2
accept--last->2
accept--item->3
accept--->6
accept--last->6
accept--item->4
accept--->24
accept--last->24
accept--item->5
accept--->120
To operator#
toList()#
The toList operator converts a series of emitted data into a List and then emits it all at once. The official diagram for toList is as follows:
The test code is as follows:
Observable.just(1, 2, 3, 4)
.toList()
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.i(TAG, "accept--->" + integers);
}
});
The execution result of the above code is as follows:
accept--->[1, 2, 3, 4]
toMap(keySelector)#
The toMap operator converts the events to be emitted into a Map format according to specified rules and then emits it all at once. The official diagram for the toMap operator is as follows:
The test code is as follows:
Observable.just(1, 2, 3, 4)
.toMap(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "key" + integer;
}
})
.subscribe(new Consumer<Map<String, Integer>>() {
@Override
public void accept(Map<String, Integer> map) throws Exception {
Log.i(TAG, "accept--->" + map);
}
});
The execution result of the above code is as follows:
accept--->{key2=2, key4=4, key1=1, key3=3}
The transformation operators in RxJava are basically as described above, and specific usage should be combined with actual needs.