Press "Enter" to skip to content

RxJava Operators – Part 9: Backpressure

0

Backpressure is a common situation that occurs when an Observable produces items faster than they can be consumed. Currently, many RxJava operators uses backpressure internally making the problem of faster emissions to be moved upstream where they may be more appropriately handled.

But fortunately there are some approaches we can use to try to alleviate it. RxJava provides some ordinary operators we can use in order to select items instead of emit downstream all items emitted from the source. Some of them are sample(), trottleLast(), debounce(), etc. There are also some specific operators that handle backpressure by buffering items or drop them when they are overproducing. We can also handle it manually by requesting for items only when we are able to consume them or even control how many items we will request in the producer side (also called by reactive pull request).

On this article we will present some of these approaches by showing some examples and as well as some considerations. As usual, you can find a demo app that implements all examples shown on this article.

Please, check below the previous articles of this series:

The Problem

As described above, when an Observable emits items faster than they can be consumed, there will be a backpressure. Since most RxJava operators uses backpressure internally, when an operator stops accepting items, the previous operator (in the chain) will start buffering them until it also stops accepting items, and so on. If, at a certain point, the chain cannot deal with the overproducing items, it will throw a MissingBackpressureException.

The example below shows what happens when an Observable emits items faster than they can be consumed. Since we did not provide a way to deal with the fast emission, after a while it will terminate by throwing a MissingBackpressureException.

Dealing with Backpressure by using Ordinary Operators

There are some operators that can reduce the chance to have a backpressure by reducing the number of emitted items downstream. Some of them are: sample(), debounce, throttleLast(), etc.

Note that by using one of them, it does not mean it will solve the problem, since if we still emit items much faster than they can be consumed, we might still end up in the backpressure situation.

Let’s see an example: If we add throttleLast(10) operator in the previous example, it means that only the most recently emitted item during each period of 10ms will be emitted downstream. This will reduce the burst of emitted items by the source (we are emitting items each 1ms!), but we might still end up in a backpressure situation because we are still emitting items faster than they can be processed. Now, if we change the throttleLast intervalDuration to 100ms, we will probably not get into a backpressure situation, since the subscriber will be able to process all emitted items accordingly.

Specific Backpressure Operators

RxJava provides some specific operators that can handle backpressure by, for example, buffering items until they can be processed or drop them when there is no any pending request from  downstream. We will get into two of them: onBackpressureBuffer() and onBackpressureDrop().

onBackpressureBuffer() Operator

onBackpressureBuffer() operator maintains a buffer of all unobserved emissions from the source Observable and emits them to downstream observers according to the requests they generate. When its buffer’s capacity is exceeded, it emits a BufferOverflowException, drops all undelivered items, unsubscribes from the source, and notifies the producer with onOverflow. Let’s see some of its variants.

The first onBackpressureBuffer(…) variant we will show is one that accepts a capacity. Once that buffer’s capacity is exceeded, it will terminate with an error.

The second onBackpressureBuffer(…) variant is one that accepts a capacity and an action. Basically this action gives us a chance to react before it unsubscribing the source. For our example we are just printing a message on the console. Note that, although we can react when its buffer is exceeded, this does not mean we can recover from that state. After our action returns, it will terminate with an error.

onBackpressureBuffer(…) has another variant which accepts, besides an action (that is called whenever an item is buffered), a BackpressureOverflow.Strategy, that can be one of these:

  • ON_OVERFLOW_ERROR (default)
  • ON_OVERFLOW_DROP_OLDEST
  • ON_OVERFLOW_LATEST.

This is a little different from those two we shown above since, depends on which strategy we use, it will not finish when its capacity is full.

If we use ON_OVERFLOW_ERROR strategy, this variant will act the same way as the previous example (i.e. that one which accepts an action), since as soon as the buffer’s capacity is exceeded, it will drop all undelivered items and unsubscribe from the source. In case of OLDEST or LATEST strategies, when buffer’s capacity is full, it will start discarding items according to the chosen strategy. For our example, we are using OLDEST strategy. This will make easier to visualise which items were dropped.

Since we are emitting 100 items, the first items will be emitted accordingly, but as soon as our buffer is full, the oldest emitted items will be discarded. The result may be something like this: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 60, 61, 62…99. We can see that the first fifteen items were processed accordingly, but then, since  buffer’s capacity exceeded, the oldest items started to be discarded. Only after pending items were processed, making buffer to have free slots again, items went back to be emitted accordingly.

onBackpressureDrop() Operator

onBackpressureDrop() basically drops all items it cannot process when there is no request from the downstream. It also has a variant which accepts an action that is invoked for each dropped item.

On the example below, we added a random sleep timer while processing onNext(). This will make onBackpressureDrop operator to drop some items, since there will be some lack of requests from the downstream.

Backpressure Reactive Pull

There might be a case where we want to have full control over how many/when items will be emitted. For this case we can use an approach called reactive pull request. RxJava provides a way for the subscriber to regulate the rate of an observable by calling Subscriber::request(n) method. We are going to show two slightly different examples of this approach.

The first one calls Subscriber::onRequest(n) into the Subscriber’s onStart() method, to inform Observable how many items we want it to emit at the beginning (in our case we will request only one item). Then, in the Subscriber’s onNext(), only after processing an item, we call it again to request another item (again only one item). This will make our example to process one item at a time, no matter how long it takes to be processed. Of course we could also change the number of items to be requested in the Subscriber’s onNext() at any time. This gives us a great opportunity to dynamically balance the number of requested items based on any criteria that might be useful for us.

Of course we could also change the number of items to be requested in the Subscriber’s onNext() at any time. This gives us a great opportunity to dynamically balance the number of requested items based on any criteria that might be useful for us.

The second approach will not call Subscriber::request(n) inside Subscriber’s onNext(). Instead, we will delegate such requests to the user. Whenever user wants to process more items, it will click on a button which will call Subscriber::request(n). Let’s call it as manual request. This example was based on this link.

To implement it, we will extend Subscriber and call its request(n) method at any time, which will tell the observable that it is ready to process n items.

Now that we have our subscriber, let’s instantiate it:

Next step is to subscribe an observable to it:

Now, whenever we want items to be emitted, we simply call puller.requestMore(n) method.

Conclusion

The more we know about backpressure, how it works, which operators uses it internally, etc, more prepared we are to deal with it when we run into it. Especially in complex chains, it can be really hard to identify, since a backpressure can be everywhere in the chain. We tried to show some different approaches that can help you to understand how to deal with backpressure and hope this article can be useful to someone.

If you have questions about it, please, leave a comment below.