Press "Enter" to skip to content

RxJava Operators – Part 7: Mathematical and Aggregate Operators


RxJava provides some operators that operate on the entire sequence of items emitted by an Observable. They are known as mathematical and aggregate operators. This means that they first wait for the source to complete, and only then, they construct their emissions. Based on that, we need to ensure source is not infinite nor will take longer than we expect to finish.

With the mathematical operators we can, for example, get the max value emitted by a source, sum all emitted items, get the average of emitted items, and so on. Note they are part of the rxjava-math library.

On this article we will present some variants of collect() and reduce() aggregate operators. By using them, whenever source emits an item, it is passed to a function which we can change it and return a modified value to the next iteration, until source terminates. We will also show two mathematical operators (since they are very simple): sumInteger() and max() operators. 

Here you can find a demo app that implement all examples demonstrated on this article.

Please, check below the previous articles of this series:

sumInteger() Operator

Could not be more simple: it calculates the sum of numbers emitted by an Observable and emits this sum. Once again, be sure source will complete at some point.

This example will emit 20 integers, and sumInteger operator will wait for the source to complete and then emit a single value with the sum of all emitted items. You can also calculate sum of other primitive types by using sumDouble, sumFloat and sumLong operators.

max() Operator

Similar to sumInteger operator, max will emit a single value containing the max value emitted by the source.

This example will emit 20 random numbers. When source completes, max operator will emit a single value containing the max emitted number by the source.

collect() Operator

By definition, collect operator “…collects items emitted by the source Observable into a single mutable data structure and returns an Observable that emits this structure”. Since this data structure (also known as stateFactory) is mutable, there is no overhead on creating a new object on every interaction (which is supposed to happen when using reduce operator, for example).

The stateFactory, then, will be passed to a function that also accepts the current emitted item. Inside that function we can use current emitted item to change our stateFactory the way we want. 

So, in order to illustrate it, let’s see a very basic example that uses an array of Integer as the stateFactory and initialize it with a couple of items. Since our function is basically adding emitted items to the stateFactory, the final result should be stateFactory initial values followed by all emitted items.

reduce() Operator

The reduce() operator applies a function (known as accumulator) to each item emitted by the source, make the result available to the next accumulator call, and emit the final result when the source terminates. If the source emits no item, it terminates with an error.

The example below shows how to return the max value emitted item by using reduce() operator.

In this example, if source emits 15, 13, 4, 16 this is how reduce should behave:

  • Item 15 is emitted → It is associated to the accumulator.
  • Item 13 is emitted → accumulator is still 15, and item parameter holds number 13 (the item just emitted). Returning 15, since 15 is greater than 13.
  • Item 4 is emitted → accumulator is still 15, and item parameter holds number 4. Returning 15, since 15 is greater than 4.
  • Item 16 is emitted → accumulator is still 15, and item parameter holds number 16. Returning 16, since 16 is greater than 15.
  • Source terminates.
  • reduce() operator returns number 16.

reduce() Operator w/Global Seed

There is another reduce variant which accepts an initial value that is applied to the accumulator function. The difference from the version with no seed is that if we inform an initial value and source emits no item, seed value will be the final result and it will terminate accordingly.

One point to note is that the initialValue is shared among all subscribers and may cause problems if it is mutable and you subscribe it more than once. If you run the example below without any modification you will not notice this behaviour, since there is only one subscription. I realised this behaviour in the hardest way. After struggling a lot, I finally found a very good SO question that lead me to find out what I was doing wrong. I strongly suggest you to read it and also this link. They contain many information about reduce operator.

To demonstrate that the initialValue is being shared on every subscription, just uncomment the second and the third subscription and run this example again (this is how we are using on the demo app).

reduce() Operator with a null Seed

Ok, but what if we need to use a seed but without sharing it between subscriptions? Well, this is very simple. Use null as the initialValue parameter and create it inside the accumulator function instead. This way, this instance will not be shared anyway. Once again thanks to this great SO question.

When running this example, even when subscribing observable multiple times, we can see the same result on all of them. This clearly shows that seed is no longer shared between multiple subscriptions.


Mathematical and aggregate operator are quite easy to use. The only point we have to keep in mind is that they depend from the source to terminate at some point. Therefore, always be sure source will not emit forever.

As usual, if you have questions, please, leave a comment below.