Observableを発行するObservable


普通の値を発行するObservable

Observableを発行するObservable


外側のObservable、内側のObservable

以降、外側、内側と略す。

map(関数)の関数で内側をreturn

const 外側 = interval(1000) // 0 1 2 ... (無限) =>
const 内側 = from(['a', 'b', 'c'])

外側.pipe(
  map(x => 内側),
).subscribe(内側 => 内側.subscribe(console.log))
// a b c (間) a b c (間) a b c ... (無限)

外側を高階(higher-order)Observableともいう


高階を1階建て(first-order)にするパイプオペレータ

〜All()

const 一階 = 外側.pipe(
  map(x => 内側),
  mergeAll(), // 1階建てに変換
)
一階.subscribe(console.log)

map(関数)と1階建てにする」をまとめたパイプオペレータ

〜Map()

const 一階 = 外側.pipe(
  mergeMap(x => 内側), // map()しつつ、1階建てに変換
)
一階.subscribe(console.log)

〜MapTo()

mergeMap(外側値 => 内側) // 関数引数
mergeMapTo(内側) // Observable引数

同様動作のため、以降、〜All()系や〜MapTo()系ではなく、〜Map()系を前提


内側のObservable: 〜Map()

switchMap, mergeMap(別名flatMap), concatMap, exhaustMap

外側Observable.pipe(
  普通のパイプオペレータ(値 => 普通の値),
  内側発生かつ一階建て化の何々Map(値 => 内側Observable),
).subscribe(observer)

基本動作


switchMap vs mergeMap vs concatMap vs exhaustMap

使い分けポイント

外側も内側も同期: 全部同じ結果

  1. 外側値1サイクル: 内側値1 -> 内側値2 ... -> 内側完了
  2. 外側値2サイクル: 内側値1 -> 内側値2 ... -> 内側完了
  3. ...
// 同様: mergeMap, concatMap, exhaustMap
range(1, 3).pipe( // 1 2 3 =>
  switchMap(x => of(`a${x}`, `A${x}`))
).subscribe(console.log) // a1 A1 a2 A2 a3 A3

外側が非同期になっても同様

const 外側 = from([
  timer(0).pipe(mapTo(0)), // 0秒後に発生する0
  timer(2000).pipe(mapTo(2)), // 2秒後に発生する2
  timer(1000).pipe(mapTo(1)),
  timer(3000).pipe(mapTo(3)),
]).pipe(mergeAll()) // 1階建て化

外側.pipe( // 0 1 2 3 =>
  // 同様: mergeMap, concatMap, exhaustMap
  switchMap(x => of(`a${x}`, `A${x}`))
).subscribe(console.log) // a0 A0 (間) a1 A1 (間) a2 A2 (間) a3 A3

内側が非同期

内側の開始(条件)が、


switchMap(関数): 内側切り替え

最新の非同期のみアクティブにしたいケース

外側も内側も非同期

process.stdin.setEncoding('utf-8') // 標準入力に必要
fromEvent(process.stdin, 'data').pipe( // 標準入力待ち =>
  filter(入力文字 => 入力文字.trim().length !== 0), // 何か文字入力
  switchMap(入力文字 => {
    return interval(1000).pipe(
      take(5),
      map(秒 => `${入力文字.trim()}: ${秒 + 1}`)
    )
  }),
).subscribe(console.log)

外側同期、内側非同期

// (意味なし) 外側同期、内側非同期
of('a', 'b', 'c').pipe( // a b c =>
  tap(x => console.log(x)), // a b c
  switchMap(x => interval(500).pipe(map(i => x + i))),
  take(5),
).subscribe(console.log)
// c0 (間) c1 (間) c2 (間) c3 (間) c4

mergeMap(関数): 内側同時並行

すべて非同期のまま完遂したいケース

外側も内側も非同期

process.stdin.setEncoding('utf-8') // 標準入力に必要
fromEvent(process.stdin, 'data').pipe( // 標準入力待ち =>
  filter(入力文字 => 入力文字.trim().length !== 0), // 何か文字入力
  mergeMap(入力文字 => {
    return interval(1000).pipe(
      take(5),
      map(秒 => `${入力文字.trim()}: ${秒 + 1}`)
    )
  }),
).subscribe(console.log)

外側同期、内側非同期

of('a', 'b', 'c').pipe( // a b c =>
  tap(x => console.log(x)), // a b c
  mergeMap(x => interval(500).pipe(map(i => x + i), take(3))),
).subscribe(console.log)
// a0 b0 c0 (間) a1 b1 c1 (間) a2 b2 c2

concatMap: 内側同期化

非同期を同期化・順番に一つずつ実行したいケース

外側も内側も非同期

process.stdin.setEncoding('utf-8') // 標準入力に必要
fromEvent(process.stdin, 'data').pipe( // 標準入力待ち =>
  filter(入力文字 => 入力文字.trim().length !== 0), // 何か文字入力
  concatMap(入力文字 => {
    return interval(1000).pipe(
      take(5),
      map(秒 => `${入力文字.trim()}: ${秒 + 1}`)
    )
  }),
).subscribe(console.log)

外側同期、内側非同期

of('a', 'b', 'c').pipe( // a b c =>
  tap(x => console.log(x)), // a b c
  concatMap(x => interval(500).pipe(map(i => x + i), take(3))),
).subscribe(console.log)
// a0 (間) a1 (間) a2 (間) b0 (間) b1 (間) b2 (間) c0 (間) c1 (間) c2

exhaustMap: 残存内側無敵

残存してるか不明な非同期を一つだけ起動できればいいケース

外側も内側も非同期

process.stdin.setEncoding('utf-8') // 標準入力に必要
fromEvent(process.stdin, 'data').pipe( // 標準入力待ち =>
  filter(入力文字 => 入力文字.trim().length !== 0), // 何か文字入力
  exhaustMap(入力文字 => {
    return interval(1000).pipe(
      take(5),
      map(秒 => `${入力文字.trim()}: ${秒 + 1}`)
    )
  }),
).subscribe(console.log)

外側同期、内側非同期

// (意味なし) 外側同期、内側非同期
of('a', 'b', 'c').pipe( // a b c =>
  tap(x => console.log(x)), // a b c
  exhaustMap(x => interval(500).pipe(map(i => x + i))),
  take(5),
).subscribe(console.log)
// a0 (間) a1 (間) a2 (間) a3 (間) a4