console
const source$ = Rx.Observable.interval(1000).take(3);
const observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
const observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
const subject = {
observers: [],
addObserver: function(observer) {
this.observers.push(observer)
},
next: function(value) {
this.observers.forEach(o => o.next(value))
},
error: function(error){
this.observers.forEach(o => o.error(error))
},
complete: function() {
this.observers.forEach(o => o.complete())
}
}
subject.addObserver(observerA)
source$.subscribe(subject);
setTimeout(() => {
subject.addObserver(observerB);
}, 1000);