RxJava Operators – Part 3: Combining Observables

Sometimes we need to combine the output of more than one Observable. RxJava provides several operators for this job. In this third part of the RxJava series, we are going to take a look at some of them:

  • merge(w/Iterable of Observables)
  • merge(w/multiple Observables)
  • merge() with error notifications
  • mergeDelayError()
  • mergeWith()
  • zip()
  • zipWith()
  • combineLatest()
  • switchOnNext()

You can find here a demo app that implements all the examples shown in this article.

Check here for the first article of this series and here for the second one.

merge(w/Iterable of Observables) Operator

The merge operator combines the output of multiple Observables into a single Observable. The result acts like a single Observable, no matter how many Observables were combined. RxJava provides more than ten variants of the merge operator.
This example shows a merge variant that accepts an Iterable of Observables. Note that, unlike the concat operator, merge may interleave the items emitted by the merged Observables.

We first create two lists of Observables and then pass them to the merge operator. For the odd numbers list, the emitNumbers() method sleeps for 250ms between emissions. For the even numbers, it sleeps for 150ms. This way, we should see something like this when running the example:

  • At time 150 → emitNumbers() – Emitting number: 2
  • At time 250 → emitNumbers() – Emitting number: 1
  • At time 300 → emitNumbers() – Emitting number: 4
  • At time 450 → emitNumbers() – Emitting number: 6
  • At time 500 → emitNumbers() – Emitting number: 3
  • At time 600 → emitNumbers() – Emitting number: 8
  • At time 750 → emitNumbers() – Emitting number: 5

The final result should be a sequence like this: 2, 1, 4, 6, 3, 8, 5, 10, 12, 7, 14, 9, 11, 13, 15.

For comparison, if we replace the merge operator with the concat operator, the result is the sequence 1, 3, 5, 7, 9, 11, 13, 15, 2, 4, 6, 8, 10, 12, 14. This clearly shows that while concat emits the items of each Observable one after another, merge emits items as soon as the sources emit them.
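The interleaving can be checked with a small plain-Java model of the timeline. This is only a sketch and not the demo app's code: it does not use RxJava, and the names MergeTimeline, Emission and timed are made up for illustration. It assumes the sleep happens before every emission, as the timestamps listed earlier suggest, and it breaks the tie at 750ms in favour of the odd source, whereas real threads could deliver 5 and 10 in either order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class MergeTimeline {

    /** One emission: the value and the time (ms) at which it is emitted. */
    static final class Emission {
        final long timeMs;
        final int value;

        Emission(long timeMs, int value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    static final List<Integer> ODDS = Arrays.asList(1, 3, 5, 7, 9, 11, 13, 15);
    static final List<Integer> EVENS = Arrays.asList(2, 4, 6, 8, 10, 12, 14);

    /** Gives each value a timestamp, sleeping delayMs before every emission. */
    static List<Emission> timed(List<Integer> values, long delayMs) {
        List<Emission> out = new ArrayList<>();
        long t = 0;
        for (int v : values) {
            t += delayMs;
            out.add(new Emission(t, v));
        }
        return out;
    }

    /** merge: items go downstream in the order of their timestamps. */
    static List<Integer> merge(List<Emission> first, List<Emission> second) {
        List<Emission> all = new ArrayList<>(first);
        all.addAll(second);
        // The sort is stable, so on a tie the item of the first source stays in front.
        all.sort(Comparator.comparingLong((Emission e) -> e.timeMs));
        List<Integer> out = new ArrayList<>();
        for (Emission e : all) {
            out.add(e.value);
        }
        return out;
    }

    /** concat: the second source is only consumed after the first one completes. */
    static List<Integer> concat(List<Integer> first, List<Integer> second) {
        List<Integer> out = new ArrayList<>(first);
        out.addAll(second);
        return out;
    }

    public static void main(String[] args) {
        System.out.println("merge:  " + merge(timed(ODDS, 250), timed(EVENS, 150)));
        System.out.println("concat: " + concat(ODDS, EVENS));
    }
}
```

With RxJava itself, the two calls being modelled are Observable.merge and Observable.concat.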

merge(w/multiple Observables) Operator

Depending on the situation, we might want to pass multiple Observables to the merge operator one by one instead of as a list. There are variants that accept up to 9 Observables. We will show one that accepts two Observables.

So, we can rewrite the previous example by passing individual Observables to the merge operator. If we use the same timeouts (in the emitNumbers method), the result should be the same.

But, to make things more interesting, in the demo app we added two more Observables to this example, like this:

This will give us a result like this: 1000, 2000, 2, 3000, 1, 4000, 4, 5000, 6000, 6, 7000, 3, 8, 100, 5, 10… Once again, this shows us that merge may interleave the items emitted by the merged Observables.

If we replace the merge operator with the concat operator, the final result is the oddNumbers list first, then the evenNumbers list, next the someNumbers list and finally the someMoreNumbers list.

merge(w/multiple Observables) Operator with Error Notifications

But what happens if one of the emitting Observables emits an error? How does the merge operator react to it?

Well, it will terminate with an error as soon as one of the sources terminates with an error. This is useful when we want to immediately terminate the whole process when an error happens.

In this example, the first two Observables start emitting items (with delays of 250ms and 150ms between emissions, respectively). Then, after 1 second, the third Observable emits an error and the whole chain terminates.

mergeDelayError() Operator

If we want a different behavior when one (or more) Observable emits an error, we can use the mergeDelayError() operator instead. After one of the sources emits an error, it does not terminate immediately. Instead, it holds off on reporting the error and keeps processing all source Observables, and only when all of them have terminated does it terminate with an error. One thing to keep in mind is that if more than one source emits an error, it may pass information about all of those errors to onError, so we need to prepare our observer to accept a parameter of the CompositeException type.

The example below is similar to the previous one, but uses the mergeDelayError operator instead. When running it, the emitError() method emits an error after 1 second, but since we are using mergeDelayError, all items from the first two Observables are emitted first, and only then does the chain terminate with an error.
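The difference between the two operators can be sketched with a plain-Java model of the same timeline. It is not RxJava code and not the demo app's implementation; MergeErrorModel and run are hypothetical names. It assumes the same odd and even sources as before (250ms and 150ms delays), and it assumes that when an item and the error fall on the same millisecond, the error wins.

```java
import java.util.ArrayList;
import java.util.List;

final class MergeErrorModel {

    static final int[] ODDS = {1, 3, 5, 7, 9, 11, 13, 15};
    static final int[] EVENS = {2, 4, 6, 8, 10, 12, 14};

    /**
     * Replays the odd source (every 250ms) and the even source (every 150ms)
     * next to a third source that only fails at errorAtMs.
     * delayError = false models merge, delayError = true models mergeDelayError.
     */
    static List<String> run(long errorAtMs, boolean delayError) {
        List<String> events = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < ODDS.length || j < EVENS.length) {
            long oddTime = i < ODDS.length ? (i + 1) * 250L : Long.MAX_VALUE;
            long evenTime = j < EVENS.length ? (j + 1) * 150L : Long.MAX_VALUE;
            // Plain merge stops at the error (assumption: the error wins a tie).
            if (!delayError && Math.min(oddTime, evenTime) >= errorAtMs) {
                break;
            }
            if (oddTime <= evenTime) {
                events.add(String.valueOf(ODDS[i++]));
            } else {
                events.add(String.valueOf(EVENS[j++]));
            }
        }
        // The error is reported in both cases; mergeDelayError only postpones it.
        events.add("onError");
        return events;
    }

    public static void main(String[] args) {
        System.out.println("merge:           " + run(1000, false));
        System.out.println("mergeDelayError: " + run(1000, true));
    }
}
```

In the first run the sequence is cut short at the 1 second mark; in the second run all fifteen numbers arrive before the error.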

mergeWith() Operator

The mergeWith() operator is quite similar to the merge operator. It allows us to merge sequences one by one in a chain.

This example will give the same result when using:

zip() Operator

The zip() operator combines the items emitted by the source Observables (from two up to nine) based on their index. Once every source Observable has emitted an item for a given index, a function is applied to those items. In that function we can do whatever we want with the emitted items. The result is then emitted downstream.

This example combines two Observables (one that emits odd numbers and another that emits even numbers) by emitting lists of two numbers, e.g. [1,2], [3,4], [5,6], etc.

Basically, this is what will happen when we run this example:

  • First Observable emits its first number (1).
  • Then the second Observable emits its first number (2).
  • zip function is executed and emits a list containing both numbers [1, 2]. This is what will be emitted downstream.
  • First Observable emits its second number (3).
  • Then the second Observable emits its second number (4).
  • zip function is executed again and emits a list containing both numbers [3, 4]. This is what will be emitted downstream.
  • … and so on…

This goes on until any of the Observables terminates. Note that the zip sequence terminates when any of the source sequences terminates. Further values from the other sequences are ignored. In this example, the last odd number emitted (15) is discarded, since there is no even number left to combine it with.
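The index-based pairing described above can be sketched in plain Java. This is a model over lists, not RxJava's zip and not the demo app's code; ZipModel and oddEvenPairs are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

final class ZipModel {

    /** Pairs items by index and stops as soon as the shorter source runs out. */
    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> zipper) {
        List<R> out = new ArrayList<>();
        int pairs = Math.min(first.size(), second.size());
        for (int i = 0; i < pairs; i++) {
            out.add(zipper.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    /** The article's example: eight odd numbers zipped with seven even numbers. */
    static List<List<Integer>> oddEvenPairs() {
        List<Integer> odds = Arrays.asList(1, 3, 5, 7, 9, 11, 13, 15);
        List<Integer> evens = Arrays.asList(2, 4, 6, 8, 10, 12, 14);
        return zip(odds, evens, (odd, even) -> Arrays.asList(odd, even));
    }

    public static void main(String[] args) {
        // 15 has no partner, so it never shows up in the output.
        System.out.println(oddEvenPairs());
    }
}
```

The zipper function is where we "do whatever we want" with the two items; here it simply wraps them in a list.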

zipWith() Operator

There is also a zip variation called zipWith. It allows us to chain zip operations, combining the result of one zip with another Observable.

This example combines the items emitted by the first Observable with those of the second one by applying a function to them (the sum of both values). That result is then combined with the items emitted by the third Observable, and another function is applied (the sum of the previous result and the item emitted by the third Observable). Since each Observable emits numbers sequentially, the result should be: 3, 6, 9, 12, 15, 18 and 21.
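The arithmetic behind that result can be reproduced with a plain-Java sketch. It is a model over lists rather than RxJava code, and it assumes that each of the three Observables emits the numbers 1 to 7, which is what the stated result implies. ZipWithModel and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class ZipWithModel {

    /** Combines two sequences index by index, like a single zipWith step. */
    static List<Integer> zipWith(List<Integer> left, List<Integer> right,
                                 BinaryOperator<Integer> combiner) {
        List<Integer> out = new ArrayList<>();
        int pairs = Math.min(left.size(), right.size());
        for (int i = 0; i < pairs; i++) {
            out.add(combiner.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    /** Chains two steps: (first + second), then (that sum + third). */
    static List<Integer> chainedSums() {
        // Assumption: every source emits 1..7.
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<Integer> firstPlusSecond = zipWith(numbers, numbers, Integer::sum);
        return zipWith(firstPlusSecond, numbers, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(chainedSums());
    }
}
```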

combineLatest() Operator

From the docs: “When any of the source Observables emits an item, CombineLatest combines the most recently emitted items from each of the other source Observables, using a function you provide, and emits the return value from that function.”

That being said, let’s see an example. For our example, we are using a simple function which just creates a list of the last item emitted by each Observable.

Basically this is what we will get when running this example:

  • 1 → This is emitted at time 1000ms by the first Observable
  • 2 → This is emitted at time 1500ms by the second Observable
  • [1,2] → This is what will be combined, based on the provided function, since these numbers were the last item emitted by each Observable
  • 3 → This is emitted at time 2000ms by the first Observable
  • [3,2] → This is what will be combined, (number 3 just emitted by the first Observable and number 2 that is the last item emitted by the second observable)
  • 4 → This is emitted at time 3000ms by the second Observable
  • [3,4] → This is what will be combined, (number 3 emitted by the first Observable and number 4 that was just emitted by the second Observable)
  • 5 → This is emitted at time 3000ms by the first Observable (note this was emitted at the same time as the latest item emitted by the second Observable)
  • [5,4] → This is what will be combined
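The walk-through above can be replayed with a plain-Java model. It is a sketch and not RxJava's combineLatest: CombineLatestModel and Event are hypothetical names, and the order of the two emissions at 3000ms is simply taken from the list above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CombineLatestModel {

    /** One emission from source 0 (first Observable) or source 1 (second Observable). */
    static final class Event {
        final long timeMs; // kept for readability; the list order drives the replay
        final int source;
        final int value;

        Event(long timeMs, int source, int value) {
            this.timeMs = timeMs;
            this.source = source;
            this.value = value;
        }
    }

    /** Emits [latestFirst, latestSecond] on every event once both sources have emitted. */
    static List<List<Integer>> combineLatest(List<Event> eventsInOrder) {
        Integer[] latest = new Integer[2];
        List<List<Integer>> out = new ArrayList<>();
        for (Event e : eventsInOrder) {
            latest[e.source] = e.value;
            if (latest[0] != null && latest[1] != null) {
                out.add(Arrays.asList(latest[0], latest[1]));
            }
        }
        return out;
    }

    /** The timeline from the article: 1, 3, 5 from the first source, 2 and 4 from the second. */
    static List<Event> articleTimeline() {
        return Arrays.asList(
                new Event(1000, 0, 1),
                new Event(1500, 1, 2),
                new Event(2000, 0, 3),
                new Event(3000, 1, 4),
                new Event(3000, 0, 5));
    }

    public static void main(String[] args) {
        System.out.println(combineLatest(articleTimeline()));
    }
}
```

Note that the very first item (1) produces no output on its own, because the second source has nothing to combine it with yet.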

switchOnNext() Operator

The switchOnNext operator is one of those weird operators (at least for me). Let’s see what the docs say about it: “… it takes an observable that emits Observable…”… “Each time it observes one of these emitted Observables, the Observable returned by switchOnNext begins emitting the items emitted by that Observable. When a new Observable is emitted, switchOnNext stops emitting items from the earlier-emitted Observable and begins emitting items from the new one.”

If we look at the marble diagram, we can see the behavior described above: switchOnNext emits items from the first Observable (the circles) until the second Observable (the triangles) starts emitting. Then it unsubscribes from the first Observable and starts emitting items from the second one. We can also see that the first Observable still emits one item (the yellow circle) after switchOnNext has started watching the second Observable but before it is unsubscribed. This item is dropped and never emitted downstream.

In order to reproduce it, we can do something like this:

In this example, the first Observable emits odd numbers every 180ms, and switchOnNext emits these items downstream. Then, after 1000ms, the second Observable takes over: switchOnNext stops emitting items from the first one, unsubscribes from it and starts emitting the even numbers of the second Observable downstream. This is what we should get when running this example:

  • At time 180ms: First Observable emits number 1
  • At time 360ms: First Observable emits number 3
  • At time 540ms: First Observable emits number 5
  • At time 720ms: First Observable emits number 7
  • At time 900ms: First Observable emits number 9
  • At time 1000ms: switchOnNext starts observing the second Observable. It will unsubscribe from the first one
  • At time 1080ms: First Observable (still) emits number 11. This is not emitted downstream
  • At time 1500ms: Second Observable emits number 2
  • At time 2000ms: Second Observable emits number 4
  • At time 2500ms: Second Observable emits number 6
  • At time 3000ms: Second Observable emits number 8
  • At time 3500ms: Second Observable emits number 10
  • At time 4000ms: Second Observable emits number 12
  • At time 4500ms: Second Observable emits number 14
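The timeline above boils down to a cut-off at the switch time, which a plain-Java model makes explicit. This is a sketch, not RxJava code: SwitchTimeline and run are hypothetical names, and the model assumes that an item landing exactly on the switch time would be dropped as well.

```java
import java.util.ArrayList;
import java.util.List;

final class SwitchTimeline {

    static final int[] ODDS = {1, 3, 5, 7, 9, 11, 13, 15};
    static final int[] EVENS = {2, 4, 6, 8, 10, 12, 14};

    /**
     * Items of the first source reach downstream only while it is the active one.
     * Once the second source takes over at switchAtMs, everything the first
     * source still emits is dropped.
     */
    static List<Integer> run(long firstDelayMs, long switchAtMs) {
        List<Integer> downstream = new ArrayList<>();
        for (int i = 0; i < ODDS.length; i++) {
            long emittedAt = (i + 1) * firstDelayMs;
            if (emittedAt >= switchAtMs) {
                break; // e.g. number 11 at 1080ms never reaches downstream
            }
            downstream.add(ODDS[i]);
        }
        // From the switch on, only the second source is observed.
        for (int even : EVENS) {
            downstream.add(even);
        }
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(run(180, 1000));
    }
}
```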

There is an interesting SO question that explains this in detail.

But we can go further and use the switchOnNext operator to repeat an action whenever another action is triggered. To do so, the first Observable (we can call it the outer Observable) acts as the trigger. Whenever it emits something, the second Observable (the inner Observable) is discarded, if there already is one, and a new one is created. We got this idea from this article. Here is how it looks:

In this example, the outer Observable emits an item every 600ms. For each item it emits, an inner Observable starts emitting items every 180ms (meaning that three items are emitted before the outer Observable emits a new item). When the outer Observable emits a new item, the current inner Observable is discarded and a new one is used to emit items.

This is what happens when running this example:

  • 0 → This is emitted at time 0ms by the outer Observable. Note that it is only used to trigger the inner Observable.
  • 0, 1, 2 → The inner Observable starts emitting items 180ms apart until the outer Observable emits a new item. These items are what is propagated downstream.
  • 1 → The outer Observable emits it after 600ms. This causes the inner Observable to be discarded, and a new one is created and starts emitting items.
  • 0, 1, 2 → The inner Observable emits these three items (until the outer Observable emits something). These items are what is propagated downstream.
  • 2 → Then, after another 600ms, the outer Observable emits a new item. Once again, the inner Observable is discarded, and a new one is created and starts emitting items.
  • 0, 1, 2 → The inner Observable emits these three items (until the outer Observable emits something). These items are what is propagated downstream.

In the end, the result is a repeating sequence of the numbers 0, 1 and 2 (from the inner Observables).
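The restart behavior can be modelled in plain Java as a counter that is reset on every outer tick. This is a sketch and not RxJava code: SwitchRestartModel and run are hypothetical names, the values are Long because RxJava's interval emits Long, and the model assumes the stream is only observed for a whole number of outer periods, so the last inner Observable is cut off like the others.

```java
import java.util.ArrayList;
import java.util.List;

final class SwitchRestartModel {

    /**
     * Every outer tick discards the current inner counter and starts a fresh one.
     * An inner item reaches downstream only if it is emitted strictly before
     * the next outer tick.
     */
    static List<Long> run(int outerTicks, long outerPeriodMs, long innerPeriodMs) {
        List<Long> downstream = new ArrayList<>();
        for (int tick = 0; tick < outerTicks; tick++) {
            long counter = 0; // a new inner Observable starts counting from 0
            for (long elapsed = innerPeriodMs; elapsed < outerPeriodMs; elapsed += innerPeriodMs) {
                downstream.add(counter++);
            }
        }
        return downstream;
    }

    public static void main(String[] args) {
        // Outer every 600ms, inner every 180ms: items at 180, 360 and 540ms per tick.
        System.out.println(run(3, 600, 180));
    }
}
```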

Note: similar to mergeDelayError, there is a variation for switchOnNext called switchOnNextDelayError that delays any exception until all Observables terminate.

Conclusion

As we have seen, RxJava provides different operators to combine multiple Observables. If we go to the docs, we can see that it provides more than 10 variants for the merge operator alone! After combining multiple sources, the result usually acts like a single Observable. We also demonstrated some operator variants that delay any error until all sources terminate. We hope you now have a better idea of how to combine multiple Observables.

Thanks for reading!