operators/flatMap.js

  1. import { Observable } from '../Observable';
  2. import { passThroughNextObservable } from './passThroughNextObservable';
  3. import { onSubscriptionsComplete } from '../utilities/onSubscriptionsComplete';
  4. /**
  5. * Same as `map(obs$, mapCallback)` but will take the value of the callback and turn it from an observable to a value
  6. *
  7. * @memberof operators
  8. *
  9. * @param {Observable} source$
  10. * @param {Function} mapCallback
  11. * @returns {Observable}
  12. */
  13. export const flatMap = function (source$, mapCallback) {
  14. return new Observable((observer) => {
  15. let subscription = { isComplete: false };
  16. const nextSubscriptionList = [];
  17. const onComplete = () => onSubscriptionsComplete(
  18. [subscription, ...nextSubscriptionList],
  19. observer.complete
  20. );
  21. subscription = passThroughNextObservable(source$, mapCallback)
  22. .subscribe((nextValue$) => {
  23. const nextSubscription = nextValue$.subscribe(
  24. observer.next,
  25. observer.error,
  26. onComplete
  27. );
  28. nextSubscriptionList.push(nextSubscription);
  29. }, observer.error, onComplete);
  30. return () => {
  31. nextSubscriptionList.forEach((nextSub) => nextSub.unsubscribe());
  32. subscription.unsubscribe();
  33. };
  34. });
  35. };
  36. Observable.flatMap = flatMap;
  37. Observable.prototype.flatMap = function (mapCallback) {
  38. return flatMap(this, mapCallback);
  39. };