diff options
Diffstat (limited to 'node_modules/rxjs/dist/esm/internal/operators/mergeInternals.js')
| -rw-r--r-- | node_modules/rxjs/dist/esm/internal/operators/mergeInternals.js | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/node_modules/rxjs/dist/esm/internal/operators/mergeInternals.js b/node_modules/rxjs/dist/esm/internal/operators/mergeInternals.js new file mode 100644 index 0000000..f387656 --- /dev/null +++ b/node_modules/rxjs/dist/esm/internal/operators/mergeInternals.js @@ -0,0 +1,58 @@ +import { innerFrom } from '../observable/innerFrom'; +import { executeSchedule } from '../util/executeSchedule'; +import { createOperatorSubscriber } from './OperatorSubscriber'; +export function mergeInternals(source, subscriber, project, concurrent, onBeforeNext, expand, innerSubScheduler, additionalFinalizer) { + const buffer = []; + let active = 0; + let index = 0; + let isComplete = false; + const checkComplete = () => { + if (isComplete && !buffer.length && !active) { + subscriber.complete(); + } + }; + const outerNext = (value) => (active < concurrent ? doInnerSub(value) : buffer.push(value)); + const doInnerSub = (value) => { + expand && subscriber.next(value); + active++; + let innerComplete = false; + innerFrom(project(value, index++)).subscribe(createOperatorSubscriber(subscriber, (innerValue) => { + onBeforeNext === null || onBeforeNext === void 0 ? void 0 : onBeforeNext(innerValue); + if (expand) { + outerNext(innerValue); + } + else { + subscriber.next(innerValue); + } + }, () => { + innerComplete = true; + }, undefined, () => { + if (innerComplete) { + try { + active--; + while (buffer.length && active < concurrent) { + const bufferedValue = buffer.shift(); + if (innerSubScheduler) { + executeSchedule(subscriber, innerSubScheduler, () => doInnerSub(bufferedValue)); + } + else { + doInnerSub(bufferedValue); + } + } + checkComplete(); + } + catch (err) { + subscriber.error(err); + } + } + })); + }; + source.subscribe(createOperatorSubscriber(subscriber, outerNext, () => { + isComplete = true; + checkComplete(); + })); + return () => { + additionalFinalizer === null || additionalFinalizer === void 0 ? void 0 : additionalFinalizer(); + }; +} +//# sourceMappingURL=mergeInternals.js.map
\ No newline at end of file |
