作成オペレータ: Observableを作る関数
新規作成タイプ
- 既存オブジェクトから:
from, of, pairs
- 数列:
range, interval, timer
- その他:
EMPTY, throwError, defer, fromEvent
インプットObservableが複数のタイプ
- 合体:
merge, concat, zip, forkJoin
- 分裂:
partition
- 選出:
race
from(配列)
配列をObservableに
let log = [] // ログ用の配列
from(['a', 1, 2]).subscribe(値 => log.push(値))
log.push(3)
console.log(log) // [ 'a', 1, 2, 3 ]
// 3が末尾なことに注目
- Observerの
next()
メソッドは、まとめてではなく要素ごとに呼ばれる
- Observable内に非同期がない限り、
subscribe()
しても同期的 (以降同様)
of(...引数列)
引数列をObservableに
let log = [] // ログ用の配列
of('a', 1, 2).subscribe(値 => log.push(値))
log.push(3)
console.log(log) // [ 'a', 1, 2, 3 ]
pairs(オブジェクト)
オブジェクトをObservableに
- 各プロパティごとに
['key', value]
にして発行
const obj = {
id: 1,
名前: 'alice',
color: 'red'
}
pairs(obj).subscribe(console.log)
/*
[ 'id', 1 ]
[ '名前', 'alice' ]
[ 'color', 'red' ]
*/
range(初項, 個数)
: 数列
指定範囲の整数のObservable
let log = [] // ログ用の配列
// 100から3個
range(100, 3).subscribe(値 => log.push(値))
log.push('終わり')
console.log(log) // [ 100, 101, 102, '終わり' ]
- 小数など他の数列にしたいときは、オペレータで変換 (以降同様)
interval(間隔)
: 数列
インターバルのあるObservable
- 0から始まる整数
- 指定間隔(ミリ秒)ごとに順に発行
- 非同期:
subscribe()
以降のコードが先に進む
interval(500).subscribe(console.log) // 0 1 2 ...
console.log('非同期')
パイプオペレータを使う例
// 3の倍数と3の付く数だけ顔文字付き
interval(500).pipe(
take(40), // 39まで
filter(x => x > 0), // 0除外
map(x => {
if (x % 3 === 0 || /3/.test(x + '')) return x + ' (^o^)'
return x
}),
).subscribe(console.log)
/*
1
2
3 (^o^)
4
...
*/
timer(開始時期, 間隔)
: 数列
開始時期とインターバルのあるObservable
- 0から始まる整数
- 開始時期から、指定間隔(ミリ秒)ごとに順に発行
- 開始時期: ミリ秒または
Date
- 非同期
const 開始時期 = new Date(Date.now() + 1000) // 今から1秒後
timer(開始時期, 500).subscribe(console.log)
console.log('非同期')
EMPTY
: 空
値を発行しないObservable
EMPTY.subscribe({
next: console.log,
complete: () => console.log('complete') // これだけ
})
throwError('エラー')
: エラー
error()
だけするObservable
throwError('エラー').subscribe({
next: console.log,
error: エラー => console.error('error():', エラー), // これだけ
complete: () => console.log('complete')
})
defer(コールバック)
: 後から作られるObservable
subscribe()
のたびに新作のObservable
- 引数コールバック: Observableを
return
するもの
- 条件付きのObservableを繰り返し
subscribe()
したいとき
- 類似の
iif()
より複雑多数の条件分岐にできる
const xyOr12 = defer(() => {
// 確率0.5でどちらかのObservableを戻す
return Math.random() > 0.5 ? of('x', 'y') : range(1, 2)
})
xyOr12.subscribe(console.log) // 結果はそれぞれ
xyOr12.subscribe(console.log)
xyOr12.subscribe(console.log)
fromEvent(イベントターゲット, 'イベント名')
イベントと紐付ける
- イベントターゲット: ブラウザDOMのイベントターゲットやNode.jsの
EventEmitter
など
- イベント発生で発行
let log = [] // ログ用の配列
// Node.jsイベント
const EventEmitter = require('events')
const myEvent = new EventEmitter()
myEvent.on('倍返し', () => {})
myEvent.on('でかい', () => {})
fromEvent(myEvent, '倍返し')
.pipe(map(x => x * 2)).subscribe(値 => log.push(値))
fromEvent(myEvent, 'でかい')
.pipe(map(x => x.toUpperCase())).subscribe(値 => log.push(値))
// イベント発動
myEvent.emit('倍返し', 3)
myEvent.emit('倍返し', 8)
myEvent.emit('でかい', 'abc')
console.log(log) // [ 6, 16, 'ABC' ]
merge(...Observable)
: 合体
同期だけ順番維持
- 非同期でなければ、引数列の順番で発行
- オペレータの
merge
とは違う (以降同様)
let log = [] // ログ用の配列
// 同期のみ
merge(from([1, 2]), of(3, 4)).subscribe(値 => log.push(値))
log.push(5)
console.log(log) // [ 1, 2, 3, 4, 5 ]
同期と非同期のmerge()
log = []
// 非同期あり
// 0.5秒待機 => 0.5秒間隔で 0 1
const a = timer(500, 500).pipe(take(2), map(x => `a${x}`))
// 0.3秒間隔で 0 1
const b = interval(300).pipe(take(2), map(x => `b${x}`))
const c = of('c0', 'c1')
merge(a, b, c).subscribe({
next: 値 => log.push(値),
complete: () => console.log(log)
// [ 'c0', 'c1', 'b0', 'a0', 'b1', 'a1' ]
})
console.log('非同期')
concat(...Observable)
: 合体
非同期もObservable間の順番維持
- 非同期のインプットObservableがあっても
merge()
との違い
let log = [] // ログ用の配列
const a = timer(500, 500).pipe(take(2), map(x => `a${x}`))
const b = interval(300).pipe(take(2), map(x => `b${x}`))
const c = of('c0', 'c1')
// 同期: a => b => c
concat(a, b, c).subscribe({
next: 値 => log.push(値),
complete: () => console.log(log)
// [ 'a0', 'a1', 'b0', 'b1', 'c0', 'c1' ]
})
console.log('非同期')
zip(...Observable)
: 合体
同順番の値ごとに配列に
- 各Observableの同順番の値で構成する配列を、順に発行
- 1番目だけ集めて配列発行、2番目だけ集めて配列発行...
- 最も少ない要素数まで配列発行
- 配列内の順番は引数列の順番を維持(非同期あっても)
const a = timer(500, 500).pipe(map(x => `a${x}`))
const b = interval(300).pipe(take(4), map(x => `b${x}`))
const c = of('c0', 'c1', 'c2') // <= 最も少ない要素数
zip(a, b, c).subscribe(console.log)
console.log('非同期')
/*
非同期
[ 'a0', 'b0', 'c0' ]
[ 'a1', 'b1', 'c1' ]
[ 'a2', 'b2', 'c2' ]
*/
forkJoin(Observable[])
: 合体
それぞれの最後の値を一つの配列に
- インプットObservableのそれぞれ最後の値を格納した配列を一つだけ発行
- 引数の順番維持
- 引数を辞書型Objectにしたとき、最後の値を辞書型で発行
{キー: Observable}
=> {キー: 最後の値}
const a = timer(500, 500).pipe(take(2), map(x => `a${x}`))
const b = interval(10).pipe(take(5), map(x => `b${x}`))
const c = of('c0', 'c1')
// 配列インプット
forkJoin([a, b, c])
.subscribe(console.log) // [ 'a1', 'b4', 'c1' ]
// 辞書型インプット
forkJoin({a, b, c})
.subscribe(console.log) // { a: 'a1', b: 'b4', c: 'c1' }
console.log('非同期')
(注意) 発行条件
- すべてのインプットObservableが、それぞれ最低一つ値が実現し、それぞれcomplete()
// 値を発行せずcomplete()するものがある
forkJoin([EMPTY, of('x', 'y')]).subscribe(console.log)
// 終わらないものがある
forkJoin([interval(500), of('x', 'y')]).subscribe(console.log)
// エラーのものがある
forkJoin([throwError('エラー'), of('x', 'y')])
.subscribe(console.log, console.error) // エラー
// 終わらない
partition(Observable, 条件関数)
: 分裂
1つのObservableを2つに分裂
- 戻り値:
[Observable1, Observable2]
条件関数(値, インデックス)
- 条件が
true
で戻ればObservable1側に分類
let log = [] // ログ用の配列
const input = of('a', 'b', 0, 'c', 1, 2)
const [str, num] = partition(input, x => typeof x === 'string')
str.subscribe(値 => log.push(値))
num.subscribe(値 => log.push(値))
console.log(log) // [ 'a', 'b', 'c', 0, 1, 2 ]
race(...Observable)
: 選出
最も早いスタートのObservableを選出
const ランダム = () => Math.random() * 1000
const a = interval(ランダム()).pipe(map(x => `a: ${x}`))
const b = interval(ランダム()).pipe(map(x => `b: ${x}`))
const c = interval(ランダム()).pipe(map(x => `c: ${x}`))
// 最初に開始したもの
race(a, b, c).subscribe(console.log)