RxJava Operators – Part 1: Filtering Operators

Hi all! After some time away from this blog, it is time to come back and talk about RxJava. Over the next couple of weeks we will publish a series of 10 articles about RxJava operators (including their variants). They will be divided into categories such as Combining Observables, Transforming Operators, Hot Observables, etc. We will not go through every example in every category, since that would take too long! But I still think this series is valuable 🙂 .

Note that I am not an expert in RxJava at all. With this series, I just want to share my thoughts and what I have learned about RxJava since I started using it. Also note that although RxJava2 was released a while ago, this series is based on RxJava1.

Before we Begin

Each article in this series comes with a demo app that implements all the examples we discuss. For each example, we show a snippet containing a (helper) method that is expected to return an Observable. I hope the names are self-explanatory. For example, a method called emitItemsOrEmpty() is expected to emit some items or to return an empty Observable. A method called emitError() should emit an error, and so on. If at any time you are unsure what those methods do, you can check the available source code.

That being said, let’s start learning together by talking about filtering operators!

Filtering Operators

Filtering operators are operators we can use to selectively emit items from a source Observable. At first glance they seem simple (and they are!), but it is sometimes hard to know which one to pick, since some of them are very similar to each other. Understanding their differences helps us choose the one that best fits our needs.

The intent of this article is to present some operators from the filtering category and show some of their details. By the end, we expect you to understand their nuances.

This is a list of all operators covered in this post:

  • first()
  • first() with predicate
  • firstOrDefault()
  • takeFirst()
  • single()
  • singleOrDefault()
  • elementAt()
  • last()
  • lastOrDefault()
  • take()
  • takeLast()
  • filter()

Here you can find the source-code for all topics covered in this article. For a complete list of filtering operators, check the documentation.

first() Operator

Let’s start with one of the simplest filtering operators: first(). It takes the first item emitted by the source Observable and emits it downstream. If the source emits more than one item, only the first one is emitted and then the chain terminates. But if the source Observable is empty, first() terminates with an error (a NoSuchElementException), since it expects at least one item to be emitted.

So, in the demo app's test for this operator, if the emitItemsOrEmpty() method emits an item, that item is emitted downstream and the chain terminates. But if emitItemsOrEmpty() returns an empty Observable, we see an error.
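The behaviour described above can be sketched as follows. This is only an illustration, assuming RxJava 1.x on the classpath and Java 8 lambdas; the class FirstDemo and the record() helper are made-up names (not part of RxJava or of the demo app), and a plain list stands in for emitItemsOrEmpty().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import rx.Observable;

final class FirstDemo {

    // Hypothetical helper: subscribes and records each notification as text.
    static <T> List<String> record(Observable<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                item -> events.add("next:" + item),
                error -> events.add("error:" + error.getClass().getSimpleName()),
                () -> events.add("completed"));
        return events;
    }

    // Applies first() to the given items; an empty list models an empty Observable.
    static List<String> firstOf(List<Integer> items) {
        return record(Observable.from(items).first());
    }

    public static void main(String[] args) {
        System.out.println(firstOf(Arrays.asList(7, 8, 9)));
        System.out.println(firstOf(Collections.<Integer>emptyList()));
    }
}
```

With the non-empty list, only the first item and a completion are recorded; with the empty list, the only recorded event is a NoSuchElementException error.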

first(Func1) Operator (w/ Predicate)

The first(Func1) operator is a variant of first() that accepts a predicate. The predicate is called for each emitted item until it returns true; that item is then emitted and the chain terminates.

Consider a predicate function that returns true when the emitted number is a multiple of three. Since the emitItems(N) helper method emits N random numbers, if it emits the numbers 5, 8, 2, 6, the first three items are filtered out and the last one (six, which is a multiple of three) makes the predicate return true. It is then emitted downstream and the chain terminates. But if the source Observable emits, for example, the numbers 1, 5, 7, 2, 4, first() finishes with an error, since none of the emitted items satisfies the predicate.
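A small sketch of the multiple-of-three case, under the same assumptions as before (RxJava 1.x, Java 8 lambdas). Fixed lists replace the random numbers from emitItems(N) so the outcome is predictable; FirstWithPredicateDemo and record() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import rx.Observable;

final class FirstWithPredicateDemo {

    // Hypothetical helper: subscribes and records each notification as text.
    static <T> List<String> record(Observable<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                item -> events.add("next:" + item),
                error -> events.add("error:" + error.getClass().getSimpleName()),
                () -> events.add("completed"));
        return events;
    }

    // Emits the first multiple of three, or an error if there is none.
    static List<String> firstMultipleOfThree(List<Integer> items) {
        return record(Observable.from(items).first(n -> n % 3 == 0));
    }

    public static void main(String[] args) {
        System.out.println(firstMultipleOfThree(Arrays.asList(5, 8, 2, 6)));
        System.out.println(firstMultipleOfThree(Arrays.asList(1, 5, 7, 2, 4)));
    }
}
```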

firstOrDefault() Operator

But what if we do not want to terminate with an error when no item is emitted at all, and would rather use a default value instead? Maybe when querying a server for some data, if that server cannot provide any value, we just want to use a local value. In that case we can use the firstOrDefault() operator, which accepts a default value that is emitted if the source Observable completes without emitting any value.

In the demo app's test for this operator, if the source Observable emits an item, it is emitted downstream (and the chain terminates). But if the source is an empty Observable, a default value is emitted instead of terminating with an error as first() does.
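The server/local fallback idea might look like the sketch below. It assumes RxJava 1.x and Java 8; the "remote" values are just a list, and the names FirstOrDefaultDemo, record() and remoteOrLocal() are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import rx.Observable;

final class FirstOrDefaultDemo {

    // Hypothetical helper: subscribes and records each notification as text.
    static <T> List<String> record(Observable<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                item -> events.add("next:" + item),
                error -> events.add("error:" + error.getClass().getSimpleName()),
                () -> events.add("completed"));
        return events;
    }

    // Uses the first remote value, falling back to a local one when there is none.
    static List<String> remoteOrLocal(List<String> remoteValues) {
        return record(Observable.from(remoteValues).firstOrDefault("local value"));
    }

    public static void main(String[] args) {
        System.out.println(remoteOrLocal(Arrays.asList("remote value")));
        System.out.println(remoteOrLocal(Collections.<String>emptyList()));
    }
}
```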

takeFirst() Operator

Another similar operator is takeFirst(). The difference is that when the source Observable terminates without emitting any item, takeFirst() simply completes without emitting anything (it behaves like an empty Observable).

Although the first(), firstOrDefault() and takeFirst() operators are quite similar, they differ when no item is emitted by the source Observable: first() breaks the chain with an error, firstOrDefault() emits a default value, and takeFirst() completes without emitting any item.

Note: there are ways to avoid breaking the chain when an error occurs, but they are out of the scope of this article.
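To compare the three operators side by side, here is a hedged sketch (RxJava 1.x, Java 8 assumed). As far as I know, RxJava 1.x only offers takeFirst(Func1), so an always-true predicate is passed here to mimic a plain "take first"; the class and helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import rx.Observable;

final class FirstVariantsDemo {

    // Hypothetical helper: subscribes and records each notification as text.
    static <T> List<String> record(Observable<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                item -> events.add("next:" + item),
                error -> events.add("error:" + error.getClass().getSimpleName()),
                () -> events.add("completed"));
        return events;
    }

    static List<String> withFirst(List<Integer> items) {
        return record(Observable.from(items).first());
    }

    static List<String> withFirstOrDefault(List<Integer> items) {
        return record(Observable.from(items).firstOrDefault(0));
    }

    // Assumption: the always-true predicate stands in for "no condition".
    static List<String> withTakeFirst(List<Integer> items) {
        return record(Observable.from(items).takeFirst(n -> true));
    }

    public static void main(String[] args) {
        List<Integer> empty = Collections.emptyList();
        System.out.println("first():          " + withFirst(empty));
        System.out.println("firstOrDefault(): " + withFirstOrDefault(empty));
        System.out.println("takeFirst():      " + withTakeFirst(empty));
    }
}
```

On an empty source, the first call records an error, the second records the default value followed by a completion, and the third records only a completion.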

single() Operator

The single() operator always emits either exactly one value or an error notification. So, basically:

  • If the source Observable emits exactly one item, that item is emitted by the single operator.
  • If more than one item is emitted by the source Observable, single() terminates with an IllegalArgumentException with the message “Sequence contains too many elements”.
  • If the source Observable completes successfully without emitting any item, single() terminates with a NoSuchElementException with the message “Sequence contains no elements”.

In the demo app's test for this operator, if the emitItemsOrEmpty() method returns an empty Observable or emits more than one item, single() terminates with an error. But if it emits exactly one item, single() completes successfully.
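The three cases above can be reproduced with a sketch like this one (RxJava 1.x and Java 8 assumed; SingleDemo and record() are hypothetical names, and lists stand in for emitItemsOrEmpty()).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import rx.Observable;

final class SingleDemo {

    // Hypothetical helper: subscribes and records each notification as text.
    static <T> List<String> record(Observable<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                item -> events.add("next:" + item),
                error -> events.add("error:" + error.getClass().getSimpleName()),
                () -> events.add("completed"));
        return events;
    }

    // Applies single(): succeeds only when exactly one item is emitted.
    static List<String> singleOf(List<Integer> items) {
        return record(Observable.from(items).single());
    }

    public static void main(String[] args) {
        System.out.println(singleOf(Arrays.asList(42)));                 // exactly one item
        System.out.println(singleOf(Arrays.asList(1, 2)));               // too many items
        System.out.println(singleOf(Collections.<Integer>emptyList()));  // no items
    }
}
```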

singleOrDefault() Operator

The singleOrDefault() operator is slightly different from the single() operator:

  • If no item is emitted, it emits the default value instead of terminating with an error.
  • If more than one item is emitted, it behaves like the single() operator, that is, it terminates with the “Sequence contains too many elements” error.
  • If exactly one item is emitted by the source Observable, it emits that item and completes.

In short, the difference between single() and singleOrDefault() is that when no item is emitted by the source Observable, the former will terminate with error and the latter will emit the default value.
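A matching sketch for singleOrDefault(), under the same assumptions (RxJava 1.x, Java 8; hypothetical class and helper names). The default value -1 is arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import rx.Observable;

final class SingleOrDefaultDemo {

    // Hypothetical helper: subscribes and records each notification as text.
    static <T> List<String> record(Observable<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                item -> events.add("next:" + item),
                error -> events.add("error:" + error.getClass().getSimpleName()),
                () -> events.add("completed"));
        return events;
    }

    // Applies singleOrDefault(-1): the default only covers the "no items" case.
    static List<String> singleOrMinusOne(List<Integer> items) {
        return record(Observable.from(items).singleOrDefault(-1));
    }

    public static void main(String[] args) {
        System.out.println(singleOrMinusOne(Collections.<Integer>emptyList()));
        System.out.println(singleOrMinusOne(Arrays.asList(4)));
        System.out.println(singleOrMinusOne(Arrays.asList(1, 2)));
    }
}
```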

elementAt() Operator

When we know in advance the position of the element we want, we can use the elementAt() operator. Note that elementAt() uses a zero-based index, so elementAt(N) expects at least N + 1 items to be emitted. If fewer items are emitted, it terminates with an IndexOutOfBoundsException.

For example, in a game with 5 players, if we want to get the player at a given position, we can use the elementAt() operator. Just do not forget to cover the case of an invalid position (e.g. when a player is out of the game).
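The player example could be sketched as below (RxJava 1.x, Java 8 assumed). The ranking list and the names ElementAtDemo, playerAt() and playerAtOrUnknown() are invented for illustration; elementAtOrDefault() is shown as one possible way to cover an invalid position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import rx.Observable;

final class ElementAtDemo {

    // Hypothetical helper: subscribes and records each notification as text.
    static <T> List<String> record(Observable<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                item -> events.add("next:" + item),
                error -> events.add("error:" + error.getClass().getSimpleName()),
                () -> events.add("completed"));
        return events;
    }

    // Zero-based lookup; fails if the position does not exist.
    static List<String> playerAt(List<String> ranking, int position) {
        return record(Observable.from(ranking).elementAt(position));
    }

    // Same lookup, but emits a placeholder for an invalid position.
    static List<String> playerAtOrUnknown(List<String> ranking, int position) {
        return record(Observable.from(ranking).elementAtOrDefault(position, "unknown"));
    }

    public static void main(String[] args) {
        List<String> ranking = Arrays.asList("ana", "bob", "cy");
        System.out.println(playerAt(ranking, 0));
        System.out.println(playerAt(ranking, 3));
        System.out.println(playerAtOrUnknown(ranking, 3));
    }
}
```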

last() Operator

The last() operator is pretty straightforward. It emits only the last item emitted by the source Observable.

If the source Observable is empty, last() terminates with an error (NoSuchElementException), since it expects at least one item to be emitted.

Note that it has to wait for the source Observable to terminate before it can emit the last item. So, if you have no control over the source Observable, use it carefully: if the source never terminates, last() will never emit any item at all.

lastOrDefault() Operator

This operator is similar to last(), but if the source Observable does not emit any item, a default value is emitted instead of an error.
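Both last() and lastOrDefault() can be tried out with a sketch like this (RxJava 1.x, Java 8 assumed; LastDemo and record() are hypothetical names, and the default value 0 is arbitrary).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import rx.Observable;

final class LastDemo {

    // Hypothetical helper: subscribes and records each notification as text.
    static <T> List<String> record(Observable<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                item -> events.add("next:" + item),
                error -> events.add("error:" + error.getClass().getSimpleName()),
                () -> events.add("completed"));
        return events;
    }

    // Emits the last item once the source has completed, or an error if it was empty.
    static List<String> lastOf(List<Integer> items) {
        return record(Observable.from(items).last());
    }

    // Same, but falls back to 0 when the source was empty.
    static List<String> lastOrZero(List<Integer> items) {
        return record(Observable.from(items).lastOrDefault(0));
    }

    public static void main(String[] args) {
        System.out.println(lastOf(Arrays.asList(1, 2, 3)));
        System.out.println(lastOf(Collections.<Integer>emptyList()));
        System.out.println(lastOrZero(Collections.<Integer>emptyList()));
    }
}
```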

take(N) Operator

All the operators shown so far emit at most one item (or none, or an error, depending on the operator). But sometimes we want to emit more than one item, and there are operators for that. take(N) emits the first N items emitted by the source Observable. If the source emits fewer than N items before terminating, those items are emitted and the chain completes normally. If no items are emitted at all, it also completes without an error.

In the demo app's test for this operator, if the emitItemsOrEmpty() method emits up to 5 items, all of those items are emitted downstream. If it returns an empty Observable, take(N) terminates without any error.
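Here is a compact sketch of take(5), assuming RxJava 1.x and Java 8. It collects the result with toList() and toBlocking(), which is convenient for a demo but is not how the demo app necessarily does it; TakeDemo and firstFive() are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import rx.Observable;

final class TakeDemo {

    // Keeps at most the first five items and returns them as a list.
    static List<Integer> firstFive(List<Integer> items) {
        return Observable.from(items)
                .take(5)
                .toList()       // gathers the emitted items into a single List
                .toBlocking()   // safe here because the source is finite and synchronous
                .single();
    }

    public static void main(String[] args) {
        System.out.println(firstFive(Arrays.asList(1, 2, 3, 4, 5, 6, 7)));
        System.out.println(firstFive(Arrays.asList(1, 2, 3)));
        System.out.println(firstFive(Collections.<Integer>emptyList()));
    }
}
```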

takeLast(N) Operator

This one is very simple: it emits the last N items emitted by the source Observable. If no item is emitted, it completes without an error. There are several variants, including one that accepts a time window rather than a number of items, and another that combines a time window with a number of items. Take a look at the Javadoc for further details.
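A short sketch of the count-based variant, takeLast(2), under the same assumptions (RxJava 1.x, Java 8; TakeLastDemo and lastTwo() are made-up names). The time-based variants are not shown.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import rx.Observable;

final class TakeLastDemo {

    // Keeps at most the last two items and returns them as a list.
    static List<Integer> lastTwo(List<Integer> items) {
        return Observable.from(items)
                .takeLast(2)
                .toList()
                .toBlocking()   // safe here because the source is finite and synchronous
                .single();
    }

    public static void main(String[] args) {
        System.out.println(lastTwo(Arrays.asList(1, 2, 3, 4)));
        System.out.println(lastTwo(Arrays.asList(9)));
        System.out.println(lastTwo(Collections.<Integer>emptyList()));
    }
}
```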

filter() Operator

As shown above, first() with a predicate terminates as soon as the predicate evaluates to true. If we do not want to terminate the chain, but instead keep emitting every item for which the predicate returns true, we can use the filter() operator.

For example, with a multiple-of-three predicate, filter() emits every multiple of three until the source Observable completes.
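The multiple-of-three filter might be sketched like this (RxJava 1.x and Java 8 assumed; FilterDemo and multiplesOfThree() are hypothetical names, and a fixed list replaces the demo app's source Observable).

```java
import java.util.Arrays;
import java.util.List;
import rx.Observable;

final class FilterDemo {

    // Keeps every item that is a multiple of three, in emission order.
    static List<Integer> multiplesOfThree(List<Integer> items) {
        return Observable.from(items)
                .filter(n -> n % 3 == 0)
                .toList()
                .toBlocking()   // safe here because the source is finite and synchronous
                .single();
    }

    public static void main(String[] args) {
        System.out.println(multiplesOfThree(Arrays.asList(5, 3, 8, 6, 9)));
        System.out.println(multiplesOfThree(Arrays.asList(1, 5, 7, 2, 4)));
    }
}
```

Unlike first() with the same predicate, a source with no matching items does not produce an error here; the result is simply empty.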

Conclusion

With this article, we hope you now understand how filtering operators work and how they differ. As already mentioned, although they are quite simple, it is important to know them well in order to always pick the right tool for the job.

If you have questions, please leave a comment below. In the next article of this series we will keep talking about filtering operators.

  • Great post! I think this series will be the best quick reference guide for every developer who is using Rx (not just RxJava). I really like the idea of the demo app.
    You mentioned that this series will be focused on RxJava1. Is there any difference between RxJava1 and RxJava2 concerning these filtering operators?

    • androidahead

      Hi Adriano. Thanks for your kind words. Actually I have never used RxJava2, only read a little about it. I did a little research and did not find any difference in how these filtering operators work. But of course, it would be nice if someone with more experience could help with this. When I finish this series, I am planning to convert all examples to RxJava2, so stay tuned.