作成オペレータ: Observableを作る関数


新規作成タイプ

インプットObservableが複数のタイプ


from(配列)

配列をObservableに

let log = [] // ログ用の配列

from(['a', 1, 2]).subscribe(値 => log.push(値))
log.push(3)

console.log(log) // [ 'a', 1, 2, 3 ]
// 3が末尾なことに注目

of(...引数列)

引数列をObservableに

let log = [] // ログ用の配列

of('a', 1, 2).subscribe(値 => log.push(値))
log.push(3)

console.log(log) // [ 'a', 1, 2, 3 ]

pairs(オブジェクト)

オブジェクトをObservableに

const obj = {
  id: 1,
  名前: 'alice',
  color: 'red'
}
pairs(obj).subscribe(console.log)
/*
[ 'id', 1 ]
[ '名前', 'alice' ]
[ 'color', 'red' ]
*/

range(初項, 個数): 数列

指定範囲の整数のObservable

let log = [] // ログ用の配列

// 100から3個
range(100, 3).subscribe(値 => log.push(値))
log.push('終わり')

console.log(log) // [ 100, 101, 102, '終わり' ]

interval(間隔): 数列

インターバルのあるObservable

interval(500).subscribe(console.log) // 0 1 2 ...
console.log('非同期')

パイプオペレータを使う例

// 3の倍数と3の付く数だけ顔文字付き
interval(500).pipe(
  take(40), // 39まで
  filter(x => x > 0), // 0除外
  map(x => {
    if (x % 3 === 0 || /3/.test(x + '')) return x + ' (^o^)'
    return x
  }),
).subscribe(console.log)

/*
1
2
3 (^o^)
4
...
*/

timer(開始時期, 間隔): 数列

開始時期とインターバルのあるObservable

const 開始時期 = new Date(Date.now() + 1000) // 今から1秒後
timer(開始時期, 500).subscribe(console.log)
console.log('非同期')

EMPTY: 空

値を発行しないObservable

EMPTY.subscribe({
  next: console.log,
  complete: () => console.log('complete') // これだけ
})

throwError('エラー'): エラー

error()だけするObservable

throwError('エラー').subscribe({
  next: console.log,
  error: エラー => console.error('error():', エラー), // これだけ
  complete: () => console.log('complete')
})

defer(コールバック): 後から作られるObservable

subscribe()のたびに新作のObservable

const xyOr12 = defer(() => {
  // 確率0.5でどちらかのObservableを戻す
  return Math.random() > 0.5 ? of('x', 'y') : range(1, 2)
})

xyOr12.subscribe(console.log) // 結果はそれぞれ
xyOr12.subscribe(console.log)
xyOr12.subscribe(console.log)

fromEvent(イベントターゲット, 'イベント名')

イベントと紐付ける

let log = [] // ログ用の配列

// Node.jsイベント
const EventEmitter = require('events')
const myEvent = new EventEmitter()
myEvent.on('倍返し', () => {})
myEvent.on('でかい', () => {})

fromEvent(myEvent, '倍返し')
  .pipe(map(x => x * 2)).subscribe(値 => log.push(値))
fromEvent(myEvent, 'でかい')
  .pipe(map(x => x.toUpperCase())).subscribe(値 => log.push(値))

// イベント発動
myEvent.emit('倍返し', 3)
myEvent.emit('倍返し', 8)
myEvent.emit('でかい', 'abc')
console.log(log) // [ 6, 16, 'ABC' ]

merge(...Observable): 合体

同期だけ順番維持

let log = [] // ログ用の配列

// 同期のみ
merge(from([1, 2]), of(3, 4)).subscribe(値 => log.push(値))
log.push(5)
console.log(log) // [ 1, 2, 3, 4, 5 ]

同期と非同期のmerge()

log = []
// 非同期あり
// 0.5秒待機 => 0.5秒間隔で 0 1
const a = timer(500, 500).pipe(take(2), map(x => `a${x}`))
// 0.3秒間隔で 0 1
const b = interval(300).pipe(take(2), map(x => `b${x}`))
const c = of('c0', 'c1')

merge(a, b, c).subscribe({
  next: 値 => log.push(値),
  complete: () => console.log(log)
  // [ 'c0', 'c1', 'b0', 'a0', 'b1', 'a1' ]
})
console.log('非同期')

concat(...Observable): 合体

非同期もObservable間の順番維持

let log = [] // ログ用の配列

const a = timer(500, 500).pipe(take(2), map(x => `a${x}`))
const b = interval(300).pipe(take(2), map(x => `b${x}`))
const c = of('c0', 'c1')

// 同期: a => b => c
concat(a, b, c).subscribe({
  next: 値 => log.push(値),
  complete: () => console.log(log)
  // [ 'a0', 'a1', 'b0', 'b1', 'c0', 'c1' ]
})
console.log('非同期')

zip(...Observable): 合体

同順番の値ごとに配列に

const a = timer(500, 500).pipe(map(x => `a${x}`))
const b = interval(300).pipe(take(4), map(x => `b${x}`))
const c = of('c0', 'c1', 'c2') // <= 最も少ない要素数

zip(a, b, c).subscribe(console.log)

console.log('非同期')
/*
非同期
[ 'a0', 'b0', 'c0' ]
[ 'a1', 'b1', 'c1' ]
[ 'a2', 'b2', 'c2' ]
*/

forkJoin(Observable[]): 合体

それぞれの最後の値を一つの配列に

const a = timer(500, 500).pipe(take(2), map(x => `a${x}`))
const b = interval(10).pipe(take(5), map(x => `b${x}`))
const c = of('c0', 'c1')

// 配列インプット
forkJoin([a, b, c])
  .subscribe(console.log) // [ 'a1', 'b4', 'c1' ]

// 辞書型インプット
forkJoin({a, b, c})
  .subscribe(console.log) // { a: 'a1', b: 'b4', c: 'c1' }

console.log('非同期')

(注意) 発行条件

// 値を発行せずcomplete()するものがある
forkJoin([EMPTY, of('x', 'y')]).subscribe(console.log)

// 終わらないものがある
forkJoin([interval(500), of('x', 'y')]).subscribe(console.log)

// エラーのものがある
forkJoin([throwError('エラー'), of('x', 'y')])
  .subscribe(console.log, console.error) // エラー
// 終わらない

partition(Observable, 条件関数): 分裂

1つのObservableを2つに分裂

let log = [] // ログ用の配列

const input = of('a', 'b', 0, 'c', 1, 2)
const [str, num] = partition(input, x => typeof x === 'string')

str.subscribe(値 => log.push(値))
num.subscribe(値 => log.push(値))
console.log(log) // [ 'a', 'b', 'c', 0, 1, 2 ]

race(...Observable): 選出

最も早いスタートのObservableを選出

const ランダム = () => Math.random() * 1000
const a = interval(ランダム()).pipe(map(x => `a: ${x}`))
const b = interval(ランダム()).pipe(map(x => `b: ${x}`))
const c = interval(ランダム()).pipe(map(x => `c: ${x}`))

// 最初に開始したもの
race(a, b, c).subscribe(console.log)