Apply infinite operations to an RxJS Observable

Posted on Aug 14, 2017

This week, my friend @alejandro_such asked me about how to apply infinite operations to an RxJS observable. Honestly, is not something I stumbled upon before, but every single case you need to solve in RxJS feels like a new challenge. That makes room to write a RxJS cookbook, but that's not why you're reading this today 😉.

Note: you need a bit of RxJS knowledge to follow up the article

The problem

We're used to see and write Observables operators in a chain:

const stream = Rx.Observable.of(0, 1, 2);

stream
  .map(val => val + 1)
  .filter(val => val % 2 === 0)
  .subscribe(res => console.log("Res: " + res)); // Res: 2

Now imagine you need to apply an undetermined set of operations. In that case you cannot chain them anymore. One could write the following code:

const sum = val => val + 1;
const fns = [sum, sum, sum];
const stream = Rx.Observable.of(0);

fns.forEach(fn => stream.map(fn));

stream.subscribe(res => console.log("Res: " + res));
// Expected output ==> Res: 3
// Actual output ==> Res: 0

How's it possible that we added 3 map operations, and we get Res: 0? It's like we didn't do anything at all.

Simple: because observables are immutable. So, whenever you apply an operator, it's returning a new observable. Avoiding side effects is one point of FP (Functional Programming), and RxJS is a FRP (Functional Reactive Programming) library. It's a common pitfall to think that the operators return a mutated version of their same instance.

The solution

We need to store the new observable returned by every operator, and apply the next operator over that one. A simple way, following the previous example:

const sum = val => val + 1;
const fns = [sum, sum, sum];
const stream = Rx.Observable.of(0);

fns.forEach(fn => {
  stream = stream.map(fn); // update with new observable
});

stream.subscribe(res => console.log("Res: " + res));

Or, if we wanna go functional and avoid side effects, reduce plays very well here:

// Try yourself at http://jsbin.com/qucihequdu/edit?js,console
const sum = val => val + 1;
const fns = [sum, sum, sum];
const original = Rx.Observable.of(0);

const mapped = fns.reduce(
  (acum, fn) => acum.map(res => fn(res)), // Apply and return new observable
  original
);

mapped.subscribe(res => console.log("Res: " + res));

Go to this jsbin and start playing with it!

Alex Jover

ALEX JOVER

Google Dev Expert, Vue team member, cats and dance lover. Find here all my teachings about web development, web performance and more.

Alex Jover Morales © 2018