Pausable Observables in RxJS

and other backpressure techniques

There are different ways to pause event streams: we can filter, delay, buffer, or space events, among others. Some of these techniques ignore events during a pause (lossy), while others delay event handling until resumed (lossless). Some keep their subscription and others resubscribe, so their output behaves differently depending on the source. In this article I’d like to explore several pausing techniques and suggest how they can be implemented using RxJS. Let’s go!

Imagine we have an app that displays push notifications from the server. We will pause these notifications in several different ways!

Why: the user wants to mute notifications for some time. Notifications that arrive while mute is enabled are skipped

What: when paused, we simply filter out events from the source Observable, keeping the server connection intact

How: the windowToggle operator is probably the best fit for this. With it we can indicate when to pass events from the source and when to filter them out

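import { mergeAll, windowToggle } from 'rxjs/operators';

// ons$ emits when pause is switched on, offs$ when it is switched off
source$.pipe(
  windowToggle(
    offs$,
    () => ons$
  ),
  // then flatten the window Observables
  mergeAll()
)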

Play with this example using windowToggle operator in a playground.
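
To try the filtering approach without a playground, here is a minimal self-contained sketch. It assumes RxJS 6 or later and replaces the notification stream and the pause controls with plain Subjects. The names source$, ons$ and offs$ follow the snippet above, while shown is a hypothetical array standing in for the UI. Note that no window is open until offs$ emits for the first time, so the sketch emits on offs$ once to start in the running state.

```typescript
import { Subject } from 'rxjs';
import { mergeAll, windowToggle } from 'rxjs/operators';

const source$ = new Subject<number>();
const ons$ = new Subject<void>();  // emits when pause is switched on
const offs$ = new Subject<void>(); // emits when pause is switched off

// hypothetical sink standing in for the notifications UI
const shown: number[] = [];

source$.pipe(
  // open a window when pause goes off, close it when pause goes on
  windowToggle(offs$, () => ons$),
  // flatten the window Observables back into a single stream
  mergeAll()
).subscribe(value => shown.push(value));

offs$.next();    // start in the running state
source$.next(1); // passes through
ons$.next();     // pause
source$.next(2); // arrives during the pause and is lost
offs$.next();    // resume
source$.next(3); // passes through

console.log(shown); // [1, 3]
```

The source subscription is never touched here: only the windows open and close, which is why this variant keeps the server connection alive.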

Why: when the user switches to another window, we can unsubscribe from the notifications stream, thereby lowering network, processing, and server load. Once the user switches back, we restore the notifications stream connection

[Marble diagram: here the source is a cold timer, so upon resubscription it starts emitting from 0]

What: when paused, we unsubscribe from the Observable, and resubscribe on resume

How:

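  • takeUntil with repeatWhen
    takeUntil will complete the Observable when we pause
    and repeatWhen will subscribe again when we resume
import { repeatWhen, takeUntil } from 'rxjs/operators';

source$.pipe(
  // ons$ emits when pause is switched on: complete the stream
  takeUntil(ons$),
  // offs$ emits when pause is switched off: subscribe to source$ again
  repeatWhen(() => offs$)
)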

Play with this example using takeUntil and repeatWhen in a playground.

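  • switchMap with EMPTY or NEVER
    when paused we switchMap to an empty Observable
    when resumed we switchMap to the source stream (this triggers a subscription)
import { EMPTY } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// pause$ emits true on pause and false on resume
pause$.pipe(
  switchMap(paused =>
    paused
      ? EMPTY    // paused: unsubscribe from source$
      : source$  // resumed: subscribe to source$ again
  )
)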

Play with the example using switchMap operator in a playground.
An article describing this approach with switchMap.

  • or manual use of subscribe / unsubscribe
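
To make the resubscription visible, here is a small sketch of the switchMap variant, assuming RxJS 6 or later. The names feed, subscriptions and shown are hypothetical helpers introduced for this illustration: feed plays the role of the server, and defer is used only to count how many times the source gets subscribed.

```typescript
import { Subject, EMPTY, defer } from 'rxjs';
import { switchMap } from 'rxjs/operators';

let subscriptions = 0;
const feed = new Subject<string>();

// defer runs its factory on every subscription, so we can count them
const source$ = defer(() => {
  subscriptions++;
  return feed;
});

// emits true on pause and false on resume
const pause$ = new Subject<boolean>();
const shown: string[] = [];

pause$.pipe(
  switchMap(paused => (paused ? EMPTY : source$))
).subscribe(value => shown.push(value));

pause$.next(false); // running: first subscription to source$
feed.next('a');
pause$.next(true);  // paused: switchMap unsubscribes from source$
feed.next('b');     // nobody is listening, the value is lost
pause$.next(false); // resumed: second subscription to source$
feed.next('c');

console.log(shown, subscriptions); // ['a', 'c'] 2
```

With a cold source, such as a timer or an HTTP-backed stream, that second subscription restarts the producer, which is the behavior described for the cold timer above.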

Old geek note: there used to be a pausable operator in RxJS 4 that implemented this kind of pausing. The official advice is to use switchMap to achieve this in versions 5+

Why: when you don’t want to bother your user with frequent updates, there are several ways to limit the number of messages shown in a given time period

[Marble diagram: sampling the source every 10ms]

What: every N ms, take the latest value emitted since the previous sampling

[Marble diagram: debounceTime vs throttleTime vs auditTime]

Note: in the marble diagram above I keep the source marble colors, while updating their values to represent the time at which they were emitted. E.g. the second yellow marble from the source was emitted at 5ms; in debounceTime(10) it was emitted at 15ms, in auditTime(10) it was emitted at 10ms, and it was ignored in the throttleTime(10) stream.

What: debouncing emits a value only if no other value arrives within a given time after it. Throttling does the opposite: when a value is emitted, subsequent emissions are ignored for a given time period. Auditing acts similarly to throttling, though it emits the latest value in a given period, not the first.

How: for all four we have out-of-the-box operators

  • sample
  • debounce, debounceTime
  • throttle, throttleTime
  • audit, auditTime

Play with sample operator in a playground.
Check out this comparison of debounceTime vs throttleTime vs auditTime.
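
The difference between the three policies is easier to see on concrete numbers. The sketch below is a plain TypeScript model, not RxJS code: the functions debounceModel, throttleModel and auditModel are hypothetical names that mimic what debounceTime, throttleTime (with its default settings) and auditTime do to a sorted list of emission times. It assumes the source never completes and that no value arrives exactly on a timer boundary.

```typescript
type Emission = { sourceTime: number; emittedAt: number };

// a value is emitted `span` ms after it arrived, unless a newer one arrived first
function debounceModel(times: number[], span: number): Emission[] {
  const out: Emission[] = [];
  times.forEach((t, i) => {
    const next = times[i + 1];
    if (next === undefined || next - t >= span) {
      out.push({ sourceTime: t, emittedAt: t + span });
    }
  });
  return out;
}

// a value is emitted immediately, then everything is ignored for `span` ms
function throttleModel(times: number[], span: number): Emission[] {
  const out: Emission[] = [];
  let silentUntil = -Infinity;
  for (const t of times) {
    if (t >= silentUntil) {
      out.push({ sourceTime: t, emittedAt: t });
      silentUntil = t + span;
    }
  }
  return out;
}

// a value starts a `span` ms timer; when it fires, the latest value is emitted
function auditModel(times: number[], span: number): Emission[] {
  const out: Emission[] = [];
  let firesAt = -Infinity;
  for (const t of times) {
    if (t >= firesAt) {
      // no timer is running: this value starts one
      firesAt = t + span;
      out.push({ sourceTime: t, emittedAt: firesAt });
    } else {
      // a timer is running: this value replaces the pending one
      out[out.length - 1] = { sourceTime: t, emittedAt: firesAt };
    }
  }
  return out;
}

const times = [0, 5, 20, 22, 24, 40];
const debounced = debounceModel(times, 10); // values from 5, 24, 40 emitted at 15, 34, 50
const throttled = throttleModel(times, 10); // values from 0, 20, 40 emitted at 0, 20, 40
const audited = auditModel(times, 10);      // values from 5, 24, 40 emitted at 10, 30, 50
console.log(debounced, throttled, audited);
```

The value that arrives at 5ms behaves just like the second marble in the note above: debouncing emits it at 15ms, auditing at 10ms, and throttling drops it.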

Why: when push notifications come rapidly, we’d like to ensure that the user has time to grasp them. So we make a pause of at least 1 second after displaying a notification

What: keeping a gap of at least N ms between emissions

How: concatMap each value to a timer that starts with the source value and completes after N ms

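import { timer } from 'rxjs';
import { concatMap, ignoreElements, startWith } from 'rxjs/operators';

source$.pipe(
  concatMap(value => timer(N).pipe(
    // drop the timer tick, keeping only its completion after N ms
    ignoreElements(),
    // emit the source value as soon as this inner stream is subscribed
    startWith(value)
  ))
)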

Play with this example of spacing values on an observable using concatMap.
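
The timing that this concatMap construction produces can be worked out by hand: each value is emitted either when it arrives or N ms after the previous emission, whichever is later. The sketch below is a plain TypeScript model of that rule, not RxJS code. The function name spacedEmissionTimes and the sample numbers are made up for illustration, and the arrival times are assumed to be sorted.

```typescript
// For each arrival time, compute when the spaced stream would emit the value:
// no earlier than its arrival, and no earlier than `gap` ms after the previous emission.
function spacedEmissionTimes(arrivals: number[], gap: number): number[] {
  const emitted: number[] = [];
  let previous = -Infinity;
  for (const arrival of arrivals) {
    const at = Math.max(arrival, previous + gap);
    emitted.push(at);
    previous = at;
  }
  return emitted;
}

// three notifications arrive in a burst, a fourth one much later
const emissionTimes = spacedEmissionTimes([0, 100, 200, 3500], 1000);
console.log(emissionTimes); // [0, 1000, 2000, 3500]
```

The burst gets stretched to one notification per second, while the late notification is not delayed at all, since the previous gap has already elapsed. Nothing is dropped, so this kind of pausing is lossless.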

Why: imagine that our client has lost its connection to the server and is trying to send a bunch of messages. We can delay sending those messages until the connection is restored

What: when paused, we buffer values from the stream and emit the buffer once resumed

How:

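  • merge windowToggle (between resume and pause) with bufferToggle (between pause and resume)
import { merge } from 'rxjs';
import { bufferToggle, mergeAll, windowToggle } from 'rxjs/operators';

// here off$ emits on pause and on$ on resume; source$ is subscribed twice, so a cold source needs share()
merge(
  // while paused: collect values into an array, emitted on resume
  source$.pipe( bufferToggle(off$, () => on$) ),
  // while running: pass values through as window Observables
  source$.pipe( windowToggle(on$, () => off$) )
).pipe(
  // then flatten buffer arrays and window Observables
  mergeAll()
)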

Run this example using windowToggle with bufferToggle to make pausable buffered Observable.

  • switchMap the pause stream onto a buffer when paused, and onto the raw source on resume
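
To see the first approach (the merge of bufferToggle and windowToggle) working end to end, here is a self-contained sketch assuming RxJS 6 or later. Subjects stand in for the message stream and for the connection state: on$ emits on resume and off$ on pause, as in the snippet above. The array sent is a hypothetical stand-in for whatever actually delivers the messages.

```typescript
import { Subject, merge } from 'rxjs';
import { bufferToggle, mergeAll, windowToggle } from 'rxjs/operators';

const source$ = new Subject<number>(); // hot, so subscribing twice is safe
const on$ = new Subject<void>();       // emits on resume
const off$ = new Subject<void>();      // emits on pause

// hypothetical sink standing in for the message sender
const sent: number[] = [];

merge(
  // paused: collect values, release them as one array on resume
  source$.pipe(bufferToggle(off$, () => on$)),
  // running: pass values through inside window Observables
  source$.pipe(windowToggle(on$, () => off$))
).pipe(
  // arrays and Observables are both flattened into plain values
  mergeAll()
).subscribe(value => sent.push(value));

on$.next();      // start in the running state
source$.next(1); // sent right away
off$.next();     // pause
source$.next(2); // buffered
source$.next(3); // buffered
on$.next();      // resume: the buffer [2, 3] is flushed
source$.next(4); // sent right away

console.log(sent); // [1, 2, 3, 4]
```

Unlike the filtering variant, nothing is lost here: values that arrive during the pause are only delayed until the next resume.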

Old geek note: there used to be a pausableBuffered operator in RxJS 4 that implemented this kind of pausing

Delaying events on mouse hover with event spacing (lossless)

When the mouse hovers over a message, we want to pause the stream to give the user time to read the message. The notifications stream stays paused until the user moves the mouse away. Also, all the events are spaced to give the user at least 1 second to read each message

Play with this complex example using lots of stuff.
Stackblitz interactive app.

The ability of an Observable consumer to control the producer’s emission rate, timing, or amount is often called backpressure. So pausing is just a subset of backpressure.

Obviously, we could not cover all backpressure techniques in this article, though here are some I’d like to mention:

  • delay and delayWhen are simple yet useful operators that might fit your pausing needs
  • controlled Observable: another operator from RxJS prior to v5. It allowed the consumer to control when the Observable should produce values on the stream by calling source$.request(Number). Read more on this operator in the RxJS 4 docs and check its source code to get insight into how this could be achieved in modern RxJS
  • Pausable delayed Observable: in this case the result stream would delay event production, keeping the emission pace of the original source. Say, if we use this to delay a 1s timer, after resume it would still emit at a 1s rate. I’d suggest checking concatMap and timeInterval while implementing this kind of pausing

Have another example, a different approach, or a question? Please share it in the comments section! Your feedback is very valuable!

Be sure to check out the RxJS Playground, the tool I used to compile the marble diagrams for this article. I’ve created it to help developers (myself included) explore, understand, and explain RxJS streams. Give it a try!

  • Further reading on backpressure in RxJS v4, with operator sources
  • “RxJS: pause and resume — mighty switchMap” by Wojciech Trawiński
  • “A look at Back pressure and its handling in RxJS” by muhammad abdulmoiz
  • “Hot vs Cold Observables” by Ben Lesh
