diff options
Diffstat (limited to 'node_modules/rxjs/dist/esm/internal/operators/bufferTime.js')
| -rw-r--r-- | node_modules/rxjs/dist/esm/internal/operators/bufferTime.js | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/node_modules/rxjs/dist/esm/internal/operators/bufferTime.js b/node_modules/rxjs/dist/esm/internal/operators/bufferTime.js new file mode 100644 index 0000000..f5b61b0 --- /dev/null +++ b/node_modules/rxjs/dist/esm/internal/operators/bufferTime.js @@ -0,0 +1,61 @@ +import { Subscription } from '../Subscription'; +import { operate } from '../util/lift'; +import { createOperatorSubscriber } from './OperatorSubscriber'; +import { arrRemove } from '../util/arrRemove'; +import { asyncScheduler } from '../scheduler/async'; +import { popScheduler } from '../util/args'; +import { executeSchedule } from '../util/executeSchedule'; +export function bufferTime(bufferTimeSpan, ...otherArgs) { + var _a, _b; + const scheduler = (_a = popScheduler(otherArgs)) !== null && _a !== void 0 ? _a : asyncScheduler; + const bufferCreationInterval = (_b = otherArgs[0]) !== null && _b !== void 0 ? _b : null; + const maxBufferSize = otherArgs[1] || Infinity; + return operate((source, subscriber) => { + let bufferRecords = []; + let restartOnEmit = false; + const emit = (record) => { + const { buffer, subs } = record; + subs.unsubscribe(); + arrRemove(bufferRecords, record); + subscriber.next(buffer); + restartOnEmit && startBuffer(); + }; + const startBuffer = () => { + if (bufferRecords) { + const subs = new Subscription(); + subscriber.add(subs); + const buffer = []; + const record = { + buffer, + subs, + }; + bufferRecords.push(record); + executeSchedule(subs, scheduler, () => emit(record), bufferTimeSpan); + } + }; + if (bufferCreationInterval !== null && bufferCreationInterval >= 0) { + executeSchedule(subscriber, scheduler, startBuffer, bufferCreationInterval, true); + } + else { + restartOnEmit = true; + } + startBuffer(); + const bufferTimeSubscriber = createOperatorSubscriber(subscriber, (value) => { + const recordsCopy = bufferRecords.slice(); + for (const record of recordsCopy) { + const { buffer } = record; + buffer.push(value); + maxBufferSize <= buffer.length && emit(record); + } + }, () => { + while (bufferRecords === null || bufferRecords === void 0 ? void 0 : bufferRecords.length) { + subscriber.next(bufferRecords.shift().buffer); + } + bufferTimeSubscriber === null || bufferTimeSubscriber === void 0 ? void 0 : bufferTimeSubscriber.unsubscribe(); + subscriber.complete(); + subscriber.unsubscribe(); + }, undefined, () => (bufferRecords = null)); + source.subscribe(bufferTimeSubscriber); + }); +} +//# sourceMappingURL=bufferTime.js.map
\ No newline at end of file |
