diff options
| author | Pinapelz <yukais@pinapelz.com> | 2025-06-28 17:26:46 -0700 |
|---|---|---|
| committer | Pinapelz <yukais@pinapelz.com> | 2025-06-28 17:43:56 -0700 |
| commit | e4fa1e69e7ebfb627c7198fd1a9881e9327ec4d4 (patch) | |
| tree | 06284a538a6008eca75051399e47db4e5d50301c /node_modules/rxjs/src/internal/operators/exhaustMap.ts | |
initial commit: scaffolding
Diffstat (limited to 'node_modules/rxjs/src/internal/operators/exhaustMap.ts')
| -rw-r--r-- | node_modules/rxjs/src/internal/operators/exhaustMap.ts | 100 |
1 files changed, 100 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/operators/exhaustMap.ts b/node_modules/rxjs/src/internal/operators/exhaustMap.ts new file mode 100644 index 0000000..bf66683 --- /dev/null +++ b/node_modules/rxjs/src/internal/operators/exhaustMap.ts @@ -0,0 +1,100 @@ +import { Observable } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; +import { map } from './map'; +import { innerFrom } from '../observable/innerFrom'; +import { operate } from '../util/lift'; +import { createOperatorSubscriber } from './OperatorSubscriber'; + +/* tslint:disable:max-line-length */ +export function exhaustMap<T, O extends ObservableInput<any>>( + project: (value: T, index: number) => O +): OperatorFunction<T, ObservedValueOf<O>>; +/** @deprecated The `resultSelector` parameter will be removed in v8. Use an inner `map` instead. Details: https://rxjs.dev/deprecations/resultSelector */ +export function exhaustMap<T, O extends ObservableInput<any>>( + project: (value: T, index: number) => O, + resultSelector: undefined +): OperatorFunction<T, ObservedValueOf<O>>; +/** @deprecated The `resultSelector` parameter will be removed in v8. Use an inner `map` instead. Details: https://rxjs.dev/deprecations/resultSelector */ +export function exhaustMap<T, I, R>( + project: (value: T, index: number) => ObservableInput<I>, + resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R +): OperatorFunction<T, R>; +/* tslint:enable:max-line-length */ + +/** + * Projects each source value to an Observable which is merged in the output + * Observable only if the previous projected Observable has completed. + * + * <span class="informal">Maps each value to an Observable, then flattens all of + * these inner Observables using {@link exhaustAll}.</span> + * + *  + * + * Returns an Observable that emits items based on applying a function that you + * supply to each item emitted by the source Observable, where that function + * returns an (so-called "inner") Observable. When it projects a source value to + * an Observable, the output Observable begins emitting the items emitted by + * that projected Observable. However, `exhaustMap` ignores every new projected + * Observable if the previous projected Observable has not yet completed. Once + * that one completes, it will accept and flatten the next projected Observable + * and repeat this process. + * + * ## Example + * + * Run a finite timer for each click, only if there is no currently active timer + * + * ```ts + * import { fromEvent, exhaustMap, interval, take } from 'rxjs'; + * + * const clicks = fromEvent(document, 'click'); + * const result = clicks.pipe( + * exhaustMap(() => interval(1000).pipe(take(5))) + * ); + * result.subscribe(x => console.log(x)); + * ``` + * + * @see {@link concatMap} + * @see {@link exhaust} + * @see {@link mergeMap} + * @see {@link switchMap} + * + * @param project A function that, when applied to an item emitted by the source + * Observable, returns an Observable. + * @return A function that returns an Observable containing projected + * Observables of each item of the source, ignoring projected Observables that + * start before their preceding Observable has completed. + */ +export function exhaustMap<T, R, O extends ObservableInput<any>>( + project: (value: T, index: number) => O, + resultSelector?: (outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number) => R +): OperatorFunction<T, ObservedValueOf<O> | R> { + if (resultSelector) { + // DEPRECATED PATH + return (source: Observable<T>) => + source.pipe(exhaustMap((a, i) => innerFrom(project(a, i)).pipe(map((b: any, ii: any) => resultSelector(a, b, i, ii))))); + } + return operate((source, subscriber) => { + let index = 0; + let innerSub: Subscriber<T> | null = null; + let isComplete = false; + source.subscribe( + createOperatorSubscriber( + subscriber, + (outerValue) => { + if (!innerSub) { + innerSub = createOperatorSubscriber(subscriber, undefined, () => { + innerSub = null; + isComplete && subscriber.complete(); + }); + innerFrom(project(outerValue, index++)).subscribe(innerSub); + } + }, + () => { + isComplete = true; + !innerSub && subscriber.complete(); + } + ) + ); + }); +} |
