Presentación de RxJs desde cero: ¿Cómo funciona?

Todas las semanas contamos con una Front End Sync en el equipo, donde cada miembro debate o presenta algún tema en particular. La semana pasada me tocó preparar la temática RxJs, ya que pocos miembros habían tenido contacto con esta librería.

RxJs es una librería de programación reactiva que facilita la creación de código asíncrono y se basa en eventos a través del uso de observables. Es bastante extensa, por lo que tuve que elegir un escenario de donde partir para exponer los conceptos básicos, ejemplos y su código funcional. Opté por comenzar de la misma manera que yo me inicié en el mundo de RxJs, cuando Angular trajo el servicio HttpClient, reemplazando el objeto Promise por Observable para hacer peticiones http.

Primero la teoría…


Existen 3 clases principales alrededor del Observable y se pueden representar en typescript de la siguiente forma:

interface Observable {
 subscribe(observer: Observer): Subscription
}


interface Observer {
 next(v: any): void;
 error(e: Error): void;
 complete(): void;
}


interface Subscription {
 unsubscribe(): void;
}
  1. Un Observable es un objeto que tiene un método subscribe, el cual recibe como parámetro un observer y devuelve un objeto Subscription.

  2. Un Observer se subscribe a un Observable y, cada vez que este emite un valor, el Observer ejecuta el método next() y, cuando finaliza, las emisiones de valores llaman al  método complete(). En caso de errores el método error().

  3. Un objeto Subscription mantiene la relación entre el Observer y el Observable, por lo que su método unsubscribe() se utiliza para finalizar esta relación y así, el Observer dejaría de recibir los valores emitidos por el Observable.

En cualquier caso la documentación oficial explica a fondo estos conceptos: https://rxjs-dev.firebaseapp.com/guide/overview

Vamos al código…

Ahora que sabemos como funciona un Observable podemos recordar rápidamente el concepto de un Promise: un objeto que puede emitir un único valor en el futuro, y este puede ser un valor resolved o una razón por la que no pudo completarse (rejected).

Con esto planteamos la primera comparación creando dos funciones: 

– La primera utilizando ajax.get() de RxJs (devuelve un Observable<AjaxResponse>).

– La segunda  utilizando fetch() de JavaScript (retorna un Promise<Response>) ambas usadas para realizar una llamada http GET y obtener la información de una persona determinada del mundo de Star Wars usando SWAPI.


Todo el código que voy a presentar abajo se encuentra agrupado en funciones en StackBlitz por lo que puede probarse aquí: 

import { ajax } from "rxjs/ajax";
 
const fetchPeopleAPIRequestObservable = id => {
  return ajax.get(`https://swapi.dev/api/people/${id}/`);
};

export const fetchPeopleAPIRequestPromise = id => {
  return fetch(`https://swapi.dev/api/people/${id}/`);
};
 
fetchPeopleAPIRequestObservable(1)
fetchPeopleAPIRequestPromise(2)

Si ejecutamos este código y observamos en “Developer Tools” (F12) la pestaña “Network”, descubrimos la primera diferencia al encontrar solamente la petición con id “2”. El objeto Promise es “eager”, por lo tanto se ejecuta cuando se llama a la función, mientras que el Observable es “lazy” y no emite valor hasta que se ejecute el método subscribe() para así generar un objeto Subscription.


Para mostrar por consola el resultado de ambas funciones agregamos los siguientes callbacks:

fetchPeopleAPIRequestObservable(1).subscribe(response =>
   console.log("Observable response", response)
 );
 
 fetchPeopleAPIRequestPromise(2).then(response =>
   console.log("Promise response", response)
 );

En el Promise llamamos al método .then() y el primer parámetro es una función que se ejecuta cuando la promesa se completa (resolve()).

En el Observable llamamos al método subscribe() y el primer parámetro es una función que se ejecuta cada vez que se emite un valor. De esta manera definimos implícitamente un objeto Observer y agregamos un callback que se corresponde con el método next().

El resultado que se muestra en consola es similar ya que fetch() devuelve un Promise con un objeto de tipo Response  y ajax.get() devuelve un Observable de tipo AjaxResponse. Pero necesitamos acceder a las propiedades internas para realmente obtener la información de la persona. Si ya utilizaste fetch() probablemente ya conozcas el método .json() que devuelve otro Promise con el json de la respuesta.

fetchPeopleAPIRequestPromise(1)
.then(response => response.json())
 	.then(response => console.log("Promise response", response))

En el caso del Observable podemos acceder al json a través de la propiedad AjaxReponse.response, pero en su lugar vamos a hacerlo usando una de las característica más destacadas de RxJs: un operator.

Operators

Los operators (operadores) son funciones similares a las funciones de la clase Array de JavaScript (como map y filter) y actúan sobre los valores emitidos por el Observable. Existen “Creation Operators” como el caso de ajax, que se utiliza para crear un Observable partiendo de una llamada http.
Y por otro lado “Pipeable Operators” que son aquellos que se pueden encadenar dentro de un método .pipe() y se ejecutan en orden cada vez que el Observable emite un valor. Cada uno recibe un input del anterior operator y devuelven un output al siguiente operator.

Esto presenta la siguiente diferencia y ventaja de un Observable con respecto a una Promise. Además, los Observables pueden emitir múltiples valores a través del tiempo, de manera que todos los objetos Observer son notificados y puedan actuar en cada emisión. Un Promise está limitado a retornar un único valor.

A continuación comienzo por listar los operators que más he utilizado y algunos escenarios comunes, todos ellos relacionados sólo a peticiones http.

map


El operator más común es map() y funciona igual que Array.map() de Javascript. Se importa desde “rxjs/operators”;

import { map } from "rxjs/operators";
 
fetchPeopleAPIRequestObservable(1)
   .pipe(
map(response => response.response)
   )
   .subscribe(response =>
     console.log( response)
   );


El método pipe() debe invocarse anterior al método subscribe() y este siempre devuelve un Observable. En este caso modificamos el tipo de retorno inicial Observable<AjaxResponse> a un nuevo Observable<any> por lo que el método subscribe() ahora recibe el valor que se encuentra en response.response.

Ahora que tenemos el mismo resultado de la persona usando Observable y Promise podemos agregar el siguiente manejo de errores:

fetchPeopleAPIRequestPromise(1)
    .then(response => response.json()) 
    .then(response => console.log(response))
    .catch(error => console.log(error))

fetchPeopleAPIRequestObservable(1)
    .pipe(
        map(response => response.response)
    )
    .subscribe(
        response => console.log( response),
        error => console.log(error)
    );

 
Usando Promise simplemente llamando al método .catch() podemos agregar un callback para manejar el estado rejected del Promise.
En el caso del Observable agregamos un segundo parámetro en el método subscribe() que implícitamente se corresponde con el método error() del Observer.

take


Como estamos realizando una petición http GET nuestro Observable emite solo un valor. Aun así el objeto Subscription sigue vivo hasta que se ejecute el método unsubscribe(). Podemos llamar al método unsubscribe() inmediatamente luego de subscribe() aunque existen algunos operators que nos ayudan a manejar la cantidad de emisiones que vamos a esperar para completar el Observable. Algunos de ellos son: take, takeUntil, takeWhile.
Usando el operador take(1) logramos que el Observable emita 1 solo valor y se complete por lo tanto ya no es necesario llamar a unsubscribe().

import { map, take } from "rxjs/operators";


fetchPeopleAPIRequestObservable(1)
  .pipe(
      take(1),
      map(response => response.response)
    )
  .subscribe(
        response => console.log( response),
        error => console.log(error)
    );

tap

Otro operator muy usado es tap() y sirve para ejecutar una función en cualquier momento del piping. A diferencia de map() no se modifica el output por lo que tap() devuelve el mismo valor que recibió.

import { map, take,  tap } from "rxjs/operators";
 
fetchPeopleAPIRequestObservable(1)
   .pipe(
	take(1),
map(response => response.response),
tap(response => console.log( response))
   )
   .subscribe();


Ahora podemos mover el callback que se ejecutaba dentro de subscribe() hacia tap() ya que se trata de una simple acción de depuración, obteniendo el mismo resultado. Aunque esto no es un reemplaza a subscribe() ya que sin este método no existiría el objeto Subscription y por lo tanto el objeto Observable no emitirá valores. Pueden existir tantos tap() dentro de pipe() como queramos.



withLatestFrom

 
import { map, take, tap, whitLatestFrom } from "rxjs/operators";
 
fetchPeopleAPIRequestObservable(1)
   .pipe(
	take(1),
// Get the latest emitted value
withLatestFrom(fetchPlanetAPIRequest(5)),
// Maps both responses into a new object
map(([peopleResponse, planetResponse]) => ({
  people: peopleResponse.response,
  planet: planetResponse.response
})),
tap(response =>
  console.log(
    "getPeopleObservableWithLatestFrom: ",
    response.people,
    response.planet
  )
)
   )


concatMap

En el siguiente escenario queremos utilizar información de la persona y usarla como parámetro para realizar un llamado a otro endpoint. Agregamos fetchFilmsAPIRequest que devuelve información de un film y vamos a usarlo para obtener el primer film donde apareció la persona solicitada, y agregar ese film a la respuesta final.
Usando concatMap se puede suscribir a otro observable y combinar ambos valores emitidos. Existen otros operadores similares como mergeMap, flatMap, switchMap que se utilizan con el mismo propósito, pero cada uno se sitúa mejor en algún escenario particular por la manera en que tratan las emisiones. Para el siguiente ejemplo nos concentramos solo en concatMap.


import { map, take, tap, concatMap } from "rxjs/operators";
 
fetchPeopleAPIRequestObservable(1) 
   .pipe(
	take(1),
concatMap(responsePeople => {
   const firstFilmIndex = responsePeople.films[0][27];
 
   // Returns a new Observable as input of next operator on pipe
   // Maps both response into new object
   return fetchFilmsAPIRequest(firstFilmIndex).pipe(
     map(response => response.response),
     map(responseFilm => ({ people: responsePeople, film: responseFilm }))
   );
 })
   ).subscribe(response =>
     console.log(response)
   );

Lograr el mismo resultando con Promises quizá nos lleva a un callback hell o a implementar probablemente async y await =)


forkJoin

El último escenario es cuando necesitamos hacer varias peticiones GET que no están relacionadas y necesitamos ejecutar un callback cuando todas hayan emitido una respuesta. El operator forkJoin funciona similar al método Promises.all(). Recibe un array de Observable y se completa con la última emisión de cada Observable, por lo que no es necesario llamar .unsubscribe() ni usar take().

 
import { map, forkJoin } from "rxjs/operators";
 
forkJoin([
   fetchPeopleAPIRequestObservable(1),
   fetchPlanetAPIRequest(2),
   fetchFilmsAPIRequest(3)
 ])
   .pipe(
     map(([peopleResponse, planetResponse, filmResponse]) => ({
       people: peopleResponse.response,
       planet: planetResponse.response,
       film: filmResponse.response
     }))
   )
   .subscribe(response => console.log("forkJoinResponse", response));
 
Promise.all([
   fetchPeopleAPIRequestPromise(1),
   fetchPeopleAPIRequestPromise(2),
   fetchPeopleAPIRequestPromise(3)
 ]).then(values => {
   console.log(values);
 });

Existen muchos operators y pertenecen a distintas categorías, además se representan gráficamente usando diagramas de Marble. Lista de todos los operators
También disponemos de un “Operator Decision Tree”, a través del cual, a partir de una serie de preguntas, podemos obtener el operator que más se ajusta a nuestra solución:

A manera de conclusión puedo decir que de esta forma comencé a utiizar RxJs y a trabajar con Observables. Quizá en un escenario de peticiones http GET, de la cual esperamos un único valor, no se aprecia una gran ventaja para reemplazar el uso de Promise, pero creo que hacerlo es la forma en que Angular comenzó a fomentar el uso y aprendizaje de RxJs para luego traer otras librerías basadas en RxJs como es NgRx

NgRx nos ayuda a manejar el estado de nuestra aplicación de manera similar a Redux pero trabajando totalmente con Observables.

RxJs también tiene implementaciones en otros ámbitos como ser eventos del DOM. Por ejemplo, un objeto Observable que emite un evento cada vez que un usuario hace click en un botón. Para ello pueden probar el operator fromEvent.

¿Qué les parece RxJs? ¿Lo usan actualmente? ¿Reemplazarían el uso de Promise por Observable? ¿Lo usarían en una app con React?

Por Alexander Dedek,

Software Engineer en GM2.

Don’t Stop Here

More To Explore