ObservableとPromiseの違い


Observable

Promise


いつ実行か

あとで実行のObservable

const observable1 = new Observable(observer => {
  console.log(2)
  observer.next(3)
})
const observable2 = of(4)
console.log(1)
observable1.subscribe(console.log) // 2 3
observable2.subscribe(console.log) // 4
// 順番: 1 2 3 4

すぐ実行のPromise

const promise = new Promise((resolve, reject) => {
  console.log(1)
  resolve(3)
}) // 1
console.log(2)
promise.then(console.log) // 3
// 順番: 1 2 3

同期か非同期か

同期も非同期もあるObservable

const observable1 = of(2) // 同期
console.log(1)
observable1.subscribe(console.log) // 2
console.log(3)

const observable2 = interval(1).pipe(take(1), mapTo(6)) // 非同期
console.log(4)
observable2.subscribe(console.log) // 6
console.log(5)
// 順番: 1 2 3 4 5 6

常に非同期のPromise

const promise = new Promise((resolve, reject) => {
  console.log(1)
  resolve(4)
}) // 1
console.log(2)
promise.then(console.log) // 4
console.log(3)
// 順番: 1 2 3 4

発行は何回か

何回も値を発行できるObservable

const observable1 = new Observable(observer => {
  observer.next(1)
  observer.next(2)
  observer.next(3)
})
const observable2 = from([4, 5])
observable1.subscribe(console.log) // 発行のたびにconsole.log(値)
observable2.subscribe(console.log) // 発行のたびにconsole.log(値)
/*
1
2
3
4
5
*/

発行1回のPromise

const promise = new Promise((resolve, reject) => {
  resolve(1)
  resolve(2) // 意味なし
})
promise.then(console.log) // 1

発行先はいくつか

マルチキャストできるObservable

const subject = new Subject() // マルチキャストできるObservable
subject.subscribe(x => console.log('A', x)) // 一つ目
subject.subscribe(x => console.log('B', x)) // 二つ目
subject.next(1) // next()でマルチキャスト
subject.next(2) // next()でマルチキャスト
/*
A 1
B 1
A 2
B 2
*/

マルチキャストなしのPromise

const promise = new Promise((resolve, reject) => {
  console.log('あれこれロジック')
  resolve(1)
})
promise.then(x => console.log('A', x))
// もう一度呼ぶのは正しくない使い方
promise.then(x => console.log('B', x))
/*
あれこれロジック
A 1
B 1
*/

キャンセルできるか

キャンセルできるObservable

const subscription = interval(500).subscribe(console.log)
// 2秒後にunsubscribe()で購読を止める
setTimeout(() => subscription.unsubscribe(), 2000)
// 0 1 2

キャンセルできないPromise


既存のPromiseをObservableに変換

Promise対応した作成オペレータ

from(Promise)

// Observableに変換、通常の値を発行
from(Promise.resolve(1)).pipe(tap(console.log)).subscribe(console.log) // 1

// Promiseのまま発行
of(Promise.resolve(2)).subscribe(console.log) // Promise { 2 }
from([Promise.resolve(2)]).subscribe(console.log) // Promise { 2 }
pairs({key: Promise.resolve(2)}).subscribe(console.log) // [ 'key', Promise { 2 } ]

defer(() => Promise)

const getPromise = () => {
  return new Promise((resolve, reject) => resolve(Math.random()))
}

const deferPromise = defer(getPromise) // new Promise()はまだ
const fromPromise = from(getPromise()) // new Promise()実行

// 繰り返し使える: new Promise()がそれぞれ発生
deferPromise.subscribe(console.log) // 結果はそれぞれ
deferPromise.subscribe(console.log) // 結果はそれぞれ

// from()では繰り返しは意味がない
fromPromise.subscribe(console.log) // 同じ
fromPromise.subscribe(console.log) // 同じ

forkJoin(配列|辞書型)

function asyncFunc(引数) { // 最長1秒で終わる非同期関数
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      console.log(`resolve()直前: ${引数}`)
      resolve(引数)
    }, Math.random() * 1000)
  })
}

const [a, b, c] = [asyncFunc('a'), timer(500).pipe(mapTo('b')), asyncFunc('c')]
const [e, f, g] = [asyncFunc('e'), timer(500).pipe(mapTo('f')), asyncFunc('g')]
// 配列インプット
forkJoin([a, b, c]).subscribe(console.log) // [ 'a', 'b', 'c' ]
// 辞書型インプット
forkJoin({e, f, g}).subscribe(console.log) // { e: 'e', f: 'f', g: 'g' }
console.log('同期化ではない')

同期化のやり方は?

pipe()で並べるObservable

既存のPromiseとObservableを同期化

// 0.2秒で配列に追加するPromiseやObservable
const p1 = arr => new Promise(r => setTimeout(() => r(arr.concat('p1')), 200))
const o2 = arr => timer(200).pipe(mapTo(arr.concat('o2')))
const p3 = arr => new Promise(r => setTimeout(() => r(arr.concat('p3')), 200))
const o4 = arr => timer(200).pipe(mapTo(arr.concat('o4')))

defer(() => p1([0])).pipe(
  tap(console.log),
  mergeMap(arr => o2(arr)), // 外側1個のためconcatMap()等も同様
  tap(console.log),
  mergeMap(arr => p3(arr)),
  tap(console.log),
  mergeMap(arr => o4(arr)),
).subscribe(console.log)
/*
[ 0, 'p1' ]
[ 0, 'p1', 'o2' ]
[ 0, 'p1', 'o2', 'p3' ]
[ 0, 'p1', 'o2', 'p3', 'o4' ]
*/

then()でチェインするPromise

// 1秒で終わる非同期関数
const p1 = 引数 => new Promise(r => setTimeout(() => r(引数), 1000))

// 合計3秒
p1(1).then(値 => {
  console.log(値)
  return p1(2) // Promiseを返す
}).then(値 => {
  console.log(値)
  return p1(3) // Promiseを返す
}).then(値 => {
  console.log(値)
})

(参考: 対比ケース)

Observableだけの単純同期化 (値の受け渡しなし)

function o1(引数) { // 1秒で終わる非同期関数
  return new Observable(observer => {
    setTimeout(() => {
      observer.next(引数)
      observer.complete() // 省略すると同期化にならない
    }, 1000)
  })
}

// 外側非同期を同期化: 合計3秒
concat(o1(1), o1(2), o1(3)).subscribe(console.log)
// 内側非同期を同期化: 合計3秒
from([1, 2, 3]).pipe(
  concatMap(x => o1(x))
).subscribe(console.log)
// 同時並行: 合計1秒
// 外側
merge(o1('a'), o1('b'), o1('c')).subscribe(console.log)
// 内側
from(['a', 'b', 'c']).pipe(mergeMap(x => o1(x))).subscribe(console.log)

Promise変換の注意

// 1秒で終わる非同期関数
const p1 = 引数 => new Promise(r => setTimeout(() => r(引数), 1000))

// 同期化にならない: 合計1秒
concat(p1(1), p1(2), p1(3)).subscribe(console.log)

// 外側非同期を同期化: 合計3秒、 ただし完全同期化はpipe()の方法へ
concat(
  defer(() => p1(4)), defer(() => p1(5)), defer(() => p1(6))
).subscribe(console.log)