// RxJS v6+import { Subject, interval } from'rxjs';import { take, tap, multicast, mapTo } from'rxjs/operators';//emit every 2 seconds, take 5constsource=interval(2000).pipe(take(5));constexample=source.pipe(//since we are multicasting below, side effects will be executed oncetap(() =>console.log('Side Effect #1')),mapTo('Result!'));//subscribe subject to source upon connect()constmulti=example.pipe(multicast(() =>newSubject()));/* subscribers will share source output: "Side Effect #1" "Result!" "Result!" ...*/constsubscriberOne=multi.subscribe(val =>console.log(val));constsubscriberTwo=multi.subscribe(val =>console.log(val));//subscribe subject to sourcemulti.connect();
// RxJS v6+import { interval, ReplaySubject } from'rxjs';import { take, multicast, tap, mapTo } from'rxjs/operators';//emit every 2 seconds, take 5constsource=interval(2000).pipe(take(5));//example with ReplaySubjectconstexample=source.pipe(//since we are multicasting below, side effects will be executed oncetap(_ =>console.log('Side Effect #2')),mapTo('Result Two!'));//can use any type of subjectconstmulti=example.pipe(multicast(() =>newReplaySubject(5)));//subscribe subject to sourcemulti.connect();setTimeout(() => {/* subscriber will receieve all previous values on subscription because of ReplaySubject */constsubscriber=multi.subscribe(val =>console.group(val));},5000);