RxJS: インストール


前提


ドキュメント


インストール

npm install rxjs

# 確認
npm list rxjs

└── rxjs@6.5.2

概要

プログラミングの流れ(イメージ)

  1. Observable(観測可能)な予定を作成
  2. Observer(観測者)を作成
  3. subscribe(購読)する

Observables

将来の処理やデータの流れ(集まり)を表したオブジェクト。

Observer(Subscriber)

予定が実現・失敗したときにどうするかを表したオブジェクト。

予定の実現

observables.subscribe(observer)

オペレータ(Operators)

Observablesを加工したりフィルタリングしたりできる関数。

Subscription

subscribeメソッドの戻り値オブジェクト。


擬似コード

// 基本
observables.subscribe(observer)

// オペレータを使うケース
observables.pipe(
  オペレータ1(),
  オペレータ2()
).subscribe(observer)

準備: インポート

// Observablesが含まれる
const rxjs = require('rxjs')

// オペレータが含まれる
const operators = require('rxjs/operators')

以降、インポート省略して、このように使う。

// 例: Observableな予定データを作成できるof関数
const of = rxjs.of

// 例: Observablesをフィルタリングするfilter関数
const filter = operators.filter

Observablesの使い方: of関数

const of = rxjs.of
const observables = of(2, 3, 4)

// observablesはまだ実現していない
console.log(1)

// Observer: ただのオブジェクト
// あとで実現したとき、nextメソッドが呼ばれる決まり
const observer = {
  next: (値) => console.log('購読:', 値)
}

// ここで順に実現
observables.subscribe(observer)

console.log(5)

結果

1
購読: 2
購読: 3
購読: 4
5

コンストラクタでObservablesを作るケース

const Observable = rxjs.Observable

const observables = new Observable(observer => {
  observer.next(2)
  observer.next(3)
  observer.next(4)
})

of関数による下記と同じ。

const observables = of(2, 3, 4)

簡略化した書き方

const of = rxjs.of
of('あ', 'い', 'う').subscribe(値 => console.log('購読:', 値))

結果

購読: あ
購読: い
購読: う

Observer

三つの通知メソッドを使える(省略可)

  1. next(値): 各Observableの成功
  2. error(エラー): 失敗(エラー)、Observableは終了する
  3. complete(): 完了、引数なし

subscribeに一つのオブジェクトで渡す

observables.subscribe({
  next: (値) => console.log('成功:', 値),                                                                         
  error: (エラー) => console.error('エラー:', エラー),
  complete: () => console.log('完了')
})

subscribeにこの順番で引数として渡す(無名のコールバック)

observables.subscribe(
  (値) => console.log('成功:', 値),
  (エラー) => console.error('エラー:', エラー),
  () => console.log('完了')
)

そのまま出力するだけならobservables.subscribe(console.log)でよい。


error、completeメソッドでObservable終了

const Observable = rxjs.Observable
new Observable(observer => {
  observer.next('a')
  observer.error('e')
  // 以降は実行されない
  observer.next('b')
}).subscribe(
  (値) => console.log('成功:', 値),
  (エラー) => console.error('エラー:', エラー),
  () => console.log('完了')
)

結果

成功: a
エラー: e

from関数

const from = rxjs.from
from([1, 2, 3]).subscribe(console.log)

結果

1
2
3

Observablesを作る関数

of, from, range, interval, timer, pairs, zipなど多数ある。

// それぞれ const range = rxjs.range のようにして

// 範囲: 1から5
range(1, 5).subscribe(console.log)

// 500ミリ秒間隔で、0から1ずつ増える
interval(500).subscribe(console.log)
// 1秒待機してから同様
timer(1000, 500).subscribe(console.log)

// 複数のObservableをつなげる
merge(from([1, 2, 3]), from([4, 5, 6])).subscribe(console.log)

// オブジェクトを[key, value]の配列にする
pairs({ a: 1, b: 2 }).subscribe(console.log)

// 複数のObservableを順番に配列にする
zip(of('a', 'b', 'c'), of(1, 2, 3)).subscribe(console.log)
// pairsの例
[ 'a', 1 ]
[ 'b', 2 ]

// zipの例
[ 'a', 1 ]
[ 'b', 2 ]
[ 'c', 3 ]

予定のキャンセル

const interval = rxjs.interval

// 500ミリ秒ごとに、0から1ずつ増える数
const subscription = interval(500).subscribe(console.log)

// 2秒後にunsubscribe()で購読を止める
setTimeout(() => subscription.unsubscribe(), 2000)

実行結果

0
1
2

カスタムunsubscribeメソッド

const Observable = rxjs.Observable

const observable = new Observable(observer => {
  const id = setInterval(() => {
    console.log('a')
    observer.next('x')
  }, 500) // 500ミリ秒ごと処理

  // カスタムunsubscribe関数を戻す
  return function unsubscribe() {
    clearInterval(id)
  }
})

const subscription = observable.subscribe(console.log)
setTimeout(()=>subscription.unsubscribe(), 1100) // 1.1秒でキャンセル

実行結果

a
x
a
x