All posts

Think reactive (the code)

📅 Created 6 years ago → updated after 5 years

🏷️ #ts #archive #js #rx

🔗 Think Reactive

Well, not from very scratch but let’s rewrite all the reactive part. At this moment I hope you fulfilled all the prerequisites:

  • NodeJS is installed,
  • the code is downloaded,
  • npm install has been run,
  • some MP3s are in the demo/audio/.

First, let me introduce what’s around. The app.ts on the GitHub imports and instantiates 2 classes: the AudioAnalyser and VisualizerCanvas.
First one stands for the spying the audio currently played and providing us with Fast Fourier Transform data (FFT). Second one wraps the HTML <canvas> element and provides the drawBars() method. It consumes the Array<0..255> and actually draws the bar diagram of that array.

That’s the very essence of the demo: we will fetch the FFT data and will feed ‘em to the drawBars() periodically.

So let’s remove the content of runDemo() starting from :23 and below. The only things left are instantiating of the AudioAnalyser and VisualizerCanvas and loading first audio file (the fetch() piece).

Check how drawBars() work

As I stated earlier, the instance of the VisualizerCanvas has the drawBars() method which consumes the array of numbers. So let’s feed it to the method and see how it works:

function runDemo() {
  // ...
  const testData = [64, 33, 12, 128, 200, 225]
  visualizer.drawBars(testData)

  // draw a bit different diagram in a second
  setTimeout(() => {
    testData.unshift(testDate.pop())
    visualizer.drawBars(testData)
  }, 1000)
}

Hope this works (fingers crossed 🤭). Ok, let’s remove this piece and go further.

Setting up the time interval

1️⃣ Create an Observable

The RxJS provides many ways to create an Observable (aka Stream). Actually it’s possible to create Rx Observable from any async activity (like user events, timeouts, Promises) or even from static values. So we’ll create an observable from the time interval (500 ms for testing purposes, later on we’ll adjust the frequency):

import { interval } from "rxjs"

function runDemo() {
  // ...
  const analyser$ = interval(500)
  // by some agreements the stream names end with `$`
}

This makes no difference to the app behavior. In order to create that difference we have to…

2️⃣ Subscribe to Observable:

function runDemo() {
  // ...
  const analyser$ = interval(500)

  analyser$.subscribe((result: number) => {
    console.info(result)
  })
}

Not very useful so far, right? Actually we don’t need the data from that interval, we need only the regular event emitting (beaming).

3️⃣ Make the stream providing real data

The true power of Rx is possibility to modify the stream (Observable) before subscribing. We’ll use following technic (the code is pretty talkative):

import { interval } from "rxjs"
import { map } from "rxjs/operators"

function runDemo() {
  const analyser$ = interval(500).pipe(
    map(() => analyser.getFft()),
  );

  analyser$.subscribe((fftData: number[]) => {
    visualizer.drawBars(fftData)
  })
}

Pay attention to:

  • How the RxJS components are imported:
    • Constructors and interfaces reside in the rxjs package itself; all the methods for creating observables are imported from there as well.
    • The operators for modifying the stream reside under rxjs/operators.
  • How the stream is modified: we actually pipe it through modifying methods. Mind that ideally the map() {} should be pure (this makes it easy testable and straightens the data flow). But in order to make this work it’s really convenient to map the stream of dummy numbers into a stream of FFT data.
  • Since the modified stream returns the FFT number[], we can directly feed it into the visualizer (check out the .subscribe() {} part).

At this point 500 ms does not look like human-friendly update interval. Now we can change it to, for instance, 40 ms (think 25 FPS).

More streams!

The visualizer.drawBars() receives second optional boolean argument. It’s false by default and it’s responsible for drawing bars in grey or in color. Let’s bring some colors in when Shift is pressed. In order to do that we’ll create another stream (Observable) out of keyboard events. It will deliver the information about the Shift button state.

Observable from a keyboard event

import { fromEvent, of, interval, merge } from "rxjs"
import { map, startWith } from "rxjs/operators"

function runDemo() {
  const kbd$ = merge(
    fromEvent(window, "keyup"),
    fromEvent(window, "keydown"),
  ).pipe(
    map((e: Event) => (<KeyboardEvent>e).shiftKey),
    startWith(false),
  );

  kbd$.subscribe((isShiftPressed: boolean) => {
    console.info(`Shift is ${isShiftPressed ? "pressed" : "released"}`);
  });

  // ...the `analyzer$` part here
}

What’s under the hood:

  1. Merge together two streams (Observables) from the keyup and keydown events. This creates new stream emitting data when a key is pressed and when a key is released.
  2. Converted the output data to boolean (since both steams/events deliver similar data object: the KeyboardEvent).
  3. Ensured the stream spits out the false at the very beginning (think when app is just started).

Marry two streams

Now we have two separate streams: one delivers the FFT data every 40 ms, another delivers the Shift state (at least once). Since they both deliver too different data, we cannot simple merge() them. But we can perform the combineLatest() operation which generates the stream (Observable):

  • At the moment when any stream got updated.
    All the source streams must emit at least one value at this point; that’s the meaning of “latest”.
  • With the data combined from all the source streams (as [source1data, source2data,...]):
import { combineLatest } from "rxjs"

function runDemo() {
  // ...the `kbd$` part and the `analyzer$` part
  combineLatest(analyser$, kbd$)
    .subscribe(([fftData, isShiftPressed]: [number[], boolean]) => {
      visualizer.drawBars(fftData, isShiftPressed)
    })
}

Voila! Now run the code an enjoy! The full version of app.ts is on GitHub.

// Observe the force, young padawan!

Color: what it really means? Think Reactive