aboutsummaryrefslogtreecommitdiffstats
path: root/node_modules/rxjs/src/internal/operators/mergeInternals.ts
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2025-06-29 11:49:28 -0700
committerPinapelz <yukais@pinapelz.com>2025-06-29 11:49:28 -0700
commitd55b767039605256c736166a942a9138e3eacfd7 (patch)
tree947063b634c50d438a794325f13275e134aa5993 /node_modules/rxjs/src/internal/operators/mergeInternals.ts
parent864ce67d89c77d8ef9c3361f80d619853abcf91c (diff)
remove dev node_modules (oops)
Diffstat (limited to 'node_modules/rxjs/src/internal/operators/mergeInternals.ts')
-rw-r--r--node_modules/rxjs/src/internal/operators/mergeInternals.ts149
1 files changed, 0 insertions, 149 deletions
diff --git a/node_modules/rxjs/src/internal/operators/mergeInternals.ts b/node_modules/rxjs/src/internal/operators/mergeInternals.ts
deleted file mode 100644
index dab3a2b..0000000
--- a/node_modules/rxjs/src/internal/operators/mergeInternals.ts
+++ /dev/null
@@ -1,149 +0,0 @@
-import { Observable } from '../Observable';
-import { innerFrom } from '../observable/innerFrom';
-import { Subscriber } from '../Subscriber';
-import { ObservableInput, SchedulerLike } from '../types';
-import { executeSchedule } from '../util/executeSchedule';
-import { createOperatorSubscriber } from './OperatorSubscriber';
-
-/**
- * A process embodying the general "merge" strategy. This is used in
- * `mergeMap` and `mergeScan` because the logic is otherwise nearly identical.
- * @param source The original source observable
- * @param subscriber The consumer subscriber
- * @param project The projection function to get our inner sources
- * @param concurrent The number of concurrent inner subscriptions
- * @param onBeforeNext Additional logic to apply before nexting to our consumer
- * @param expand If `true` this will perform an "expand" strategy, which differs only
- * in that it recurses, and the inner subscription must be schedule-able.
- * @param innerSubScheduler A scheduler to use to schedule inner subscriptions,
- * this is to support the expand strategy, mostly, and should be deprecated
- */
-export function mergeInternals<T, R>(
- source: Observable<T>,
- subscriber: Subscriber<R>,
- project: (value: T, index: number) => ObservableInput<R>,
- concurrent: number,
- onBeforeNext?: (innerValue: R) => void,
- expand?: boolean,
- innerSubScheduler?: SchedulerLike,
- additionalFinalizer?: () => void
-) {
- // Buffered values, in the event of going over our concurrency limit
- const buffer: T[] = [];
- // The number of active inner subscriptions.
- let active = 0;
- // An index to pass to our accumulator function
- let index = 0;
- // Whether or not the outer source has completed.
- let isComplete = false;
-
- /**
- * Checks to see if we can complete our result or not.
- */
- const checkComplete = () => {
- // If the outer has completed, and nothing is left in the buffer,
- // and we don't have any active inner subscriptions, then we can
- // Emit the state and complete.
- if (isComplete && !buffer.length && !active) {
- subscriber.complete();
- }
- };
-
- // If we're under our concurrency limit, just start the inner subscription, otherwise buffer and wait.
- const outerNext = (value: T) => (active < concurrent ? doInnerSub(value) : buffer.push(value));
-
- const doInnerSub = (value: T) => {
- // If we're expanding, we need to emit the outer values and the inner values
- // as the inners will "become outers" in a way as they are recursively fed
- // back to the projection mechanism.
- expand && subscriber.next(value as any);
-
- // Increment the number of active subscriptions so we can track it
- // against our concurrency limit later.
- active++;
-
- // A flag used to show that the inner observable completed.
- // This is checked during finalization to see if we should
- // move to the next item in the buffer, if there is on.
- let innerComplete = false;
-
- // Start our inner subscription.
- innerFrom(project(value, index++)).subscribe(
- createOperatorSubscriber(
- subscriber,
- (innerValue) => {
- // `mergeScan` has additional handling here. For example
- // taking the inner value and updating state.
- onBeforeNext?.(innerValue);
-
- if (expand) {
- // If we're expanding, then just recurse back to our outer
- // handler. It will emit the value first thing.
- outerNext(innerValue as any);
- } else {
- // Otherwise, emit the inner value.
- subscriber.next(innerValue);
- }
- },
- () => {
- // Flag that we have completed, so we know to check the buffer
- // during finalization.
- innerComplete = true;
- },
- // Errors are passed to the destination.
- undefined,
- () => {
- // During finalization, if the inner completed (it wasn't errored or
- // cancelled), then we want to try the next item in the buffer if
- // there is one.
- if (innerComplete) {
- // We have to wrap this in a try/catch because it happens during
- // finalization, possibly asynchronously, and we want to pass
- // any errors that happen (like in a projection function) to
- // the outer Subscriber.
- try {
- // INNER SOURCE COMPLETE
- // Decrement the active count to ensure that the next time
- // we try to call `doInnerSub`, the number is accurate.
- active--;
- // If we have more values in the buffer, try to process those
- // Note that this call will increment `active` ahead of the
- // next conditional, if there were any more inner subscriptions
- // to start.
- while (buffer.length && active < concurrent) {
- const bufferedValue = buffer.shift()!;
- // Particularly for `expand`, we need to check to see if a scheduler was provided
- // for when we want to start our inner subscription. Otherwise, we just start
- // are next inner subscription.
- if (innerSubScheduler) {
- executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue));
- } else {
- doInnerSub(bufferedValue);
- }
- }
- // Check to see if we can complete, and complete if so.
- checkComplete();
- } catch (err) {
- subscriber.error(err);
- }
- }
- }
- )
- );
- };
-
- // Subscribe to our source observable.
- source.subscribe(
- createOperatorSubscriber(subscriber, outerNext, () => {
- // Outer completed, make a note of it, and check to see if we can complete everything.
- isComplete = true;
- checkComplete();
- })
- );
-
- // Additional finalization (for when the destination is torn down).
- // Other finalization is added implicitly via subscription above.
- return () => {
- additionalFinalizer?.();
- };
-}
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage