Subject: 複数のObserverにマルチキャスト
- Subject、BehaviorSubject、ReplaySubject
Subject
: マルチキャストできるObservable
複数のObserverを登録できる
subject.subscribe(observer)
: 登録するだけでまだ実現しない
subject.next(値)
でマルチキャスト実現
Observerでもある
- 他のObservableのObserverにもなる
他のObservable.subscribe(subject)
(これはこのとき実現)
複数のObserverを登録できる
next()
でマルチキャスト
- 個々のSubscription(
subscribe()
の戻り)ごとにunsubscribe()
できる
- 全体の
complete()
もできる
const Subject = rxjs.Subject
const subject = new Subject()
// 1つ目のObserverを登録
const subscriptionA = subject.subscribe(x => console.log('A', x))
console.log(0) // まだ実現していない
subject.next(1) // next()で実現
// 2つ目のObserverを登録
const subscriptionB = subject.subscribe(x => console.log('B', x))
subject.next(2) // マルチキャスト
subscriptionA.unsubscribe() // Aだけ受信停止
subject.next(3) // Bだけ実現
subject.complete() // 全体停止
subject.next(4) // なし
/*
0
A 1
A 2
B 2
B 3
*/
他のObservableのObserverにもなる
他のObservable.subscribe(subject)
const subject = new Subject()
// Observerを登録
subject.subscribe(x => console.log('A', x)) // 一つ目
subject.subscribe(x => console.log('B', x)) // 二つ目
// まだ実現していない
console.log(0)
// SubjectをObserverにする
// take(2)はそれぞれ2個ずつの意味
interval(500).pipe(take(2)).subscribe(subject) // マルチキャスト
console.log(1) // 非同期なのでこれが先
/*
0
1
A 0
B 0
A 1
B 1
*/
BehaviorSubject
: 直前値を発行できるSubject
コンストラクタに初期値を渡す
subscribe()
: 初期値または直前値が実現
next()
: 追加のマルチキャスト
getValue()
: 直前値を得る
// コンストラクタに初期値を渡す
const subject = new BehaviorSubject(1)
// 一つ目のObserver
// subscribe()で直前値発行(ここでは初期値)
subject.subscribe(x => console.log('A', x)) // A 1
// next()でも発行
subject.next(2) // A 2
console.log(subject.getValue()) // 2
// 二つ目のObserver
// 直前値発行、すぐに受信停止
subject.subscribe(x => console.log('B', x)).unsubscribe() // B 2
subject.next(3) // A 3
ReplaySubject
: 指定回数分の過去値を発行できるSubject
コンストラクタで、記憶回数を指定
subscribe()
: 過去値があれば実現
next()
: 追加のマルチキャスト
- 引数2で時間制限も可
- 各Observerへのパイプは別個のもの(共有せず)
- 全体停止と個別停止の区別
const subject = new ReplaySubject(2) // 2回分
// 1つ目のObserver
subject.subscribe(x => console.log('A', x))
// 2つ目のObserver
subject.pipe( // B個別のパイプ
tap(x => {if (x === 2) throw 'Bエラー'}),
).subscribe({
next: x => console.log('B', x),
error: console.error,
})
subject.next(1)
subject.next(2) // Bだけエラー停止
// 3つ目のObserver
subject.pipe( // C個別のパイプ
tap(x => {if (x === 3) subject.complete()}), // 全体停止
).subscribe(x => console.log('C', x))
subject.next(3) // 全体停止(先のAだけ発行後)
subject.next(4) // なし
/*
A 1
B 1
A 2
Bエラー
C 1
C 2
A 3
*/