パイプオペレータ


使い方

observable.pipe(
  パイプオペレータ1(),
  パイプオペレータ2(),
).subscribe(observer)

分類


tap(関数): 見るだけ

range(1, 3).pipe( // 1 2 3 =>
  tap(x => {
    if (x % 2 === 0) console.log(`偶数: ${x}`) // 偶数: 2
  }),
).subscribe(console.log) // 1 2 3

map(関数): 加工

range(1, 3).pipe( // 1 2 3 =>
  map(x => x * 2),
  map(x => x * 3),
).subscribe(console.log) // 6 12 18

filter(関数): フィルタ条件

range(1, 3).pipe( // 1 2 3 =>
  filter(x => x % 2 === 1), // 奇数だけ通す
  tap(x => console.log(`傍受: ${x}`)), // 2はここに到達しない
).subscribe(console.log) // 1 3

take(前半最大個数): 前半からの最大個数

// 同期
range(100, 3).pipe( // 100 101 102 =>
  take(100), // 最大個数
).subscribe(console.log) // 100 101 102

console.log()

// 非同期
interval(500).pipe( // 0 1 2 3 ... (無限) =>
  take(3),
).subscribe(console.log) // 0 1 2

(注意) take(100)は、

interval(500).pipe( // 0 1 2 3 ... (無限) =>
  take(100), // <= 意味なし
  tap(x => console.log(`傍受: ${x}`)), // 0 1 2
  take(3), // 先にこの制限に達する
  // 最大個数目の値も以降のオペレータを通る
  map(x => x * 10)
).subscribe(console.log) // 0 10 20

takeLast(後半最大個数): 後半からの最大個数

非同期ものは末尾判明後、同期的に発行

interval(200).pipe( // 0 1 2 3 ... (無限) =>
  take(7),
  takeLast(5),
  // 末尾判明後、同期的にnext(): インターバル効果が無くなる
).subscribe(console.log) // 2 3 4 5 6
console.log('ここは非同期だけど')

takeWhile(関数): 通過継続条件

interval(200).pipe( // 0 1 2 3 ... (無限) =>
  tap(x => console.log(`傍受: ${x}`)), // 0 1 2 3
  takeWhile(x => x < 3), // falseでcomplete()
  map(x => x * 10),
).subscribe(console.log) // 0 10 20

skip(前半除外個数): 前半からの最大除外個数

interval(200).pipe( // 0 1 2 3 ... (無限) =>
  tap(x => console.log(`傍受: ${x}`)), // 0 1 2 3 4 5
  skip(3),
  take(3),
).subscribe(console.log) // 3 4 5

skipLast(後半除外個数): 後半からの最大除外個数

非同期は発行遅延あり

interval(100).pipe( // 0 1 2 3 ... (無限) =>
  take(20),
  skipLast(10), // 発行遅延: 11個目が来てからでないと、1個目発行できない
).subscribe(console.log) // 0 1 2 3 4 5 6 7 8 9

skipWhile(関数): 通過開始条件

interval(200).pipe( // 0 1 2 3 ... (無限) =>
  tap(x => console.log(`傍受: ${x}`)), // 0 1 2 3 4 5
  skipWhile(x => x < 3 || x > 3), // 初のfalse以降は通す
  take(3),
).subscribe(console.log) // 3 4 5

retry(回数): やり直す

range(1, 5).pipe(// 1 2 3 4 5 =>
  tap(x => {
    const num = x * Math.random()
    if (num > 2) throw `エラー: ${num}`
  }),
  retry(3),
).subscribe(console.log, console.error, () => console.log('成功'))

catchError(関数)

range(1, 5).pipe( // 1 2 3 4 5 =>
  tap(x => {if (x > 3) throw 'エラー'}),
  // 同じObservableで再トライ
  catchError((error, observable) => observable),
  take(5),
).subscribe(console.log, console.error) // 1 2 3 1 2
range(1, 5).pipe( // 1 2 3 4 5 =>
  tap(x => {if (x > 3) throw x}),
  catchError((error, observable) => {throw `エラー: ${error}`}), // エラー: 4
).subscribe(console.log, console.error) // 1 2 3

delay(開始時期): 開始時期を遅らせる

range(1, 3).pipe( // 1 2 3 =>
  delay(1000),
).subscribe(console.log) // (遅延) 1 2 3
console.log('非同期')

timeout(制限時間): タイムアウトでエラー

// 制限時間以内に標準入力 (1回リトライあり)
process.stdin.setEncoding('utf-8') // 標準入力に必要
const 制限時間 = 2000
fromEvent(process.stdin, 'data').pipe( // 標準入力待ち =>
  take(1),
  timeout(制限時間),
  catchError((error, observable) => {
    console.log(`${制限時間}ミリ秒以内に入力`)
    throw '時間切れ'
  }),
  retry(1)
).subscribe(
  値 => console.log(`入力: ${値}`),
  エラー => {console.error(エラー); process.exit(1)},
  () => process.exit(0)
)

debounceTime(最低間隔): 最低間隔のある直近の値だけ通す

// 直前の入力から1秒以上の間隔があるものだけ通す
process.stdin.setEncoding('utf-8') // 標準入力に必要
fromEvent(process.stdin, 'data').pipe( // 標準入力待ち =>
  debounceTime(1000),
  take(3) // 3回まで入力
).subscribe(
  値 => console.log(`入力: ${値}`),
  console.error,
  () => process.exit(0)
)

distinct(): 重複を除外

from([1, 'a', 2, 2, 3, 'b', 3, 4, 'b']).pipe(
  distinct()
).subscribe(console.log) // 1 a 2 3 b 4

distinctUntilChanged(): 直前との重複を除外

from(['た','た','け','や','や','ぶ','ぶ','や','け','け','た']).pipe(
  distinctUntilChanged()
).subscribe(console.log)  // たけやぶやけた

expand(関数): 再帰的にループ

// 偶数なら2で割り、奇数なら3倍して1足す、のループ
const start = 3001
of(start).pipe( // 3001 =>
  expand(x => {
    if (x === 1) return EMPTY // 1で終了
    if (x % 2 === 0) return of(x / 2) // 偶数なら
    return of(x * 3 + 1) // 奇数なら
  }),
).subscribe(console.log)