RxJava Operators – Part 5: Transforming Operators

Sometimes we need to transform an item emitted by the source into another one before emitting it downstream. RxJava provides several operators for this job, such as buffer, window, flatMap, map, etc.

The goal of this article is to show six variants of the buffer() operator, as well as one example of the window() operator (since window() is pretty similar to buffer(), we show only one example of it to avoid repeating ourselves). Later, we will show two variants of the scan() operator. If you want to become familiar with everything RxJava provides to transform Observables, check the docs.

Here you can find a demo app that implements all examples we will demonstrate throughout this article.

Please check the previous articles of this series below:

buffer() Operator w/count

Basically, the buffer() operator transforms an Observable that emits items into an Observable that emits buffered collections (lists) of those items. There are many variants that use different strategies to decide when to emit the buffered items. This example shows a variant that accepts a count parameter (N): it collects N emitted items and only then emits a list with the buffered items. If the source Observable completes, buffer(N) first emits whatever is in its buffer and then propagates the completion downstream. If the source emits an error notification, however, buffer() propagates it immediately and the partially filled buffer is discarded.

The example below shows a source Observable that emits 18 items (0 to 17). Since buffer() uses a count of 4, it buffers 4 items and then emits a list with those items. This is what we get when running this example:

[0,1,2,3], [4,5,6,7], [8,9,10,11], [12,13,14,15], [16,17]

Note that the last emitted buffer contains only 2 items, since these were the last items the source emitted before completing: when buffer() receives the “onCompleted” notification, it first emits its buffer and then completes.
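To make these semantics concrete without depending on a particular RxJava version, here is a minimal plain-Java sketch (no RxJava involved) of what buffer(count) does to a finite source that completes normally. The class and method names are hypothetical; in RxJava itself, the equivalent would be something like `Observable.range(0, 18).buffer(4)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical plain-Java sketch of buffer(count) on a finite source that completes normally.
// (On an error, RxJava would instead drop the partial buffer and forward the error.)
final class BufferByCount {
    static <T> List<List<T>> buffer(List<T> source, int count) {
        List<List<T>> emitted = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : source) {
            current.add(item);
            if (current.size() == count) {   // buffer is full: emit it and start a new one
                emitted.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {            // onCompleted: flush the partially filled buffer
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.range(0, 18).boxed().collect(Collectors.toList());
        // prints [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15], [16, 17]]
        System.out.println(buffer(source, 4));
    }
}
```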

buffer() Operator w/timespan

If we need to collect items based on time instead of a count, buffer() has a variant that accepts a timespan. It collects the items emitted during that timespan and then emits a list with the buffered items.

Our example buffers emitted items every 700ms. Since the source emits items with a random delay between 100 and 300ms (this is done by the “emitItems” method), it emits buffers of roughly 2 to 6 items. Similar to the variant with a count, when the source Observable completes, buffer() first emits its buffer and then propagates the completion downstream; an error notification is propagated immediately, without emitting the pending buffer.
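The sketch below simulates buffer(timespan) on a timeline instead of in real time, so its output is deterministic. It is plain Java, not RxJava's implementation (in RxJava the call would look like `buffer(700, TimeUnit.MILLISECONDS)`). The timestamps are hypothetical fixed values standing in for the random delays of emitItems, and an item arriving exactly on a tick is assumed to go to the next buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical timeline simulation of buffer(timespan): at every tick the items collected
// since the previous tick are emitted as a list (possibly empty) and a new buffer starts.
// Assumptions: ticks at timespan, 2*timespan, ...; an item exactly on a tick goes to the
// next buffer; the source completes right after its last item.
final class BufferByTimespan {
    static List<List<Integer>> buffer(long[] emitTimesMs, long timespanMs) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long nextTick = timespanMs;
        for (int item = 0; item < emitTimesMs.length; item++) {
            while (nextTick <= emitTimesMs[item]) {   // each tick before this item closes a buffer
                emitted.add(current);
                current = new ArrayList<>();
                nextTick += timespanMs;
            }
            current.add(item);                         // items are numbered 0, 1, 2, ...
        }
        if (!current.isEmpty()) {                      // onCompleted: flush the last buffer
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical fixed delays (100 to 250ms apart) instead of emitItems' random ones
        long[] times = {150, 400, 650, 900, 1000, 1250, 1500};
        System.out.println(buffer(times, 700));   // [[0, 1, 2], [3, 4, 5], [6]]
    }
}
```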

buffer() Operator w/count and timespan

This variant accepts both a count and a timespan parameter. It always emits its buffer when the timespan elapses, even if the buffer holds fewer than count items. To better illustrate it, let’s see an example that uses a count of 4 and a timespan of 700ms. This makes buffer() emit either a group of 4 items or whatever items were emitted within a 700ms window, whichever comes first. In this example, the source Observable emits an item every 200ms.

In this example, buffer() emits a list every 700ms, whether its buffer contains zero or more items. It also emits whenever its buffer reaches 4 items. That being said, this is what we should get when running this example:

  1. At time   0  – Timespan started
  2. At time 200  – Source emits number: 0
  3. At time 400  – Source emits number: 1
  4. At time 600  – Source emits number: 2
  5. At time 700  – Timespan elapsed. buffer emits: [0, 1, 2]. Another timespan is started.
  6. At time 800  – Source emits number: 3
  7. At time 1000 – Source emits number: 4
  8. At time 1200 – Source emits number: 5
  9. At time 1400 – Timespan elapsed. buffer emits: [3, 4, 5]. Another timespan is started.
  10. At time 1400 – Source emits number: 6
  11. At time 1600 – Source emits number: 7
  12. At time 1800 – Source emits number: 8
  13. At time 2000 – Source emits number: 9
  14. At time 2000 – Count reached 4. buffer emits: [6, 7, 8, 9]
  15. At time 2100 – Timespan elapsed. buffer emits: []. Another timespan is started.
  16. At time 2200 – Source emits number: 10
  17. At time 2400 – Source emits number: 11
  18. At time 2600 – Source emits number: 12
  19. At time 2800 – Timespan elapsed. buffer emits: [10, 11, 12]. Another timespan is started.

We can see at step 14 that buffer() emitted because the count reached 4, but this has no impact on the timespan (at that moment, about 600ms of the current timespan have elapsed). 100ms later (step 15), the timespan reaches 700ms and buffer() emits again, but since the source Observable emitted no items after the last emission, buffer() emits an empty list. We will see later in this article that window() behaves a little differently in this situation.
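The trace above can be reproduced with a small timeline simulation. This is a plain-Java sketch of the behaviour described here (the timer is not restarted when the count is reached), not RxJava's scheduler-based implementation of `buffer(700, TimeUnit.MILLISECONDS, 4)`. It assumes that when a tick and an item happen at the same instant the tick is handled first, as in step 9, and that observation stops at 2800ms; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of buffer(count, timespan): a list is emitted when it holds `count`
// items or when the timespan ticks; emitting because of the count does NOT restart the timer.
final class BufferByCountAndTimespan {
    static List<List<Integer>> buffer(long[] emitTimesMs, long endMs, int count, long timespanMs) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long nextTick = timespanMs;
        for (int item = 0; item < emitTimesMs.length; item++) {
            while (nextTick <= emitTimesMs[item]) {   // timespan elapsed: emit, even if empty
                emitted.add(current);
                current = new ArrayList<>();
                nextTick += timespanMs;
            }
            current.add(item);
            if (current.size() == count) {             // count reached: emit, timer keeps running
                emitted.add(current);
                current = new ArrayList<>();
            }
        }
        while (nextTick <= endMs) {                   // remaining ticks until observation ends
            emitted.add(current);
            current = new ArrayList<>();
            nextTick += timespanMs;
        }
        if (!current.isEmpty()) {                      // completion: flush leftovers
            emitted.add(current);
        }
        return emitted;
    }

    // Item k is emitted at (k + 1) * periodMs, like the 200ms source in the example.
    static long[] periodicTimes(int items, long periodMs) {
        long[] times = new long[items];
        for (int k = 0; k < items; k++) {
            times[k] = (k + 1) * periodMs;
        }
        return times;
    }

    public static void main(String[] args) {
        // prints [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9], [], [10, 11, 12]]
        System.out.println(buffer(periodicTimes(13, 200), 2800, 4, 700));
    }
}
```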

buffer() Operator w/timespan and timeshift

The fourth variant we will show uses a timespan and also a timeshift, which defines the period after which a new buffer is created.

Our example uses a timespan of 600ms, which means items are buffered for 600ms after a buffer is opened. It also uses a timeshift of 3 seconds, meaning that a new buffer is opened every 3 seconds. Since each buffer is closed after 600ms, all items emitted by the source Observable during the remaining 2.4 seconds are ignored. Then a new window is opened for another 600ms, and so on.

When running this example, since source Observable is emitting items each 200ms, we should get a result like this:

[0, 1], [14, 15, 16], [29, 30, 31].

Let’s get into the details:

  • At time   0  – buffer opens a window
  • At time 200  – Source emits number: 0
  • At time 400  – Source emits number: 1
  • At time 600  – Window is closed. buffer emits: [0, 1]. Next window will be opened only at time 3000 due to the timeshift
  • Items from 2 to 13 will be emitted while no window is opened, so they will be ignored.
  • At time 3000  – Another window is opened
  • At time 3000 – Source emits number: 14
  • At time 3200 – Source emits number: 15
  • At time 3400 – Source emits number: 16
  • At time 3600 – Window is closed. buffer emits: [14, 15, 16]. Next window will be opened only at time 6000 due to the timeshift (every 3 seconds)
  • … same behaviour until source completes
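Here is a plain-Java sketch of the timespan/timeshift rule under the same assumptions as the example (one item every 200ms, buffers opened at 0, 3000, 6000, ... and closed 600ms later), roughly what `buffer(600, 3000, TimeUnit.MILLISECONDS)` does in RxJava. Treating each buffer as the half-open interval [open, open + timespan) is an assumption about simultaneous events, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of buffer(timespan, timeshift): a buffer opens every timeshift
// and collects items for timespan, i.e. over the interval [open, open + timespan).
final class BufferWithTimeshift {
    static List<List<Integer>> buffer(long[] emitTimesMs, long timespanMs, long timeshiftMs) {
        List<List<Integer>> emitted = new ArrayList<>();
        if (emitTimesMs.length == 0) {
            return emitted;
        }
        long lastEmission = emitTimesMs[emitTimesMs.length - 1];
        for (long open = 0; open <= lastEmission; open += timeshiftMs) {
            long close = open + timespanMs;
            List<Integer> buffer = new ArrayList<>();
            for (int item = 0; item < emitTimesMs.length; item++) {
                if (emitTimesMs[item] >= open && emitTimesMs[item] < close) {
                    buffer.add(item);   // items outside every open buffer are simply dropped
                }
            }
            emitted.add(buffer);
        }
        return emitted;
    }

    // Item k is emitted at (k + 1) * periodMs, like the 200ms source in the example.
    static long[] periodicTimes(int items, long periodMs) {
        long[] times = new long[items];
        for (int k = 0; k < items; k++) {
            times[k] = (k + 1) * periodMs;
        }
        return times;
    }

    public static void main(String[] args) {
        // prints [[0, 1], [14, 15, 16], [29, 30, 31]]
        System.out.println(buffer(periodicTimes(34, 200), 600, 3000));
    }
}
```

If the timespan were longer than the timeshift, the intervals would overlap and an item could land in more than one buffer; the sketch handles that case too, since each interval is checked independently.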

buffer() Operator w/boundary

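This variant accepts an Observable as a parameter (called the boundary). It collects items from the source Observable until the boundary emits something; at that moment, all buffered items are emitted downstream and a new buffer is started. If the source or the boundary completes, the current buffer is emitted and the resulting Observable completes as well. That is why, in the example below, items emitted by the source after the boundary emits are ignored.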

The example below buffers items until the doAContinuouslyOperation() method emits something (this is a fake operation that takes between 3 and 6 seconds to complete before emitting). Then, buffer() stops collecting items and emits its buffer.

Here you can find a nice example of buffer() with a boundary, where the author uses the debounce() operator to build the boundary Observable.
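A minimal plain-Java sketch of the boundary rule: each boundary emission flushes the current buffer and starts a new one, and the last buffer is flushed when the source completes. It assumes a boundary that keeps running (it does not model the boundary completing), a boundary emission is handled before a simultaneous item, and the timestamps and names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of buffer(boundary): every boundary emission closes the current
// buffer (emitting it downstream) and opens a new one.
final class BufferWithBoundary {
    static List<List<Integer>> buffer(long[] itemTimesMs, long[] boundaryTimesMs) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int nextBoundary = 0;
        for (int item = 0; item < itemTimesMs.length; item++) {
            // Every boundary emission that happened before this item flushes the buffer.
            while (nextBoundary < boundaryTimesMs.length
                    && boundaryTimesMs[nextBoundary] <= itemTimesMs[item]) {
                emitted.add(current);
                current = new ArrayList<>();
                nextBoundary++;
            }
            current.add(item);
        }
        if (!current.isEmpty()) {   // source completes: flush what is left
            emitted.add(current);
        }
        return emitted;
    }

    // Item k is emitted at (k + 1) * periodMs.
    static long[] periodicTimes(int items, long periodMs) {
        long[] times = new long[items];
        for (int k = 0; k < items; k++) {
            times[k] = (k + 1) * periodMs;
        }
        return times;
    }

    public static void main(String[] args) {
        // Hypothetical boundary emitting at 700ms and 1500ms
        // prints [[0, 1, 2], [3, 4, 5, 6], [7, 8, 9]]
        System.out.println(buffer(periodicTimes(10, 200), new long[] {700, 1500}));
    }
}
```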

buffer() Operator w/selectors

This variant uses two selectors to control when a buffer starts and stops collecting items from the source. When the first one (bufferOpenings, an Observable) emits something, buffer() starts buffering emitted items. It collects items until the Observable returned by bufferClosingSelector (a Func1 that receives the item emitted by bufferOpenings) emits an item. Basically, this variant gives us full control over when to turn collection on and off.

To illustrate this variant, we created two helper methods, doAContinuouslyOperation(…) and doASecondContinuouslyOperation(). The first one is used as bufferOpenings and simulates an operation that sleeps for some seconds and then returns an Observable. While it is doing its task, all items emitted by the source are ignored. As soon as it emits, buffer() starts buffering items and invokes the bufferClosingSelector (the doASecondContinuouslyOperation method). While that second fake operation sleeps for some seconds, buffer() collects the items emitted by the source. When the Observable returned by bufferClosingSelector emits, buffer() stops collecting items and emits all its buffered items.
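The selector variant boils down to a list of [open, close) intervals: each one opened when bufferOpenings emits and closed when the Observable returned by bufferClosingSelector emits. The plain-Java sketch below takes those moments as hypothetical precomputed timestamps (instead of the random sleeps of the helper methods) and shows which source items end up in each buffer. It is an illustration of the rule, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of buffer(bufferOpenings, bufferClosingSelector).
// Each window is {openMs, closeMs}: the moment bufferOpenings emits and the moment
// the Observable returned by the closing selector emits.
final class BufferWithSelectors {
    static List<List<Integer>> buffer(long[] itemTimesMs, long[][] windowsMs) {
        List<List<Integer>> emitted = new ArrayList<>();
        for (long[] window : windowsMs) {
            List<Integer> buffer = new ArrayList<>();
            for (int item = 0; item < itemTimesMs.length; item++) {
                // Collect only while the buffer is open; everything else is ignored.
                if (itemTimesMs[item] >= window[0] && itemTimesMs[item] < window[1]) {
                    buffer.add(item);
                }
            }
            emitted.add(buffer);
        }
        return emitted;
    }

    // Item k is emitted at (k + 1) * periodMs.
    static long[] periodicTimes(int items, long periodMs) {
        long[] times = new long[items];
        for (int k = 0; k < items; k++) {
            times[k] = (k + 1) * periodMs;
        }
        return times;
    }

    public static void main(String[] args) {
        long[][] windows = {{1000, 1600}, {2000, 2500}};   // hypothetical open/close moments
        // prints [[4, 5, 6], [9, 10, 11]]
        System.out.println(buffer(periodicTimes(13, 200), windows));
    }
}
```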

window() Operator w/count and timespan

Now that we have seen some variants of the buffer operator, let’s see one example of the window operator. The window operator is similar to buffer, but instead of emitting each group of items as a collection, it emits each group as a separate Observable (a window) that emits those items. It offers mostly the same variants as its cousin buffer.

Although this variant accepts the same parameters as its buffer() counterpart, the two behave slightly differently, mainly regarding what happens when count is reached.

The docs say “…it closes the currently open window and opens another one every timespan period of time, or whenever the currently open window has emitted count items…”. While analyzing it, we noticed that when the currently open window closes because the timespan elapsed, another one is opened immediately. So far so good… But when count is reached, the currently open window also closes, and a new window only opens at the next timespan or when the source emits its next item, whichever comes first.

The difference from buffer() is that buffer() always emits something every timespan (even an empty list), regardless of whether it already emitted during that timespan because its count was reached. So, if we define a timespan of 700ms, buffer() will always emit something every 700ms.

Let’s see an example similar to the one we used for the buffer() operator, with the same values: a timespan of 700ms and a count of 4. Also, the source Observable emits items every 200ms.

When running this example, this is what we should get:

  1. At time   0  – window() opens a window
  2. At time 200  – Source emits number: 0
  3. At time 400  – Source emits number: 1
  4. At time 600  – Source emits number: 2
  5. At time 700  – Window is closed. window emits: [0, 1, 2]. Another window is opened
  6. At time 800  – Source emits number: 3
  7. At time 1000 – Source emits number: 4
  8. At time 1200 – Source emits number: 5
  9. At time 1400 – Window is closed. window emits: [3, 4, 5]. Another window is opened
  10. At time 1400 – Source emits number: 6
  11. At time 1600 – Source emits number: 7
  12. At time 1800 – Source emits number: 8
  13. At time 2000 – Source emits number: 9
  14. At time 2000 – Count reached 4. window emits: [6, 7, 8, 9]. Current window is also closed.
  15. At time 2100 – Another window is opened.
  16. At time 2200 – Source emits number: 10
  17. At time 2400 – Source emits number: 11
  18. At time 2600 – Source emits number: 12
  19. At time 2700 – Window is closed. window emits: [10, 11, 12]. Another window is opened
  20. At time 2800 – Source emits number: 13
  21. At time 3000 – Source emits number: 14
  22. At time 3200 – Source emits number: 15
  23. At time 3400 – Source emits number: 16
  24. At time 3400 – Count reached 4. window emits: [13, 14, 15, 16]. Window is also closed. Another window is opened.
  25. At time 3600 – Source emits number: 17
  26. At time 3800 – Source emits number: 18
  27. At time 4000 – Source emits number: 19
  28. At time 4100 – Window is closed. window emits: [17, 18, 19].

scan() Operator

Also known as an accumulator, the scan() operator applies a function to each emitted item and emits the result of that function. It also feeds that result back, along with the next emitted item, so we have a chance to work with the previous result when processing the next item.

In order to understand it better, let’s see an example. It emits 7 random numbers (each between 0 and 10). When an even number is emitted, our function adds it to the accumulated value (i.e. the previous result of the function). For odd numbers, our function just returns the previous result.

When running this example, if source emits numbers 3, 8, 7, 2, 1, 6 and 4, the final result should be 23. Let’s see it step by step:

  • Source emits number 3 → This is the first item, so there is no previous result yet: scan() just emits it as-is (the function is only applied from the second item on).
  • Source emits number 8 → Even number, so the accumulator function adds it to the previous result (3), returning 11.
  • Source emits number 7 → Odd number, so it just returns the previous result (11).
  • Source emits number 2 → Even number, so it returns the sum of it and the previous result (11): 13.
  • Source emits number 1 → Odd number, so it just returns the previous result (13).
  • Source emits number 6 → Even number, so it returns the sum of it and the previous result (13): 19.
  • Source emits number 4 → Even number, so it returns the sum of it and the previous result (19): 23.
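The same steps can be reproduced with a small plain-Java version of scan() without a seed. This is a sketch with hypothetical names, not RxJava's implementation; with RxJava, the same accumulator lambda would be passed to the source Observable's scan(...) instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical sketch of scan() without a seed: the first item is emitted as-is,
// then each later item is combined with the previous result.
final class ScanSketch {
    // The accumulator from the example: add even items, keep the previous result for odd ones.
    static final BinaryOperator<Integer> SUM_EVENS =
            (acc, item) -> item % 2 == 0 ? acc + item : acc;

    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : source) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            emitted.add(acc);   // every intermediate result is emitted downstream
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [3, 11, 11, 13, 13, 19, 23]
        System.out.println(scan(List.of(3, 8, 7, 2, 1, 6, 4), SUM_EVENS));
    }
}
```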

scan() Operator with a Seed

There is a variant of the scan operator that accepts a seed (an initial value). Since there is no previous result when the first item arrives, the seed is passed to the accumulator function in its place. This is useful when we need to start our sequence from a predetermined value. Note that the resulting Observable also emits the seed itself before the first accumulated result.

The example below emits 10 numbers sequentially (starting from 0) and adds each emitted number to the previous result (i.e. the accumulator). Since we are using a seed with a value of 3, the accumulator for the first item has a value of 3, as we can see in the log entries it generates:

TransformingOperatorsExampleActivity: emitItems() – Emitting number: 0

TransformingOperatorsExampleActivity: scan() – seed: 3 – accumulator: 3 – item: 0
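A plain-Java sketch of the seeded variant (hypothetical names, not RxJava's implementation) makes the role of the seed visible: it is emitted first and then used as the accumulated value for item 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical sketch of scan(seed, accumulator): emits the seed first,
// then one accumulated result per source item.
final class ScanWithSeedSketch {
    static <T, R> List<R> scan(List<T> source, R seed, BiFunction<R, T, R> accumulator) {
        List<R> emitted = new ArrayList<>();
        R acc = seed;
        emitted.add(acc);                      // the seed itself is emitted first
        for (T item : source) {
            acc = accumulator.apply(acc, item); // for item 0, acc is still the seed
            emitted.add(acc);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.range(0, 10).boxed().collect(Collectors.toList());
        // prints [3, 3, 4, 6, 9, 13, 18, 24, 31, 39, 48]
        System.out.println(scan(source, 3, (acc, item) -> acc + item));
    }
}
```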

A classic example of scan() is the Fibonacci sequence. Here you can find a nice example that implements it.
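As an illustration, and not necessarily how the linked example does it, Fibonacci numbers can be produced with a seeded scan() whose accumulated value is a pair (a, b): each step turns it into (b, a + b), and mapping each pair to its first element yields the sequence. The sketch below is plain Java with hypothetical names; in RxJava, the same idea would typically be a scan(seed, accumulator) followed by map().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

// Hypothetical sketch: Fibonacci numbers via a seeded scan over pairs (a, b).
final class FibonacciWithScan {
    static <T, R> List<R> scan(List<T> source, R seed, BiFunction<R, T, R> accumulator) {
        List<R> emitted = new ArrayList<>();
        R acc = seed;
        emitted.add(acc);   // the seed is emitted first
        for (T item : source) {
            acc = accumulator.apply(acc, item);
            emitted.add(acc);
        }
        return emitted;
    }

    static List<Long> fibonacci(int n) {
        if (n <= 0) {
            return new ArrayList<>();
        }
        // Only the number of "ticks" matters, not their values.
        List<Integer> ticks = Collections.nCopies(n - 1, 0);
        List<long[]> pairs = scan(ticks, new long[] {0L, 1L},
                (pair, tick) -> new long[] {pair[1], pair[0] + pair[1]});
        return pairs.stream().map(pair -> pair[0]).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
        System.out.println(fibonacci(10));
    }
}
```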

Conclusion

As we could see, there are many ways to transform emitted items. window() and buffer(), in particular, come in many variants. You may have noticed the absence of the most common transforming operators, like map and flatMap. The reason is that there are many great articles about them out there, like this, this and this, and I feel I have nothing to add about them.

So, as usual, if you have any questions, please leave a comment below. And stay tuned: in the next article, we will talk about the timeout operator.