aboutsummaryrefslogtreecommitdiffstats
path: root/node_modules/rxjs/src/internal/operators/groupBy.ts
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2025-06-28 17:26:46 -0700
committerPinapelz <yukais@pinapelz.com>2025-06-28 17:43:56 -0700
commite4fa1e69e7ebfb627c7198fd1a9881e9327ec4d4 (patch)
tree06284a538a6008eca75051399e47db4e5d50301c /node_modules/rxjs/src/internal/operators/groupBy.ts
initial commit: scaffolding
Diffstat (limited to 'node_modules/rxjs/src/internal/operators/groupBy.ts')
-rw-r--r--node_modules/rxjs/src/internal/operators/groupBy.ts288
1 files changed, 288 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/operators/groupBy.ts b/node_modules/rxjs/src/internal/operators/groupBy.ts
new file mode 100644
index 0000000..17bbb9a
--- /dev/null
+++ b/node_modules/rxjs/src/internal/operators/groupBy.ts
@@ -0,0 +1,288 @@
+import { Observable } from '../Observable';
+import { innerFrom } from '../observable/innerFrom';
+import { Subject } from '../Subject';
+import { ObservableInput, Observer, OperatorFunction, SubjectLike } from '../types';
+import { operate } from '../util/lift';
+import { createOperatorSubscriber, OperatorSubscriber } from './OperatorSubscriber';
+
+export interface BasicGroupByOptions<K, T> {
+ element?: undefined;
+ duration?: (grouped: GroupedObservable<K, T>) => ObservableInput<any>;
+ connector?: () => SubjectLike<T>;
+}
+
+export interface GroupByOptionsWithElement<K, E, T> {
+ element: (value: T) => E;
+ duration?: (grouped: GroupedObservable<K, E>) => ObservableInput<any>;
+ connector?: () => SubjectLike<E>;
+}
+
+export function groupBy<T, K>(key: (value: T) => K, options: BasicGroupByOptions<K, T>): OperatorFunction<T, GroupedObservable<K, T>>;
+
+export function groupBy<T, K, E>(
+ key: (value: T) => K,
+ options: GroupByOptionsWithElement<K, E, T>
+): OperatorFunction<T, GroupedObservable<K, E>>;
+
+export function groupBy<T, K extends T>(
+ key: (value: T) => value is K
+): OperatorFunction<T, GroupedObservable<true, K> | GroupedObservable<false, Exclude<T, K>>>;
+
+export function groupBy<T, K>(key: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;
+
+/**
+ * @deprecated use the options parameter instead.
+ */
+export function groupBy<T, K>(
+ key: (value: T) => K,
+ element: void,
+ duration: (grouped: GroupedObservable<K, T>) => Observable<any>
+): OperatorFunction<T, GroupedObservable<K, T>>;
+
+/**
+ * @deprecated use the options parameter instead.
+ */
+export function groupBy<T, K, R>(
+ key: (value: T) => K,
+ element?: (value: T) => R,
+ duration?: (grouped: GroupedObservable<K, R>) => Observable<any>
+): OperatorFunction<T, GroupedObservable<K, R>>;
+
+/**
+ * Groups the items emitted by an Observable according to a specified criterion,
+ * and emits these grouped items as `GroupedObservables`, one
+ * {@link GroupedObservable} per group.
+ *
+ * ![](groupBy.png)
+ *
+ * When the Observable emits an item, a key is computed for this item with the key function.
+ *
+ * If a {@link GroupedObservable} for this key exists, this {@link GroupedObservable} emits. Otherwise, a new
+ * {@link GroupedObservable} for this key is created and emits.
+ *
+ * A {@link GroupedObservable} represents values belonging to the same group represented by a common key. The common
+ * key is available as the `key` field of a {@link GroupedObservable} instance.
+ *
+ * The elements emitted by {@link GroupedObservable}s are by default the items emitted by the Observable, or elements
+ * returned by the element function.
+ *
+ * ## Examples
+ *
+ * Group objects by `id` and return as array
+ *
+ * ```ts
+ * import { of, groupBy, mergeMap, reduce } from 'rxjs';
+ *
+ * of(
+ * { id: 1, name: 'JavaScript' },
+ * { id: 2, name: 'Parcel' },
+ * { id: 2, name: 'webpack' },
+ * { id: 1, name: 'TypeScript' },
+ * { id: 3, name: 'TSLint' }
+ * ).pipe(
+ * groupBy(p => p.id),
+ * mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [])))
+ * )
+ * .subscribe(p => console.log(p));
+ *
+ * // displays:
+ * // [{ id: 1, name: 'JavaScript' }, { id: 1, name: 'TypeScript'}]
+ * // [{ id: 2, name: 'Parcel' }, { id: 2, name: 'webpack'}]
+ * // [{ id: 3, name: 'TSLint' }]
+ * ```
+ *
+ * Pivot data on the `id` field
+ *
+ * ```ts
+ * import { of, groupBy, mergeMap, reduce, map } from 'rxjs';
+ *
+ * of(
+ * { id: 1, name: 'JavaScript' },
+ * { id: 2, name: 'Parcel' },
+ * { id: 2, name: 'webpack' },
+ * { id: 1, name: 'TypeScript' },
+ * { id: 3, name: 'TSLint' }
+ * ).pipe(
+ * groupBy(p => p.id, { element: p => p.name }),
+ * mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [`${ group$.key }`]))),
+ * map(arr => ({ id: parseInt(arr[0], 10), values: arr.slice(1) }))
+ * )
+ * .subscribe(p => console.log(p));
+ *
+ * // displays:
+ * // { id: 1, values: [ 'JavaScript', 'TypeScript' ] }
+ * // { id: 2, values: [ 'Parcel', 'webpack' ] }
+ * // { id: 3, values: [ 'TSLint' ] }
+ * ```
+ *
+ * @param key A function that extracts the key
+ * for each item.
+ * @param element A function that extracts the
+ * return element for each item.
+ * @param duration
+ * A function that returns an Observable to determine how long each group should
+ * exist.
+ * @param connector Factory function to create an
+ * intermediate Subject through which grouped elements are emitted.
+ * @return A function that returns an Observable that emits GroupedObservables,
+ * each of which corresponds to a unique key value and each of which emits
+ * those items from the source Observable that share that key value.
+ *
+ * @deprecated Use the options parameter instead.
+ */
+export function groupBy<T, K, R>(
+ key: (value: T) => K,
+ element?: (value: T) => R,
+ duration?: (grouped: GroupedObservable<K, R>) => Observable<any>,
+ connector?: () => Subject<R>
+): OperatorFunction<T, GroupedObservable<K, R>>;
+
+// Impl
+export function groupBy<T, K, R>(
+ keySelector: (value: T) => K,
+ elementOrOptions?: ((value: any) => any) | void | BasicGroupByOptions<K, T> | GroupByOptionsWithElement<K, R, T>,
+ duration?: (grouped: GroupedObservable<any, any>) => ObservableInput<any>,
+ connector?: () => SubjectLike<any>
+): OperatorFunction<T, GroupedObservable<K, R>> {
+ return operate((source, subscriber) => {
+ let element: ((value: any) => any) | void;
+ if (!elementOrOptions || typeof elementOrOptions === 'function') {
+ element = elementOrOptions as ((value: any) => any);
+ } else {
+ ({ duration, element, connector } = elementOrOptions);
+ }
+
+ // A lookup for the groups that we have so far.
+ const groups = new Map<K, SubjectLike<any>>();
+
+ // Used for notifying all groups and the subscriber in the same way.
+ const notify = (cb: (group: Observer<any>) => void) => {
+ groups.forEach(cb);
+ cb(subscriber);
+ };
+
+ // Used to handle errors from the source, AND errors that occur during the
+ // next call from the source.
+ const handleError = (err: any) => notify((consumer) => consumer.error(err));
+
+ // The number of actively subscribed groups
+ let activeGroups = 0;
+
+ // Whether or not teardown was attempted on this subscription.
+ let teardownAttempted = false;
+
+ // Capturing a reference to this, because we need a handle to it
+ // in `createGroupedObservable` below. This is what we use to
+ // subscribe to our source observable. This sometimes needs to be unsubscribed
+ // out-of-band with our `subscriber` which is the downstream subscriber, or destination,
+ // in cases where a user unsubscribes from the main resulting subscription, but
+ // still has groups from this subscription subscribed and would expect values from it
+ // Consider: `source.pipe(groupBy(fn), take(2))`.
+ const groupBySourceSubscriber = new OperatorSubscriber(
+ subscriber,
+ (value: T) => {
+ // Because we have to notify all groups of any errors that occur in here,
+ // we have to add our own try/catch to ensure that those errors are propagated.
+ // OperatorSubscriber will only send the error to the main subscriber.
+ try {
+ const key = keySelector(value);
+
+ let group = groups.get(key);
+ if (!group) {
+ // Create our group subject
+ groups.set(key, (group = connector ? connector() : new Subject<any>()));
+
+ // Emit the grouped observable. Note that we can't do a simple `asObservable()` here,
+ // because the grouped observable has special semantics around reference counting
+ // to ensure we don't sever our connection to the source prematurely.
+ const grouped = createGroupedObservable(key, group);
+ subscriber.next(grouped);
+
+ if (duration) {
+ const durationSubscriber = createOperatorSubscriber(
+ // Providing the group here ensures that it is disposed of -- via `unsubscribe` --
+ // when the duration subscription is torn down. That is important, because then
+ // if someone holds a handle to the grouped observable and tries to subscribe to it
+ // after the connection to the source has been severed, they will get an
+ // `ObjectUnsubscribedError` and know they can't possibly get any notifications.
+ group as any,
+ () => {
+ // Our duration notified! We can complete the group.
+ // The group will be removed from the map in the finalization phase.
+ group!.complete();
+ durationSubscriber?.unsubscribe();
+ },
+ // Completions are also sent to the group, but just the group.
+ undefined,
+ // Errors on the duration subscriber are sent to the group
+ // but only the group. They are not sent to the main subscription.
+ undefined,
+ // Finalization: Remove this group from our map.
+ () => groups.delete(key)
+ );
+
+ // Start our duration notifier.
+ groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber));
+ }
+ }
+
+ // Send the value to our group.
+ group.next(element ? element(value) : value);
+ } catch (err) {
+ handleError(err);
+ }
+ },
+ // Source completes.
+ () => notify((consumer) => consumer.complete()),
+ // Error from the source.
+ handleError,
+ // Free up memory.
+ // When the source subscription is _finally_ torn down, release the subjects and keys
+ // in our groups Map, they may be quite large and we don't want to keep them around if we
+ // don't have to.
+ () => groups.clear(),
+ () => {
+ teardownAttempted = true;
+ // We only kill our subscription to the source if we have
+ // no active groups. As stated above, consider this scenario:
+ // source$.pipe(groupBy(fn), take(2)).
+ return activeGroups === 0;
+ }
+ );
+
+ // Subscribe to the source
+ source.subscribe(groupBySourceSubscriber);
+
+ /**
+ * Creates the actual grouped observable returned.
+ * @param key The key of the group
+ * @param groupSubject The subject that fuels the group
+ */
+ function createGroupedObservable(key: K, groupSubject: SubjectLike<any>) {
+ const result: any = new Observable<T>((groupSubscriber) => {
+ activeGroups++;
+ const innerSub = groupSubject.subscribe(groupSubscriber);
+ return () => {
+ innerSub.unsubscribe();
+ // We can kill the subscription to our source if we now have no more
+ // active groups subscribed, and a finalization was already attempted on
+ // the source.
+ --activeGroups === 0 && teardownAttempted && groupBySourceSubscriber.unsubscribe();
+ };
+ });
+ result.key = key;
+ return result;
+ }
+ });
+}
+
+/**
+ * An observable of values that is the emitted by the result of a {@link groupBy} operator,
+ * contains a `key` property for the grouping.
+ */
+export interface GroupedObservable<K, T> extends Observable<T> {
+ /**
+ * The key value for the grouped notifications.
+ */
+ readonly key: K;
+}
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage