diff options
Diffstat (limited to 'node_modules/rxjs/dist/esm/internal/operators/groupBy.js')
| -rw-r--r-- | node_modules/rxjs/dist/esm/internal/operators/groupBy.js | 63 |
1 files changed, 63 insertions, 0 deletions
diff --git a/node_modules/rxjs/dist/esm/internal/operators/groupBy.js b/node_modules/rxjs/dist/esm/internal/operators/groupBy.js new file mode 100644 index 0000000..56f00b6 --- /dev/null +++ b/node_modules/rxjs/dist/esm/internal/operators/groupBy.js @@ -0,0 +1,63 @@ +import { Observable } from '../Observable'; +import { innerFrom } from '../observable/innerFrom'; +import { Subject } from '../Subject'; +import { operate } from '../util/lift'; +import { createOperatorSubscriber, OperatorSubscriber } from './OperatorSubscriber'; +export function groupBy(keySelector, elementOrOptions, duration, connector) { + return operate((source, subscriber) => { + let element; + if (!elementOrOptions || typeof elementOrOptions === 'function') { + element = elementOrOptions; + } + else { + ({ duration, element, connector } = elementOrOptions); + } + const groups = new Map(); + const notify = (cb) => { + groups.forEach(cb); + cb(subscriber); + }; + const handleError = (err) => notify((consumer) => consumer.error(err)); + let activeGroups = 0; + let teardownAttempted = false; + const groupBySourceSubscriber = new OperatorSubscriber(subscriber, (value) => { + try { + const key = keySelector(value); + let group = groups.get(key); + if (!group) { + groups.set(key, (group = connector ? connector() : new Subject())); + const grouped = createGroupedObservable(key, group); + subscriber.next(grouped); + if (duration) { + const durationSubscriber = createOperatorSubscriber(group, () => { + group.complete(); + durationSubscriber === null || durationSubscriber === void 0 ? void 0 : durationSubscriber.unsubscribe(); + }, undefined, undefined, () => groups.delete(key)); + groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber)); + } + } + group.next(element ? element(value) : value); + } + catch (err) { + handleError(err); + } + }, () => notify((consumer) => consumer.complete()), handleError, () => groups.clear(), () => { + teardownAttempted = true; + return activeGroups === 0; + }); + source.subscribe(groupBySourceSubscriber); + function createGroupedObservable(key, groupSubject) { + const result = new Observable((groupSubscriber) => { + activeGroups++; + const innerSub = groupSubject.subscribe(groupSubscriber); + return () => { + innerSub.unsubscribe(); + --activeGroups === 0 && teardownAttempted && groupBySourceSubscriber.unsubscribe(); + }; + }); + result.key = key; + return result; + } + }); +} +//# sourceMappingURL=groupBy.js.map
\ No newline at end of file |
