diff options
| author | Pinapelz <yukais@pinapelz.com> | 2025-06-29 11:49:28 -0700 |
|---|---|---|
| committer | Pinapelz <yukais@pinapelz.com> | 2025-06-29 11:49:28 -0700 |
| commit | d55b767039605256c736166a942a9138e3eacfd7 (patch) | |
| tree | 947063b634c50d438a794325f13275e134aa5993 /node_modules/rxjs/src/internal/operators/mergeInternals.ts | |
| parent | 864ce67d89c77d8ef9c3361f80d619853abcf91c (diff) | |
remove dev node_modules (oops)
Diffstat (limited to 'node_modules/rxjs/src/internal/operators/mergeInternals.ts')
| -rw-r--r-- | node_modules/rxjs/src/internal/operators/mergeInternals.ts | 149 |
1 files changed, 0 insertions, 149 deletions
diff --git a/node_modules/rxjs/src/internal/operators/mergeInternals.ts b/node_modules/rxjs/src/internal/operators/mergeInternals.ts deleted file mode 100644 index dab3a2b..0000000 --- a/node_modules/rxjs/src/internal/operators/mergeInternals.ts +++ /dev/null @@ -1,149 +0,0 @@ -import { Observable } from '../Observable'; -import { innerFrom } from '../observable/innerFrom'; -import { Subscriber } from '../Subscriber'; -import { ObservableInput, SchedulerLike } from '../types'; -import { executeSchedule } from '../util/executeSchedule'; -import { createOperatorSubscriber } from './OperatorSubscriber'; - -/** - * A process embodying the general "merge" strategy. This is used in - * `mergeMap` and `mergeScan` because the logic is otherwise nearly identical. - * @param source The original source observable - * @param subscriber The consumer subscriber - * @param project The projection function to get our inner sources - * @param concurrent The number of concurrent inner subscriptions - * @param onBeforeNext Additional logic to apply before nexting to our consumer - * @param expand If `true` this will perform an "expand" strategy, which differs only - * in that it recurses, and the inner subscription must be schedule-able. - * @param innerSubScheduler A scheduler to use to schedule inner subscriptions, - * this is to support the expand strategy, mostly, and should be deprecated - */ -export function mergeInternals<T, R>( - source: Observable<T>, - subscriber: Subscriber<R>, - project: (value: T, index: number) => ObservableInput<R>, - concurrent: number, - onBeforeNext?: (innerValue: R) => void, - expand?: boolean, - innerSubScheduler?: SchedulerLike, - additionalFinalizer?: () => void -) { - // Buffered values, in the event of going over our concurrency limit - const buffer: T[] = []; - // The number of active inner subscriptions. - let active = 0; - // An index to pass to our accumulator function - let index = 0; - // Whether or not the outer source has completed. - let isComplete = false; - - /** - * Checks to see if we can complete our result or not. - */ - const checkComplete = () => { - // If the outer has completed, and nothing is left in the buffer, - // and we don't have any active inner subscriptions, then we can - // Emit the state and complete. - if (isComplete && !buffer.length && !active) { - subscriber.complete(); - } - }; - - // If we're under our concurrency limit, just start the inner subscription, otherwise buffer and wait. - const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value)); - - const doInnerSub = (value: T) => { - // If we're expanding, we need to emit the outer values and the inner values - // as the inners will "become outers" in a way as they are recursively fed - // back to the projection mechanism. - expand && subscriber.next(value as any); - - // Increment the number of active subscriptions so we can track it - // against our concurrency limit later. - active++; - - // A flag used to show that the inner observable completed. - // This is checked during finalization to see if we should - // move to the next item in the buffer, if there is on. - let innerComplete = false; - - // Start our inner subscription. - innerFrom(project(value, index++)).subscribe( - createOperatorSubscriber( - subscriber, - (innerValue) => { - // `mergeScan` has additional handling here. For example - // taking the inner value and updating state. - onBeforeNext?.(innerValue); - - if (expand) { - // If we're expanding, then just recurse back to our outer - // handler. It will emit the value first thing. - outerNext(innerValue as any); - } else { - // Otherwise, emit the inner value. - subscriber.next(innerValue); - } - }, - () => { - // Flag that we have completed, so we know to check the buffer - // during finalization. - innerComplete = true; - }, - // Errors are passed to the destination. - undefined, - () => { - // During finalization, if the inner completed (it wasn't errored or - // cancelled), then we want to try the next item in the buffer if - // there is one. - if (innerComplete) { - // We have to wrap this in a try/catch because it happens during - // finalization, possibly asynchronously, and we want to pass - // any errors that happen (like in a projection function) to - // the outer Subscriber. - try { - // INNER SOURCE COMPLETE - // Decrement the active count to ensure that the next time - // we try to call `doInnerSub`, the number is accurate. - active--; - // If we have more values in the buffer, try to process those - // Note that this call will increment `active` ahead of the - // next conditional, if there were any more inner subscriptions - // to start. - while (buffer.length && active < concurrent) { - const bufferedValue = buffer.shift()!; - // Particularly for `expand`, we need to check to see if a scheduler was provided - // for when we want to start our inner subscription. Otherwise, we just start - // are next inner subscription. - if (innerSubScheduler) { - executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue)); - } else { - doInnerSub(bufferedValue); - } - } - // Check to see if we can complete, and complete if so. - checkComplete(); - } catch (err) { - subscriber.error(err); - } - } - } - ) - ); - }; - - // Subscribe to our source observable. - source.subscribe( - createOperatorSubscriber(subscriber, outerNext, () => { - // Outer completed, make a note of it, and check to see if we can complete everything. - isComplete = true; - checkComplete(); - }) - ); - - // Additional finalization (for when the destination is torn down). - // Other finalization is added implicitly via subscription above. - return () => { - additionalFinalizer?.(); - }; -} |
