Press "Enter" to skip to content

RxJava Operators – Part 8: Join Operator

0

Join is one of the operators used to combine Observables. According to the docs it “combines items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable”. Easy, right? Not at all, at least for me!

We have already talked about combining observables here, but we decided to create a separated article for the join operator since we think it may be more complex to understand it. This article, as well as the demo app, are a little different from the others over this series. Demo app can be used as a playground since it allows us to try different values for the join parameters and check the results easily. We will first present some concepts, and then, run into three different examples by using some pre-defined parameters and analyse the results. Check Implementation section below for details.

As usual, you can find here a demo app.

Please, check below the previous articles of this series:

Some Background

Since join() operator uses window concept internally to control its emissions, it may be helpful to understand window operator, especially that variant which accepts a timestamp as the argument.

So, let’s take a look on how this variant of window() operator works:

 

We can see on this marble diagram that a window is defined to be open for a period of time “t”. While it is opened, it collects items from the source. When it closes, window operator emits an Observables containing all collected items from the source Observable and then, this Observable terminates with an onCompleted notification.

On the diagram, while the first window is opened, source Observable emits red and yellow items. After timespan elapses, window is closed (but another one is opened) and an Observable containing these values is emitted downstream. Right after, a green item is emitted (and later a soft blue item) and both are collected by the second window. When it closes, it emits an Observable with those two items. Then, another window opens, collects blue and pink items and closes, by emitting these items. A last window opens, but since no items were emitted from the source while it is opened, it closes after its timespan and emits an empty Observable.

This was just a background on how this window operator variant works. We will use these concepts when dealing with join operator.

Join Operator

Now, let’s start talking about join operator. First let’s see the parameters it accepts:

  • right – the second Observable to join items from
  • leftDurationSelector – a function to select a duration for each item emitted by the source Observable, used to determine overlap
  • rightDurationSelector – a function to select a duration for each item emitted by the right Observable, used to determine overlap
  • resultSelector – a function that computes an item to be emitted by the resulting Observable for any two overlapping items emitted by the two Observables

Basically, it combines items from two source Observables. Whenever an item is emitted from one of its source, a window is opened for a duration defined by a function. Then, while that window is opened (let’s say, left window), items are collected. For every item emitted by the right source, it is combined to all open left items and passed along to the resultSelector function. The same occurs in the opposite way. Finally, resultSelector returns an item to be emitted by the Observable returned from join.

In short, whenever two items (each one for one source) are overlapped, they will be paired and sent to the resultSelector which computes and returns them.

In the marble diagram below we can see this in action. Whenever an item is emitted from any source Observable that overlaps with any item emitted from the other observable, they are combined.

Let’s see a step by step:

  • First observable emits a pink circle item, then a window is opened.
  • Then, the second observable emits a diamond item. Since this emission happened while the first window was still opened, it will be combined with the pink item and emitted downstream.
  • Later, window related to the pink item closes. It will no longer be combined with any other item.
  • Now, first observable emits an orange circle item. But since a window for the second observable (the diamond item) is still opened, that orange circle will be combined to the diamond.
  • Sometime later the window for the diamond item closes.
  • Then, first observable emits another item, a blue one, but since there is no open window for the second observable, this item will not (yet) be combined to any item. At this point, we can see that there are two open window’s for the first Observable, both ones waiting for items from the second observable to be combined with.
  • Now, window related to the orange item closes.
  • Then, second observable emits a star item which will be combined to the blue item from the first observable, since its window is still opened.
  • Then window for the blue item closes.
  • At this moment, only the window related to the star is opened.
  • Next, first observable emits a yellow item, which will be combined to the star item.
  • Now, window for the star item closes.
  • And finally, second observable emits a pentagon item which is combined with the yellow item, since this is the only open window from the other observable opened at this time.
  • And both source observables complete.

Implementation

We decided to make a demo app in a way that users can try using join operator by setting different values form some attributes (e.g. observables delay time, window duration, etc) and run tests using the any configured values (actually there are some boundaries just to not flood the GUI). By doing so, we can easily configure different values for each window (include never value), as well as for the delay that each source will use between their emissions. See how the main method looks like:

Basically, left and right source will emit items using a delay between each emission based on the user configuration. Then, join operator will define a duration for each item (for each Observable) also based on user configuration. Finally, whenever two items (each one from each Observable) are overlapped, they will be passed to the resultSelector which will just group them in a list. That’s it!

Running the code

Now, let’s see how join behaves when using different values for the parameters. This first example was based on this article. The other examples are just variations of this first one.

Note that results you see on the demo app may be in different order. But to keep simplicity, we will show the sequential order on this article.

First Example

  • Left Sequence Delay: 100
  • Right Sequence Delay: 200
  • Left Window Duration: Never
  • Right Window Duration 0
  • Items to be emitted: 10

For this example, since left window never close, every item emitted by the right observable will be combined with all left items. On the other hand, since right window has a duration of 0s, after each item is combined to the open left items, it closes and no item be available anymore.

Based on that, this is what we should see when using these values:

  • At Time 100ms Left Observable emits: 0 -> Nothing happen. No open window on the right side.
  • At Time 200ms Left Observable emits: 1 -> Nothing happen. No open window on the right side.
  • At Time 200ms Right Observable emits: 0 -> Now, we have open window for both Observables. This is what will happen:
    • – Item 0 emitted by the left Observable will be combined with the current right item (0). Result: [0, 0]
    • – Item 1 emitted by the left Observable will be combined with the current right item (0). Result: [1, 0]
  1. NOTE 1: Since left window never close, items 0 and 1 will still be available for the next right emissions
  2. NOTE 2: Since right window has a duration of 0s, item 0 will no longer be available
  • At Time 300ms Left Observable emits: 2 -> Nothing happen. No open window on the right side.
  • At Time 400ms Left Observable emits: 3 -> Nothing happen. No open window on the right side.
  • At Time 400ms Right Observable emits: 1 -> Now, we have open window for both Observables. This is what will happen:
    • – Item 0 emitted by the left Observable will be combined with the current right item (1). Result: [0, 1]
    • – Item 1 emitted by the left Observable will be combined with the current right item (1). Result: [1, 1]
    • – Item 2 emitted by the left Observable will be combined with the current right item (1). Result: [2, 1]
    • – Item 3 emitted by the left Observable will be combined with the current right item (1). Result: [3, 1]
  1. NOTE 1: Since left window never close, items 0, 1, 2 and 3 will still be available for the next right emissions
  2. NOTE 2: Since right window has a duration of 0s, item 1 will no longer be available
  • At Time 500ms Left Observable emits: 4 -> Nothing happen. No open window on the right side.
  • At Time 600ms Left Observable emits: 5 -> Nothing happen. No open window on the right side.
  • At Time 600ms Right Observable emits: 2 -> Now, we have open window for both Observables. This is what will happen:
    • – Item 0 emitted by the left Observable will be combined with the current right item (2). Result: [0, 2]
    • – Item 1 emitted by the left Observable will be combined with the current right item  (2). Result: [1, 2]
    • – Item 2 emitted by the left Observable will be combined with the current right item (2). Result: [2, 2]
    • – Item 3 emitted by the left Observable will be combined with the current right item (2). Result: [3, 2]
    • – Item 4 emitted by the left Observable will be combined with the current right item (2). Result: [4, 2]
    • – Item 5 emitted by the left Observable will be combined with the current right item (2). Result: [5, 2]
  1. NOTE 1: Since left window never close, items 0, 1, 2, 3, 4 and 5 will still be available for the next right emissions
  2. NOTE 2: Since right window has a duration of 0s, item 2 will no longer be available
  • At Time 700ms Left Observable emits: 6 -> Nothing happen. No open window on the right side.
  • At Time 800ms Left Observable emits: 7 -> Nothing happen. No open window on the right side.
  • At Time 800ms Right Observable emits: 3 -> Now, we have open window for both Observables. This is what will happen:
    • – Item 0 emitted by the left Observable will be combined with the current right item (3). Result: [0, 3]
    • – Item 1 emitted by the left Observable will be combined with the current right item  (3). Result: [1, 3]
    • – Item 2 emitted by the left Observable will be combined with the current right item (3). Result: [2, 3]
    • – Item 3 emitted by the left Observable will be combined with the current right item (3). Result: [3, 3]
    • – Item 4 emitted by the left Observable will be combined with the current right item (3). Result: [4, 3]
    • – Item 5 emitted by the left Observable will be combined with the current right item (3). Result: [5, 3]
    • – Item 6 emitted by the left Observable will be combined with the current right item (3). Result: [6, 3]
    • – Item 7 emitted by the left Observable will be combined with the current right item (3). Result: [7, 3]
  1. NOTE 1: Since left window never close, items 0 to 7 will still be available for the next right emissions
  2. NOTE 2: Since right window has a duration of 0s, item 3 will no longer be available
  • … and so on until it completes

Second Example

  • Left Sequence Delay: 100
  • Right Sequence Delay: 200
  • Left Window Duration: 100
  • Right Window Duration: 100
  • Items to be emitted: 10

On this example, both windows (left and right) will last for 100ms. This means that every item emitted by an Observable will be combined to any open window items from the other observable. Let’s see a step by step:

  • At Time 100ms Left Observable emits: 0 -> This window will last for 100ms. Nothing happen. No open window on the right side.
  • At Time 200ms Left Observable emits: 1 -> This window will last for 100ms. Nothing happen. No open window on the right side.
  • At Time 200ms Right Observable emits: 0 -> This window will last for 100ms. Now, we have open window for both Observables. This is what will happen:
    • – Item 0 emitted by the left Observable will be combined with the current right item (0). Result: [0, 0]
    • – Item 1 emitted by the left Observable will be combined with the current right item (0). Result: [1, 0]
  1. NOTE 1: Since left window has a duration of 100ms, if an item is emitted by the right observable within this time, they will be combined
  2. NOTE 2: Since right window has a duration of 100ms, if an item is emitted by the left observable within this time, they will be combined
  • At Time 200ms First left window closes, so, item 0 is no longer available
  • At Time 300ms Left Observable emits: 2 -> This window will last for 100ms. At this time, right emitted item 0 is still available. This is what will happen:
    • – It will be combined with the right item 0. Result: [2, 0]
  • At Time 300ms Left window for item 1 closes
  • At Time 300ms Right window for item 0 closes
  • At Time 400ms Left Observable emits: 3 -> This window will last for 100ms. Nothing happen. No open window on the right side.
  • At Time 400ms Right Observable emits: 1 -> This window will last for 100ms. Now, we have open window for both Observables. This is what will happen:
    • – Item 2 emitted by the left Observable will be combined with the current right item (1). Result: [2, 1]
    • – Item 3 emitted by the left Observable will be combined with the current right item (1). Result: [3, 1]
  • At Time 400ms Left window for item 2 closes
  • At Time 500ms Left Observable emits: 4 -> This window will last for 100ms. We still have open window for both Observables. This is what will happen:
    • – Item 4 emitted by the left Observable will be combined with the current right item (1). Result: [4, 1]
  • At Time 500ms Left window for item 3 closes
  • At Time 500ms Right window for item 1 closes
  • At Time 600ms Left Observable emits: 5 -> This window will last for 100ms. Nothing happen. No open window on the right side.
  • At Time 600ms Right Observable emits: 2 -> This window will last for 100ms. Now, we have open window for both Observables. This is what will happen:
    • – Item 4 emitted by the left Observable will be combined with the current right item (2). Result: [4, 2]
    • – Item 5 emitted by the left Observable will be combined with the current right item (2). Result: [5, 2]
  • At Time 600ms Left window for item 4 closes
  • At Time 700ms Left Observable emits: 6 -> This window will last for 100ms. We still have open window for both Observables. This is what will happen:
    • – Item 6 emitted by the left Observable will be combined with the current right item (2). Result: [6, 2]
  • At Time 700ms Left window for item 5 closes
  • At Time 700ms Right window for item 2 closes
  • At Time 800ms Left Observable emits: 7 -> This window will last for 100ms. Nothing happen. No open window on the right side.
  • At Time 800ms Right Observable emits: 3 -> This window will last for 100ms. Now, we have open window for both Observables. This is what will happen:
    • – Item 6 emitted by the left Observable will be combined with the current right item (3). Result: [6, 3]
    • – Item 7 emitted by the left Observable will be combined with the current right item (3). Result: [7, 3]
  • At Time 800ms Left window for item 6 closes
  • At Time 900ms Left Observable emits: 8 -> This window will last for 100ms. We still have open window for both Observables. This is what will happen:
    • – Item 8 emitted by the left Observable will be combined with the current right item (3). Result: [8, 3]
  • At Time 900ms Left window for item 7 closes
  • At Time 900ms Right window for item 3 closes
  • At Time 1000ms  Left Observable emits: 9 -> This window will last for 100ms. Nothing happen. No open window on the right side.
  • At Time 1000ms  Right Observable emits: 4 -> This window will last for 100ms. Now, we have open window for both Observables. This is what will happen:
    • – Item 8 emitted by the left Observable will be combined with the current right item (4). Result: [8, 4]
    • – Item 9 emitted by the left Observable will be combined with the current right item (4). Result: [9, 4]
  • At Time 1000ms Left window for item 8 closes
  • At Time 1100ms Left window for item 9 closes
  • At Time 1100ms Right window for item 4 closes
  • At Time 1200ms  Right Observable emits: 5 -> This window will lass for 100ms. Nothing happen. No open window on the left side.
  • At Time 1300ms Right window for item 5 closes
  • At Time 1400ms  Right Observable emits: 6 -> This window will last for 100ms. Nothing happen. No open window on the left side.
  • At Time 1500ms Right window for item 6 closes
  • At Time 1600ms  Right Observable emits: 7 -> This window will last for 100ms. Nothing happen. No open window on the left side.
  • At Time 1700ms Right window for item 7 closes
  • At Time 1800ms  Right Observable emits: 8 -> This window will last for 100ms. Nothing happen. No open window on the left side.
  • At Time 1900ms Right window for item 8 closes
  • At Time 2000ms  Right Observable emits: 9 -> This window will last for 100ms. Nothing happen. No open window on the left side.
  • At Time 2100ms Right window for item 9 closes

Third Example

  • Left Sequence Delay: 100
  • Right Sequence Delay: 200
  • Left Window Duration: 200
  • Right Window Duration: 100
  • Items to be emitted: 10

The only difference from this example and the previous one is in the left window duration that will last for 200ms (instead of 100ms). This means that when a right item is emitted, there might have more items on the left side to be combined, since left window lasts for a little more time than the previous example.

Let’s see what we should get when running demo app by using it with this values:

  • At Time 100ms Left Observable emits: 0 -> This window will last for 200ms. Nothing happen. No open window on the right side.
  • At Time 200ms Left Observable emits: 1 -> This window will last for 200ms. Nothing happen. No open window on the right side.
  • At Time 200ms Right Observable emits: 0 -> This window will last for 100ms. Now, we have open window for both Observables. This is what will happen:
    • – Item 0 emitted by the left Observable will be combined with the current right item (0). Result: [0, 0]
    • – Item 1 emitted by the left Observable will be combined with the current right item (0). Result: [1, 0]
  1. NOTE 1: Since left window has a duration of 200ms, if an item is emitted by the right observable within this time, they will be combined
  2. NOTE 2: Since right window has a duration of 100ms, if an item is emitted by the left observable within this time, they will be combined
  • At Time 300ms Left Observable emits: 2 -> This window will last for 200ms. At this time, right emitted item 0 is still available. This is what will happen:
    • – It will be combined with the right item 0. Result: [2, 0]
  • At Time 300ms Left window for item 0 closes
  • At Time 300ms Right window for item 0 closes
  • At Time 400ms Left Observable emits: 3 -> This window will last for 200ms. Nothing happen. No open window on the right side.
  • At Time 400ms Right Observable emits: 1 -> This window will last for 100ms. Now, we have open window for both Observables. This is what will happen:
    • – Item 1 emitted by the left Observable will be combined with the current right item (1). Result: [1, 1]
    • – Item 2 emitted by the left Observable will be combined with the current right item (1). Result: [2, 1]
    • – Item 3 emitted by the left Observable will be combined with the current right item (1). Result: [3, 1]
  • At Time 400ms Left window for item 1 closes
  • At Time 400ms Right window for item 0 closes
  • At Time 500ms Left Observable emits: 4 -> This window will last for 200ms. Now, we have open window for both Observables. This is what will happen:
    • – It will be combined with the right item (1). Result: [4, 1]
  • At Time 500ms Left window for item 2 closes
  • At Time 500ms Right window for item 1 closes
  • At Time 600ms Left Observable emits: 5 -> This window will last for 200ms. Nothing happen. No open window on the right side.
  • At Time 600ms Right Observable emits: 2 -> This window will last for 100ms. Now, we have open window for both Observables. This is what will happen:
    • – Item 3 emitted by the left Observable will be combined with the current right item (2). Result: [3, 2]
    • – Item 4 emitted by the left Observable will be combined with the current right item (2). Result: [4, 2]
    • – Item 5 emitted by the left Observable will be combined with the current right item (2). Result: [5, 2]
  • At Time 600ms Left window for item 3 closes
  • At Time 700ms Left Observable emits: 6 -> This window will last for 200ms. Now, we have open window for both Observables. This is what will happen:
    • – It will be combined with the right item (2). Result: [6, 2]
  • At Time 700ms Left window for item 4 closes
  • At Time 700ms Right window for item 2 closes
  • At Time 800ms Left Observable emits: 7 -> This window will last for 200ms. Nothing happen. No open window on the right side.
  • At Time 800ms Right Observable emits: 3 -> This window will last for 100ms. Now, we have open window for both Observables. This is what will happen:
    • – Item 5 emitted by the left Observable will be combined with the current right item (3). Result: [5, 3]
    • – Item 6 emitted by the left Observable will be combined with the current right item (3). Result: [6, 3]
    • – Item 7 emitted by the left Observable will be combined with the current right item (3). Result: [7, 3]
  • At Time 800ms Left window for item 5 closes
  • At Time 900ms Left Observable emits: 8 -> This window will last for 200ms. Now, we have open window for both Observables. This is what will happen:
    • – It will be combined with the right item (3). Result: [8, 3]
  • At Time 900ms Left window for item 6 closes
  • At Time 900ms Right window for item 3 closes
  • At Time 1000ms Left Observable emits: 9 -> This window will lass for 200ms. Nothing happen. No open window on the right side.
  • At Time 1000ms Right Observable emits: 4 -> This window will last for 100ms. Now, we have open window for both Observables. This is what will happen:
    • – Item 7 emitted by the left Observable will be combined with the current right item (4). Result: [7, 4]
    • – Item 8 emitted by the left Observable will be combined with the current right item (4). Result: [8, 4]
    • – Item 9 emitted by the left Observable will be combined with the current right item (4). Result: [9, 4]
  • At Time 1000ms Left window for item 7 closes
  • At Time 1100ms Left window for item 8 closes
  • At Time 1100ms Right window for item 4 closes
  • At Time 1200ms Right Observable emits: 5 -> This window will last for 100ms. Now, we have open window for both Observables. This is what will happen:
    • – Item 9 emitted by the left Observable will be combined with the current right item (5). Result: [9, 5]
  • At Time 1200ms Left window for item 9 closes
  • At Time 1400ms Right window for item 5 closes

Conclusion

As we can see, working with join operator can be sometimes confusing. I think the point is to first try to understand the basics of this operator. After that things might be easier.

Regarding to the demo app, I first created it for my use only, and it helped me a lot to test join operator by using different values in a simple and quick way. I hope it can also help you in some way. 

And of course, if you have questions, please leave a comment below.