
RxJava Operators – Part 10: Hot Observables


There are basically two types of observables: cold and hot. According to the docs, “… A cold Observable waits until an observer subscribes to it before it begins to emit items, and so such an observer is guaranteed to see the whole sequence from the beginning.” As for hot observables, the docs say “[it] may begin emitting items as soon as it is created, and so any observer who later subscribes to that Observable may start observing the sequence somewhere in the middle.”

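In short, a hot observable can start emitting values regardless of whether there is a subscription or not. A classic example is a stream of mouse clicks: the clicks happen whether or not anyone is listening, and a listener that registers late never sees the earlier ones.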
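The mouse-click idea can be sketched in plain Java without any RxJava dependency. `ClickSource` and `HotSourceDemo` are hypothetical names made up for this illustration; the sketch only models the "emits whether or not anyone listens" behaviour and is not how RxJava implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of a hot source (not an RxJava class).
class ClickSource {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // A click happens whether or not anyone is listening.
    void click(String label) {
        for (Consumer<String> listener : new ArrayList<>(listeners)) {
            listener.accept(label);
        }
    }
}

class HotSourceDemo {
    // Returns the clicks seen by a listener that registers after two clicks.
    static List<String> lateListenerSees() {
        ClickSource mouse = new ClickSource();
        List<String> seen = new ArrayList<>();
        mouse.click("click-1");        // nobody listening: lost
        mouse.click("click-2");        // nobody listening: lost
        mouse.addListener(seen::add);
        mouse.click("click-3");        // delivered
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(lateListenerSees()); // prints [click-3]
    }
}
```

A cold source would instead replay its whole sequence for every new listener, which is why the late listener here is the interesting case.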

In this article, we will not go into detail about cold observables, since they are what we have been discussing in all the previous articles of this series. Instead, we will focus on hot observables by showing examples of these operators: connect(), refCount(), replay() and cache().

As usual, you can find here a demo app that implements all the examples we demonstrate in this article. Note that the snippets shown in this article are a little different from what you will find in the demo app, because in the demo app we wanted to provide a more dynamic way to play around with hot observables. There you can connect or disconnect an observable, as well as subscribe and unsubscribe a subscriber, at any time, which makes it easy and fun to discover how they behave and differ from each other.

Before we begin, you may want to check the previous articles in this series.

Example #1 – connect()

There are many ways to transform a cold observable into a hot one. This example uses the Observable<T>::publish() method, which returns a ConnectableObservable<T> instance.

ConnectableObservable is a variety of Observable that waits until its connect() method is called before it begins emitting items. If any subscriber is already subscribed to the ConnectableObservable when connect() is called, it will start receiving items. If not, the ConnectableObservable will still emit items. Later, as subscribers subscribe to it, they receive only the items emitted after their subscriptions.

In this example, after the connect() call, the ConnectableObservable starts emitting items (i.e. sequential numbers starting from #0). Then, after 5 seconds, a first subscriber subscribes and starts receiving items from #5. After 5 more seconds, a second subscriber subscribes and starts receiving items from #10.

Note that if we change this example a little by subscribing the first subscriber before calling connect(), it will receive the emitted items from #0 as soon as we call connect().

At any time, if we do not want to receive items anymore, we can unsubscribe (by calling the unsubscribe method on the firstSubscription and/or secondSubscription instances). Note that this does not make the observable stop emitting items; it only makes the subscribers stop receiving them. Later, if we subscribe again, the subscribers will start receiving items again.

Finally, if we want our observable to stop emitting items, we can call unsubscribe on the Subscription instance returned by ConnectableObservable<T>::connect().
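To make the timeline above easier to follow, here is a minimal plain-Java model of it. It deliberately does not use RxJava: `ManualPublisher` and `ConnectTimeline` are hypothetical names, and each `tick()` call stands in for one second passing, so the sketch is deterministic. It only illustrates the behaviour described in this section and is not RxJava's ConnectableObservable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a connectable source; tick() plays the role of one second.
class ManualPublisher {
    private final List<Consumer<Long>> subscribers = new ArrayList<>();
    private boolean connected = false;
    private long next = 0;

    void connect()    { connected = true; }
    void disconnect() { connected = false; }

    // Returns a handle that removes the subscriber again when run.
    Runnable subscribe(Consumer<Long> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    // Emits the next number to whoever is subscribed right now.
    void tick() {
        if (!connected) return;                       // nothing is emitted unless connected
        long item = next++;
        for (Consumer<Long> s : new ArrayList<>(subscribers)) s.accept(item);
    }
}

class ConnectTimeline {
    // Returns {first, second}: the items each subscriber saw.
    static List<List<Long>> run() {
        ManualPublisher source = new ManualPublisher();
        List<Long> first = new ArrayList<>();
        List<Long> second = new ArrayList<>();

        source.connect();
        for (int i = 0; i < 5; i++) source.tick();    // #0..#4, nobody subscribed
        Runnable unsubscribeFirst = source.subscribe(first::add);
        for (int i = 0; i < 5; i++) source.tick();    // #5..#9 reach first
        source.subscribe(second::add);
        source.tick();                                // #10 reaches both
        unsubscribeFirst.run();
        source.tick();                                // #11 reaches second only
        source.disconnect();
        source.tick();                                // ignored: disconnected

        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [[5, 6, 7, 8, 9, 10], [10, 11]]
    }
}
```

Unsubscribing the first subscriber does not affect emission; only `disconnect()` does, which mirrors the difference between unsubscribing a subscriber and unsubscribing the connection.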

Now, let’s see how it works in the demo app:

When we click on the “Connect” button, the observable starts emitting items. Then, when we press the “Subscribe First” button, we notice that the first subscriber starts receiving items, but only those emitted after its subscription. The same happens when we click on the “Subscribe Second” button. As we click on the “Unsubscribe First” and “Unsubscribe Second” buttons, their respective subscriptions stop receiving items, but the observable keeps emitting them. Finally, when we press the “Disconnect” button, our observable stops emitting.

Example #2 – refCount()

refCount() keeps track of how many subscribers are subscribed to it. Unlike connect(), calling refCount() does not make our observable start emitting items; it starts only when the first subscriber subscribes to it. Newer subscribers start receiving emitted items as soon as they subscribe to the observable. But, as with connect(), it does not collect any items, so subscribers only receive the items emitted after their subscriptions.

As subscribers unsubscribe from the observable, refCount decreases an internal counter. When the last subscriber unsubscribes (i.e. when the internal counter reaches zero), the observable stops emitting items. If we subscribe again later, a new connection to the observable is made and it starts emitting items from the beginning.

In this example, after calling refCount(), the observable does not emit anything. But after 5 seconds, when the first subscriber subscribes, it starts receiving emitted items from #0 (remember, refCount makes the observable start emitting items only on the first subscription). Then, after 5 more seconds, a second subscriber subscribes and starts receiving items from #4. After that, the first subscriber unsubscribes and stops receiving items (but the observable keeps emitting, since there is still one subscriber connected to it). After 3 seconds the second subscriber unsubscribes, which makes the observable stop emitting and terminate, since there are no more subscribers subscribed to it.

Note that when using refCount we must subscribe our subscribers to the Observable returned by refCount(), and not to the ConnectableObservable returned by publish() (as we did when using the connect method).
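The counting behaviour can be modelled in a few lines of plain Java. This is a simplified sketch that does not use RxJava: `RefCountedSource` and `RefCountTimeline` are hypothetical names, `tick()` stands in for one emission interval, and the tick counts are arbitrary rather than the 5-second and 3-second delays described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of reference counting; not RxJava's refCount().
class RefCountedSource {
    private final List<Consumer<Long>> subscribers = new ArrayList<>();
    private long next = 0;

    // The source runs only while at least one subscriber is present.
    boolean isRunning() { return !subscribers.isEmpty(); }

    Runnable subscribe(Consumer<Long> subscriber) {
        if (subscribers.isEmpty()) next = 0;          // first subscriber: fresh connection
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);  // the last one out stops the source
    }

    void tick() {
        if (!isRunning()) return;
        long item = next++;
        for (Consumer<Long> s : new ArrayList<>(subscribers)) s.accept(item);
    }
}

class RefCountTimeline {
    // Returns {first, second, third}: the items each subscriber saw.
    static List<List<Long>> run() {
        RefCountedSource source = new RefCountedSource();
        List<Long> first = new ArrayList<>();
        List<Long> second = new ArrayList<>();
        List<Long> third = new ArrayList<>();

        source.tick();                                   // no subscribers: nothing is emitted
        Runnable unsubscribeFirst = source.subscribe(first::add);
        for (int i = 0; i < 3; i++) source.tick();       // #0..#2 reach first
        Runnable unsubscribeSecond = source.subscribe(second::add);
        for (int i = 0; i < 2; i++) source.tick();       // #3 and #4 reach both
        unsubscribeFirst.run();
        source.tick();                                   // #5 reaches second only
        unsubscribeSecond.run();                         // counter reaches zero: source stops
        source.tick();                                   // ignored
        source.subscribe(third::add);                    // new connection
        source.tick();                                   // starts again from #0

        return Arrays.asList(first, second, third);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [[0, 1, 2, 3, 4], [3, 4, 5], [0]]
    }
}
```

The third subscriber seeing #0 again is the "new connection" case: once the counter has dropped to zero, the next subscription starts the sequence from the beginning.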

Now, let’s see the refCount example we provided in the demo app:

When we click on the “RefCount” button, nothing happens, but as soon as we subscribe (by clicking on the “Subscribe First” button), the observable starts emitting items and the first subscription starts receiving them. The same happens when we subscribe the second subscriber. After we unsubscribe both subscriptions, we can see that our observable stops emitting items.

Example #3 – replay()

This example shows how to use the Observable::replay() operator. As soon as we call connect(), our observable starts emitting items and replay() collects them (which items are collected depends on the replay variant). Later, as we subscribe our subscribers, they first receive all the collected items and then receive new items as they are emitted.

We will show three replay variants:

  • replay(): replay all of its items and notifications to any future Observer.
  • replay(bufferSize): replay at most bufferSize items emitted by the Observable. Older items are discarded.    
  • replay(time): replay all items emitted by the Observable within a specified time window.


replay() with no arguments collects all items. When we call connect(), our observable starts emitting and replay() collects the items. If a subscriber is already subscribed, it receives the items as they are emitted. Newer subscribers first receive all the collected items and then the new ones.


The replay(bufferSize) variant replays at most bufferSize items emitted by the observable. If we use it in the same scenario (a first subscriber after 5 seconds and a second one after 5 more seconds) with bufferSize = 3, the first subscriber receives numbers starting from #2. This is because we sleep for 5 seconds before subscribing, so the first two emitted items (#0 and #1) are discarded. The second subscriber receives items starting from #7 (since we sleep for 5 more seconds before subscribing it).


The replay(time) variant replays all items emitted by the observable within a specified time window. Again, if we use it in the same scenario with a 2-second window, the first subscriber gets items starting from #3, since the first 3 items are not replayed (remember that we sleep for 5 seconds before this subscription). The second subscriber receives items starting from #8, since we sleep for 5 more seconds before subscribing it.

For any replay variant, we can also unsubscribe any subscriber if we do not want to receive items anymore. Later, if we subscribe it again, it will receive all the buffered items again.
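The numbers quoted for the three variants (#0, #2 and #7, #3 and #8) can be checked with a small plain-Java calculation. This sketch does not use RxJava: `ReplayModel` is a hypothetical helper, and it assumes that item #i is emitted (i + 1) seconds after connect() and that each subscription lands slightly after the 5-second and 10-second marks (5100 ms and 10100 ms here). Both are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for reasoning about the replay variants; it does not use RxJava.
// Assumption: item #i is emitted at (i + 1) * 1000 ms after connect().
class ReplayModel {
    // Items emitted up to and including nowMs.
    static List<Long> emittedBy(long nowMs) {
        List<Long> items = new ArrayList<>();
        for (long i = 0; (i + 1) * 1000 <= nowMs; i++) items.add(i);
        return items;
    }

    // replay(): everything emitted so far is replayed.
    static List<Long> replayAll(long nowMs) {
        return emittedBy(nowMs);
    }

    // replay(bufferSize): only the newest bufferSize items are replayed.
    static List<Long> replayLast(long nowMs, int bufferSize) {
        List<Long> items = emittedBy(nowMs);
        int from = Math.max(0, items.size() - bufferSize);
        return new ArrayList<>(items.subList(from, items.size()));
    }

    // replay(time): only items not older than windowMs are replayed.
    static List<Long> replayWindow(long nowMs, long windowMs) {
        List<Long> kept = new ArrayList<>();
        for (long i : emittedBy(nowMs)) {
            long emittedAt = (i + 1) * 1000;
            if (nowMs - emittedAt <= windowMs) kept.add(i);
        }
        return kept;
    }

    public static void main(String[] args) {
        long first = 5_100, second = 10_100;  // subscriptions just after 5 s and 10 s
        System.out.println(replayAll(first));             // prints [0, 1, 2, 3, 4]
        System.out.println(replayLast(first, 3));         // prints [2, 3, 4]
        System.out.println(replayLast(second, 3));        // prints [7, 8, 9]
        System.out.println(replayWindow(first, 2_000));   // prints [3, 4]
        System.out.println(replayWindow(second, 2_000));  // prints [8, 9]
    }
}
```

In a real run the first replayed item of the time-window variant depends on exactly when the subscription happens relative to the emissions, so treat the boundary values as approximate.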

Now, see how this example looks when running on the demo app:

We can see that when we click on the “Connect” button, the observable starts emitting items. Then, when we click on the “Subscribe First” button, the first subscriber receives all the items emitted before its subscription (since we used the replay variant with no arguments) as well as the new items as they are emitted. The same happens with the second subscription. Later, even when we unsubscribe both subscriptions, the observable keeps emitting, and it stops only when we click on the “Disconnect” button. You can also try the replay variants with the buffer size and time window options.

Example #4 – cache()

This example demonstrates how to use the Observable::cache() operator. Basically, it ensures that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items. Unlike replay(), calling cache() on an observable does not make it start emitting items; it starts only when the first subscriber arrives.

So, as we subscribe our subscribers, they will first receive all collected items (if any) and then receive items as they are emitted.

As well explained here: “An important thing to note is that the internal ConnectableObservable doesn’t unsubscribe if all the subscribers go away, like refCount would. Once the first subscriber arrives, the source observable will be observed and cached once and for all. This matters because we can’t walk away from an infinite observable anymore. Values will continue to be cached until the source terminates or we run out of memory.” So, if we have no control over the source, we can, for example, use the take(N) operator, or any other approach, to ensure our observable terminates at some point.

In this example, after calling cache(), our observable does not start emitting until the first subscription (which happens after 5 seconds). After 5 more seconds, a second subscriber subscribes and also receives all the cached items. The observable stops emitting after 30 items due to the take(30) operator in the chain.
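The three properties described in this section (start on the first subscription, keep every item, never stop just because subscribers leave) can be modelled in plain Java as follows. This sketch does not use RxJava: `CachedSource` and `CacheTimeline` are hypothetical names, `tick()` stands in for one emission interval, and the `limit` constructor argument plays the role of take(30).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of the caching behaviour; not RxJava's cache().
class CachedSource {
    private final List<Long> cached = new ArrayList<>();
    private final List<Consumer<Long>> subscribers = new ArrayList<>();
    private final int limit;                   // plays the role of take(limit)
    private boolean started = false;

    CachedSource(int limit) { this.limit = limit; }

    int cachedCount() { return cached.size(); }

    Runnable subscribe(Consumer<Long> subscriber) {
        started = true;                        // the first subscription starts the source
        cached.forEach(subscriber);            // replay everything cached so far
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    void tick() {
        if (!started || cached.size() >= limit) return;  // not started yet, or completed
        long item = cached.size();
        cached.add(item);                      // cached even if nobody is subscribed
        for (Consumer<Long> s : new ArrayList<>(subscribers)) s.accept(item);
    }
}

class CacheTimeline {
    // Returns {first, second}: the items each subscriber saw.
    static List<List<Long>> run() {
        CachedSource source = new CachedSource(30);
        List<Long> first = new ArrayList<>();
        List<Long> second = new ArrayList<>();

        for (int i = 0; i < 5; i++) source.tick();     // not started: nothing happens
        Runnable unsubscribeFirst = source.subscribe(first::add);
        for (int i = 0; i < 5; i++) source.tick();     // #0..#4 reach first
        unsubscribeFirst.run();
        for (int i = 0; i < 5; i++) source.tick();     // #5..#9 cached with nobody subscribed
        source.subscribe(second::add);                 // second receives #0..#9 at once
        for (int i = 0; i < 100; i++) source.tick();   // stops by itself after 30 items
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        List<List<Long>> result = run();
        System.out.println(result.get(0));          // prints [0, 1, 2, 3, 4]
        System.out.println(result.get(1).size());   // prints 30
    }
}
```

Without the limit, the `cached` list in this model would grow for as long as `tick()` keeps being called, which is the memory concern raised in the quote above.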

And now, let’s see how it works in the demo app:

When we click on the “cache” button, nothing happens, but as soon as we click on the “Subscribe First” button, the observable starts emitting and the first subscription receives these items. When we click on the “Subscribe Second” button, this subscription first receives all the cached items and then starts receiving newly emitted items. Finally, even when we unsubscribe both subscriptions, the observable keeps emitting, but since we are using the take(30) operator, it stops after it has emitted 30 items.


If we take a close look, we can see that the examples shown in this article have some similar behaviors. The table below shows some of these similarities:

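Operator | Starts emitting only when there is a subscription | Starts emitting right after connect() is called | Collects items | Stops emitting when there are no more subscriptions
--- | --- | --- | --- | ---
connect() | no | yes | no | no
refCount() | yes | no | no | yes
replay() | no | yes | yes | no
cache() | yes | no | yes | no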


As we could see, the concept of hot observables is pretty simple. They can be useful in many different situations, such as when creating an observable is an expensive operation, or when we just want to emit “live” occurrences such as mouse clicks. In this article we presented only some of the ways to work with hot observables. If you want to learn more, I suggest you check this, this and this article.

Now What?

We have finally finished this series about RxJava. Over the last 5 months we have presented about 75 operators, showing examples and some comments about them, as well as providing a demo app for each one. There is still a lot more to learn, but I hope this series has given you some ideas about RxJava in general.

Probably in the next few weeks we will wrap them all together in a single app (in addition to some other examples already published by other authors), so that you can use it as a reference.

So, thank you all, and … see you around.