Subject: 複数のObserverにマルチキャスト


Subject: マルチキャストできるObservable

複数のObserverを登録できる

Observerでもある


複数のObserverを登録できる

const Subject = rxjs.Subject
const subject = new Subject()

// 1つ目のObserverを登録
const subscriptionA = subject.subscribe(x => console.log('A', x))
console.log(0) // まだ実現していない
subject.next(1) // next()で実現

// 2つ目のObserverを登録
const subscriptionB = subject.subscribe(x => console.log('B', x))
subject.next(2) // マルチキャスト

subscriptionA.unsubscribe() // Aだけ受信停止
subject.next(3) // Bだけ実現
subject.complete() // 全体停止
subject.next(4) // なし

/*
0
A 1
A 2
B 2
B 3
*/

他のObservableのObserverにもなる

他のObservable.subscribe(subject)

const subject = new Subject()
// Observerを登録
subject.subscribe(x => console.log('A', x)) // 一つ目
subject.subscribe(x => console.log('B', x)) // 二つ目
// まだ実現していない
console.log(0)

// SubjectをObserverにする
// take(2)はそれぞれ2個ずつの意味
interval(500).pipe(take(2)).subscribe(subject) // マルチキャスト
console.log(1) // 非同期なのでこれが先

/*
0
1
A 0
B 0
A 1
B 1
*/

BehaviorSubject: 直前値を発行できるSubject

コンストラクタに初期値を渡す

// コンストラクタに初期値を渡す
const subject = new BehaviorSubject(1)

// 一つ目のObserver
// subscribe()で直前値発行(ここでは初期値)
subject.subscribe(x => console.log('A', x)) // A 1
// next()でも発行
subject.next(2) // A 2
console.log(subject.getValue()) // 2

// 二つ目のObserver
// 直前値発行、すぐに受信停止
subject.subscribe(x => console.log('B', x)).unsubscribe() // B 2
subject.next(3) // A 3

ReplaySubject: 指定回数分の過去値を発行できるSubject

コンストラクタで、記憶回数を指定

const subject = new ReplaySubject(2) // 2回分

// 1つ目のObserver
subject.subscribe(x => console.log('A', x))

// 2つ目のObserver
subject.pipe( // B個別のパイプ
  tap(x => {if (x === 2) throw 'Bエラー'}),
).subscribe({
  next: x => console.log('B', x),
  error: console.error,
})

subject.next(1)
subject.next(2) // Bだけエラー停止

// 3つ目のObserver
subject.pipe( // C個別のパイプ
  tap(x => {if (x === 3) subject.complete()}), // 全体停止
).subscribe(x => console.log('C', x))

subject.next(3) // 全体停止(先のAだけ発行後)
subject.next(4) // なし

/*
A 1
B 1
A 2
Bエラー
C 1
C 2
A 3
*/