RxJava Operators – Part 2: More about Filtering Operators

In the previous article of this series, we talked about filtering operators and demonstrated operators like single(), elementAt(), take(), etc. Now we want to present some other operators that still fit in the filtering category, but are also useful for avoiding the need for backpressure handling.

Basically, when an Observable emits items faster than they can be consumed, backpressure builds up. Depending on the situation, if a chain cannot deal with the overproduced items, it will throw a MissingBackpressureException. So, if our Observable is overproducing items and we do not need (or want) to process all of them, we can handle this situation by using some ordinary operators that pass only some of the items downstream.

In this article we will present some of these ordinary operators: sample, throttleFirst and debounce.

Note that RxJava also provides some specific operators that can handle backpressure, such as onBackpressureBuffer and onBackpressureDrop, but we will talk about them in a future article of this series.

You can find here a demo app which implements all the examples shown in this article.

sample(period) Operator

This variant of the sample operator emits the most recent item emitted by the source within each periodic time interval. It is useful when the source emits a large number of items and we do not need to process all of them. We can just take some samples of the emitted items and emit them downstream.

In this example, the source emits 20 items (from 0 to 19), each one with 500ms of delay. Since we are using the sample() operator with a sample period of 4 seconds, only items 6, 14 and 19 are emitted downstream, since these are the last items emitted within each period of 4 seconds.
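The sampling rule itself can be modelled without RxJava. The sketch below is plain Java, not the demo app's code and not RxJava's implementation; the class SampleSketch and its methods are hypothetical names. It assumes that item i arrives at (i + 1) * 500ms (the emitter waits before each emission), that an item landing exactly on a tick counts for the next period (a stand-in for the small scheduling drift of a real run), and that the last item is flushed when the source completes, as the result above suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of sample(period); all names here are hypothetical.
class SampleSketch {

    // Assumption: item i arrives at (i + 1) * delayMs.
    static long[] arrivals(int count, long delayMs) {
        long[] times = new long[count];
        for (int i = 0; i < count; i++) {
            times[i] = (i + 1) * delayMs;
        }
        return times;
    }

    // Returns the indices of the items that would be passed downstream.
    static List<Integer> sample(long[] arrivalMs, long periodMs) {
        List<Integer> emitted = new ArrayList<>();
        int latest = -1;       // most recent item seen so far
        int lastEmitted = -1;  // most recent item already passed downstream
        long nextTick = periodMs;
        for (int i = 0; i < arrivalMs.length; i++) {
            // Fire every tick that happens at or before this arrival.
            while (nextTick <= arrivalMs[i]) {
                if (latest != lastEmitted) {
                    emitted.add(latest);
                    lastEmitted = latest;
                }
                nextTick += periodMs;
            }
            latest = i;
        }
        // Assumption: a pending item is flushed when the source completes.
        if (latest != lastEmitted) {
            emitted.add(latest);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(sample(arrivals(20, 500), 4000)); // prints [6, 14, 19]
    }
}
```

Under different timing assumptions (for example, the first item arriving at time 0) the sampled items shift by one, which is why a real run depends on how the emitter is written.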

Note: RxJava provides a similar operator called throttleLast.

sample(Observable) Operator

There is another variant of the sample operator which accepts an Observable. It returns an Observable that emits the last item emitted by the source whenever the sampler Observable emits something or completes. This is useful when, instead of setting a pre-defined interval to get a sample, we want to emit a sample based on an action (i.e. a sampler Observable).

In the example below, we created a doAFakeOperation() method which just sleeps between 1 and 5 seconds and then returns. Meanwhile, the emitItems(…) method emits items, each one with 500ms of delay. When doAFakeOperation() returns (somewhere between 1 and 5 seconds later), the most recent item emitted by emitItems() is emitted downstream by the sample operator.
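As a rough model of this behaviour (plain Java, not the demo app's code and not RxJava's implementation), the sketch below picks the latest item that has arrived whenever the sampler fires. The class SamplerSketch, the method sampleAt and the 3200ms return time of the fake operation are all hypothetical; it also assumes that item i arrives at (i + 1) * 500ms and that the sampler times are given in ascending order.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of sample(sampler); all names here are hypothetical.
class SamplerSketch {

    // arrivalMs[i] is when item i arrives; samplerMs holds the times
    // (ascending) at which the sampler fires.
    static List<Integer> sampleAt(long[] arrivalMs, long[] samplerMs) {
        List<Integer> emitted = new ArrayList<>();
        int next = 0;          // first item that has not arrived yet
        int lastEmitted = -1;  // most recent item already passed downstream
        for (long tick : samplerMs) {
            while (next < arrivalMs.length && arrivalMs[next] <= tick) {
                next++;
            }
            int latest = next - 1;
            // Emit only if something new arrived since the previous sample.
            if (latest >= 0 && latest != lastEmitted) {
                emitted.add(latest);
                lastEmitted = latest;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] arrivals = new long[20];
        for (int i = 0; i < arrivals.length; i++) {
            arrivals[i] = (i + 1) * 500L; // assumption: one item every 500ms
        }
        // Hypothetical run: the fake operation returns after 3200ms.
        System.out.println(sampleAt(arrivals, new long[] {3200})); // prints [5]
    }
}
```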

Just as a hypothetical example, we could use this sample operator to monitor a system property (e.g. available memory, CPU usage, etc.) whenever an operation finishes. We just need to create a method that emits that system property periodically; when our operation finishes (no matter how long it takes to execute), sample emits the last system status.

throttleFirst(windowDuration) Operator

The throttleFirst operator is quite similar to the sample operator, but instead of emitting the most recent item, it emits the first item emitted within each window duration.

In this example, since emitItems emits an item every 500ms and the throttleFirst windowDuration is set to 4 seconds, it should emit items 0, 8 and 16.
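The windowing rule can be checked with a small plain-Java model (not the demo app's code and not RxJava's implementation). The class ThrottleFirstSketch and its method are hypothetical names, and the sketch assumes that item i arrives at i * 500ms and that a new window may open as soon as a full window duration has elapsed since the last emitted item.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of throttleFirst(windowDuration); names are hypothetical.
class ThrottleFirstSketch {

    // Returns the indices of the items that would be passed downstream.
    static List<Integer> throttleFirst(long[] arrivalMs, long windowMs) {
        List<Integer> emitted = new ArrayList<>();
        boolean nothingEmittedYet = true;
        long windowStart = 0;
        for (int i = 0; i < arrivalMs.length; i++) {
            // The first item always passes; later ones only pass once the
            // current window has fully elapsed (assumption: >= comparison).
            if (nothingEmittedYet || arrivalMs[i] - windowStart >= windowMs) {
                emitted.add(i);
                windowStart = arrivalMs[i];
                nothingEmittedYet = false;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] arrivals = new long[20];
        for (int i = 0; i < arrivals.length; i++) {
            arrivals[i] = i * 500L; // assumption: one item every 500ms
        }
        System.out.println(throttleFirst(arrivals, 4000)); // prints [0, 8, 16]
    }
}
```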

debounce(timeout) Operator

By definition, “debounce operator only emits an item from an Observable if a particular timespan has passed without it emitting another item”. In other words, if debounce uses a timeout of 350ms, it only emits an item downstream when the source Observable does not emit another item within 350ms.

Let’s see an example:

This example emits 40 items, each one after a random delay between 100 and 500ms (this is done by the emitItems method). Since the debounce timeout is set to 350ms, an item is emitted downstream only if the source stays silent for 350ms after it. Below is the output of one run (the delays are random, so results may vary each time you run this example). When the next item arrives within 350ms, the pending item is dropped; as soon as the next item takes longer than 350ms, the pending item is emitted downstream:

  • emitItems() – Emitting number:   0 and sleeping for 313ms
  • emitItems() – Emitting number:   1 and sleeping for 468ms → Previous item (item 0) is emitted, since item 1 took more than 350ms to be emitted
  • emitItems() – Emitting number:   2 and sleeping for 141ms
  • emitItems() – Emitting number:   3 and sleeping for 252ms
  • emitItems() – Emitting number:   4 and sleeping for 213ms
  • emitItems() – Emitting number:   5 and sleeping for 327ms
  • emitItems() – Emitting number:   6 and sleeping for 328ms
  • emitItems() – Emitting number:   7 and sleeping for 201ms
  • emitItems() – Emitting number:   8 and sleeping for 119ms
  • emitItems() – Emitting number:   9 and sleeping for 270ms
  • emitItems() – Emitting number:  10 and sleeping for 451ms → Previous item (item 9) is emitted, since item 10 took more than 350ms to be emitted
  • emitItems() – Emitting number:  11 and sleeping for 227ms
  • emitItems() – Emitting number:  12 and sleeping for 499ms → Previous item (item 11) is emitted, since item 12 took more than 350ms to be emitted
  • emitItems() – Emitting number:  13 and sleeping for 106ms
  • emitItems() – Emitting number:  14 and sleeping for 327ms
  • emitItems() – Emitting number:  15 and sleeping for 372ms → Previous item (item 14) is emitted, since item 15 took more than 350ms to be emitted
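The log above can be replayed with a small plain-Java model (not the demo app's code and not RxJava's implementation). The class DebounceSketch and its method are hypothetical names. It assumes, as the annotations in the log suggest, that the logged sleep is the delay before that item reaches debounce, so an item passes when the delay logged for the following item exceeds the timeout. The last item in the log is left undecided because the log stops before the next delay is known.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of debounce(timeout); names are hypothetical.
class DebounceSketch {

    // delayBeforeMs[i] is the pause before item i is emitted by the source.
    // Returns the items known to pass; the final item stays undecided.
    static List<Integer> debounce(long[] delayBeforeMs, long timeoutMs) {
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i + 1 < delayBeforeMs.length; i++) {
            // Item i passes if the source then stays silent past the timeout.
            if (delayBeforeMs[i + 1] > timeoutMs) {
                emitted.add(i);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Delays copied from the log above (items 0 to 15).
        long[] delays = {313, 468, 141, 252, 213, 327, 328, 201,
                         119, 270, 451, 227, 499, 106, 327, 372};
        System.out.println(debounce(delays, 350)); // prints [0, 9, 11, 14]
    }
}
```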

A good example of debounce usage is a search box. Instead of running a search every time the user types a character, we can use debounce to hold the search back while the user is still typing (e.g. keystrokes less than 250ms apart). Once the user stops typing for 250ms, the text is emitted downstream and we can run the search. See how it works below (note that this snippet uses RxBinding, which is out of the scope of this article).

We already used the debounce operator in a previous article about TypingIndicator.

debounce(w/Func1) Operator

The debounce operator has a variant which accepts a function (known as the debounceSelector) and drops items emitted by the source Observable that are followed by another item within a computed debounce duration. This means that if the source Observable emits another item while Func1 is still processing the current one, the current item is dropped and the newer one starts being processed.

The example below emits 10 items, each one with 1100ms of delay (this is done by the emitItems method). Then, we simulate a fake operation by making the debounce selector sleep somewhere between 1 and 5 seconds. Two cases are possible:

  • If the debounce selector sleeps for 2 or more seconds, the source Observable will emit another item while the current one is still being processed. The current item is then discarded and the newly emitted item starts being processed.
  • If the debounce selector sleeps for 1 second, our fake operation finishes before the next emission, so the item is not dropped but emitted downstream.

To illustrate this example, let’s walk through one run of this test step by step. Whenever the debounce selector sleeps for more than 1 second, the current item is discarded, since the source Observable emits another item while the previous one is still being processed. On the other hand, when the selector sleeps for only 1 second, that item is emitted. The result for this example is that items 4, 5 and 10 are emitted.

  • Time 0 – Source emits number 0
    Debounce Selector will sleep for 2 seconds while processing item number 0.
  • Time 1100 – Source emits number 1
    Item 0 is dropped, since while it was being processed, source emitted item number 1.
    Debounce Selector will sleep for 4 seconds while processing item number 1.
  • Time 2200 – Source emits number 2
    Item 1 is dropped, since while it was being processed, source emitted item number 2.
    Debounce Selector will sleep for 3 seconds while processing item number 2.
  • Time 3300 – Source emits number 3
    Item 2 is dropped, since while it was being processed, source emitted item number 3.
    Debounce Selector will sleep for 3 seconds while processing item number 3.
  • Time 4400 – Source emits number 4
    Item 3 is dropped, since while it was being processed, source emitted item number 4.
    Debounce Selector will sleep for 1 second while processing item number 4.
    Item 4 is emitted downstream, since its processing finished before the source emitted another item.
  • Time 5500 – Source emits number 5
    Debounce Selector will sleep for 1 second while processing item number 5.
    Item 5 is emitted downstream, since its processing finished before the source emitted another item.
  • Time 6600 – Source emits number 6
    Debounce Selector will sleep for 3 seconds while processing item number 6.
  • Time 7700 – Source emits number 7
    Item 6 is dropped, since while it was being processed, source emitted item number 7.
    Debounce Selector will sleep for 4 seconds while processing item number 7.
  • Time 8800 – Source emits number 8
    Item 7 is dropped, since while it was being processed, source emitted item number 8.
    Debounce Selector will sleep for 2 seconds while processing item number 8.
  • Time 9900 – Source emits number 9
    Item 8 is dropped, since while it was being processed, source emitted item number 9.
    Debounce Selector will sleep for 4 seconds while processing item number 9.
  • Time 11000 – Source emits number 10
    Item 9 is dropped, since while it was being processed, source emitted item number 10.
    Debounce Selector will sleep for 1 second while processing item number 10.
    Item 10 is emitted downstream, since its processing finished before the source emitted another item.
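The walkthrough above reduces to a simple rule, which the plain-Java model below replays (not the demo app's code and not RxJava's implementation). The class DebounceSelectorSketch and its method are hypothetical names. It assumes a fixed 1100ms gap between source items and treats the final item as emitted because no later item can displace it.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of debounce with a selector; names are hypothetical.
class DebounceSelectorSketch {

    // selectorMs[i] is how long the selector takes for item i;
    // periodMs is the fixed gap between two source items.
    static List<Integer> debounce(long[] selectorMs, long periodMs) {
        List<Integer> emitted = new ArrayList<>();
        int last = selectorMs.length - 1;
        for (int i = 0; i <= last; i++) {
            // An item survives if its selector finishes before the next
            // item arrives. Assumption: the final item always survives.
            if (i == last || selectorMs[i] < periodMs) {
                emitted.add(i);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Selector durations copied from the walkthrough (items 0 to 10).
        long[] selector = {2000, 4000, 3000, 3000, 1000, 1000,
                           3000, 4000, 2000, 4000, 1000};
        System.out.println(debounce(selector, 1100)); // prints [4, 5, 10]
    }
}
```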

Conclusion

In this article, we explained some more filtering operators that are useful when the source Observable is overproducing items. By using one of these techniques, we can select which items are emitted downstream, preventing (or at least reducing) the chance of getting a MissingBackpressureException. We hope you have enjoyed it. Stay tuned for the next article of this series.