パイプオペレータ
使い方
pipe()
メソッドにオペレーターを渡す
- それから
subscribe()
observable.pipe(
パイプオペレータ1(),
パイプオペレータ2(),
).subscribe(observer)
- 一個一個の値が、別々に順番にパイプを流れるイメージ
- 一つの値が末尾のオペレータを通過 => 次の値が最初のオペレータへ
分類
- 閲覧・加工:
tap, map
- フィルタ:
filter, take, takeLast, takeWhile, skip, skipLast, skipWhile
distinct, distinctUntilChanged
expand
- エラーハンドリング:
retry, catchError
- 時間:
delay, timeout, debounceTime
- 内側のObservable
tap(関数)
: 見るだけ
range(1, 3).pipe( // 1 2 3 =>
tap(x => {
if (x % 2 === 0) console.log(`偶数: ${x}`) // 偶数: 2
}),
).subscribe(console.log) // 1 2 3
map(関数)
: 加工
return
した値に変更: 加工後の値で後方のパイプへ
range(1, 3).pipe( // 1 2 3 =>
map(x => x * 2),
map(x => x * 3),
).subscribe(console.log) // 6 12 18
filter(関数)
: フィルタ条件
return
がtrue
のものだけ通す
false
になったら次のパイプには行かず、次の値のサイクルへ
range(1, 3).pipe( // 1 2 3 =>
filter(x => x % 2 === 1), // 奇数だけ通す
tap(x => console.log(`傍受: ${x}`)), // 2はここに到達しない
).subscribe(console.log) // 1 3
take(前半最大個数)
: 前半からの最大個数
- 最大個数までは通過させる
- 超過以降の値のサイクルへは行かず、
complete()
- 非同期は非同期のまま (
takeLast()
との違い)
// 同期
range(100, 3).pipe( // 100 101 102 =>
take(100), // 最大個数
).subscribe(console.log) // 100 101 102
console.log()
// 非同期
interval(500).pipe( // 0 1 2 3 ... (無限) =>
take(3),
).subscribe(console.log) // 0 1 2
(注意) take(100)
は、
- 「まず100個の値を次のオペレータに渡す」という意味ではない
- 「100個取るまで終わらせない」という意味ではない
- 「最大個数目が来た瞬間に
complete()
」という意味ではない
interval(500).pipe( // 0 1 2 3 ... (無限) =>
take(100), // <= 意味なし
tap(x => console.log(`傍受: ${x}`)), // 0 1 2
take(3), // 先にこの制限に達する
// 最大個数目の値も以降のオペレータを通る
map(x => x * 10)
).subscribe(console.log) // 0 10 20
takeLast(後半最大個数)
: 後半からの最大個数
- 「後半部分の何個」を取りたいとき
- 発行は逆順にはならない
非同期ものは末尾判明後、同期的に発行
interval(200).pipe( // 0 1 2 3 ... (無限) =>
take(7),
takeLast(5),
// 末尾判明後、同期的にnext(): インターバル効果が無くなる
).subscribe(console.log) // 2 3 4 5 6
console.log('ここは非同期だけど')
takeWhile(関数)
: 通過継続条件
interval(200).pipe( // 0 1 2 3 ... (無限) =>
tap(x => console.log(`傍受: ${x}`)), // 0 1 2 3
takeWhile(x => x < 3), // falseでcomplete()
map(x => x * 10),
).subscribe(console.log) // 0 10 20
skip(前半除外個数)
: 前半からの最大除外個数
- 除外の最大個数に達するまでは、すぐに次の値のサイクルへ
interval(200).pipe( // 0 1 2 3 ... (無限) =>
tap(x => console.log(`傍受: ${x}`)), // 0 1 2 3 4 5
skip(3),
take(3),
).subscribe(console.log) // 3 4 5
skipLast(後半除外個数)
: 後半からの最大除外個数
非同期は発行遅延あり
- 後半N個除外: 前半N+1個目が来たら1個目を発行
- 末尾判明が必須の
takeLast()
と違い、遅延はあるが非同期のまま
interval(100).pipe( // 0 1 2 3 ... (無限) =>
take(20),
skipLast(10), // 発行遅延: 11個目が来てからでないと、1個目発行できない
).subscribe(console.log) // 0 1 2 3 4 5 6 7 8 9
skipWhile(関数)
: 通過開始条件
interval(200).pipe( // 0 1 2 3 ... (無限) =>
tap(x => console.log(`傍受: ${x}`)), // 0 1 2 3 4 5
skipWhile(x => x < 3 || x > 3), // 初のfalse以降は通す
take(3),
).subscribe(console.log) // 3 4 5
retry(回数)
: やり直す
- エラーにせず、最初から やり直す回数
- 回数超過で
error()
range(1, 5).pipe(// 1 2 3 4 5 =>
tap(x => {
const num = x * Math.random()
if (num > 2) throw `エラー: ${num}`
}),
retry(3),
).subscribe(console.log, console.error, () => console.log('成功'))
catchError(関数)
- 関数: 継続するならObservableを
return
- 仮引数1: エラー => エラーにするなら
throw
- 仮引数2: Observable(当初と同じ) => 最初からやり直す場合
range(1, 5).pipe( // 1 2 3 4 5 =>
tap(x => {if (x > 3) throw 'エラー'}),
// 同じObservableで再トライ
catchError((error, observable) => observable),
take(5),
).subscribe(console.log, console.error) // 1 2 3 1 2
range(1, 5).pipe( // 1 2 3 4 5 =>
tap(x => {if (x > 3) throw x}),
catchError((error, observable) => {throw `エラー: ${error}`}), // エラー: 4
).subscribe(console.log, console.error) // 1 2 3
- 別のObservable返すと、以降は先行するパイプは使われない
interval(500).pipe( // 0 1 2 3 ... (無限) =>
tap(x => {if (x === 1) throw 'error'}),
take(10),
catchError((error, observable) => interval(500)), // 別物return
take(5),
).subscribe(console.log) // 0 0 1 2 3
delay(開始時期)
: 開始時期を遅らせる
- ミリ秒または
Date
- 各値が等しく遅延のため、各値の間隔は変わらない
range(1, 3).pipe( // 1 2 3 =>
delay(1000),
).subscribe(console.log) // (遅延) 1 2 3
console.log('非同期')
timeout(制限時間)
: タイムアウトでエラー
// 制限時間以内に標準入力 (1回リトライあり)
process.stdin.setEncoding('utf-8') // 標準入力に必要
const 制限時間 = 2000
fromEvent(process.stdin, 'data').pipe( // 標準入力待ち =>
take(2),
timeout(制限時間), // 入力それぞれで
catchError((error, observable) => {
console.log(`${制限時間}ミリ秒以内に入力`)
throw '時間切れ'
}),
retry(1)
).subscribe(
値 => console.log(`入力: ${値}`),
エラー => {console.error(エラー); process.exit(1)},
() => process.exit(0)
)
debounceTime(最低間隔)
: 最低間隔のある直近の値だけ通す
- 間隔未満の値は通さない
- 使用例: すばやく繰り返すイベント中は待機し、一息ついたタイミングを掴むなど
// 直前の入力から1秒以上の間隔があるものだけ通す
process.stdin.setEncoding('utf-8') // 標準入力に必要
fromEvent(process.stdin, 'data').pipe( // 標準入力待ち =>
debounceTime(1000),
take(3) // 3回まで入力
).subscribe(
値 => console.log(`入力: ${値}`),
console.error,
() => process.exit(0)
)
distinct()
: 重複を除外
from([1, 'a', 2, 2, 3, 'b', 3, 4, 'b']).pipe(
distinct()
).subscribe(console.log) // 1 a 2 3 b 4
distinctUntilChanged()
: 直前との重複を除外
from(['た','た','け','や','や','ぶ','ぶ','や','け','け','た']).pipe(
distinctUntilChanged()
).subscribe(console.log) // たけやぶやけた
expand(関数)
: 再帰的にループ
- 再帰的に適用したいObservableを
return
- ループから抜けるとき、空Observableの
EMPTY
// 偶数なら2で割り、奇数なら3倍して1足す、のループ
const start = 3001
of(start).pipe( // 3001 =>
expand(x => {
if (x === 1) return EMPTY // 1で終了
if (x % 2 === 0) return of(x / 2) // 偶数なら
return of(x * 3 + 1) // 奇数なら
}),
).subscribe(console.log)