aboutsummaryrefslogtreecommitdiffstats
path: root/node_modules/rxjs/src/internal/observable/combineLatest.ts
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2025-06-28 17:26:46 -0700
committerPinapelz <yukais@pinapelz.com>2025-06-28 17:43:56 -0700
commite4fa1e69e7ebfb627c7198fd1a9881e9327ec4d4 (patch)
tree06284a538a6008eca75051399e47db4e5d50301c /node_modules/rxjs/src/internal/observable/combineLatest.ts
initial commit: scaffolding
Diffstat (limited to 'node_modules/rxjs/src/internal/observable/combineLatest.ts')
-rw-r--r--node_modules/rxjs/src/internal/observable/combineLatest.ts300
1 files changed, 300 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/observable/combineLatest.ts b/node_modules/rxjs/src/internal/observable/combineLatest.ts
new file mode 100644
index 0000000..9044060
--- /dev/null
+++ b/node_modules/rxjs/src/internal/observable/combineLatest.ts
@@ -0,0 +1,300 @@
+import { Observable } from '../Observable';
+import { ObservableInput, SchedulerLike, ObservedValueOf, ObservableInputTuple } from '../types';
+import { argsArgArrayOrObject } from '../util/argsArgArrayOrObject';
+import { Subscriber } from '../Subscriber';
+import { from } from './from';
+import { identity } from '../util/identity';
+import { Subscription } from '../Subscription';
+import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
+import { popResultSelector, popScheduler } from '../util/args';
+import { createObject } from '../util/createObject';
+import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
+import { AnyCatcher } from '../AnyCatcher';
+import { executeSchedule } from '../util/executeSchedule';
+
+// combineLatest(any)
+// We put this first because we need to catch cases where the user has supplied
+// _exactly `any`_ as the argument. Since `any` literally matches _anything_,
+// we don't want it to randomly hit one of the other type signatures below,
+// as we have no idea at build-time what type we should be returning when given an any.
+
+/**
+ * You have passed `any` here, we can't figure out if it is
+ * an array or an object, so you're getting `unknown`. Use better types.
+ * @param arg Something typed as `any`
+ */
+export function combineLatest<T extends AnyCatcher>(arg: T): Observable<unknown>;
+
+// combineLatest([a, b, c])
+export function combineLatest(sources: []): Observable<never>;
+export function combineLatest<A extends readonly unknown[]>(sources: readonly [...ObservableInputTuple<A>]): Observable<A>;
+/** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled` and `combineLatestAll`. Details: https://rxjs.dev/deprecations/scheduler-argument */
+export function combineLatest<A extends readonly unknown[], R>(
+ sources: readonly [...ObservableInputTuple<A>],
+ resultSelector: (...values: A) => R,
+ scheduler: SchedulerLike
+): Observable<R>;
+export function combineLatest<A extends readonly unknown[], R>(
+ sources: readonly [...ObservableInputTuple<A>],
+ resultSelector: (...values: A) => R
+): Observable<R>;
+/** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled` and `combineLatestAll`. Details: https://rxjs.dev/deprecations/scheduler-argument */
+export function combineLatest<A extends readonly unknown[]>(
+ sources: readonly [...ObservableInputTuple<A>],
+ scheduler: SchedulerLike
+): Observable<A>;
+
+// combineLatest(a, b, c)
+/** @deprecated Pass an array of sources instead. The rest-parameters signature will be removed in v8. Details: https://rxjs.dev/deprecations/array-argument */
+export function combineLatest<A extends readonly unknown[]>(...sources: [...ObservableInputTuple<A>]): Observable<A>;
+/** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled` and `combineLatestAll`. Details: https://rxjs.dev/deprecations/scheduler-argument */
+export function combineLatest<A extends readonly unknown[], R>(
+ ...sourcesAndResultSelectorAndScheduler: [...ObservableInputTuple<A>, (...values: A) => R, SchedulerLike]
+): Observable<R>;
+/** @deprecated Pass an array of sources instead. The rest-parameters signature will be removed in v8. Details: https://rxjs.dev/deprecations/array-argument */
+export function combineLatest<A extends readonly unknown[], R>(
+ ...sourcesAndResultSelector: [...ObservableInputTuple<A>, (...values: A) => R]
+): Observable<R>;
+/** @deprecated The `scheduler` parameter will be removed in v8. Use `scheduled` and `combineLatestAll`. Details: https://rxjs.dev/deprecations/scheduler-argument */
+export function combineLatest<A extends readonly unknown[]>(
+ ...sourcesAndScheduler: [...ObservableInputTuple<A>, SchedulerLike]
+): Observable<A>;
+
+// combineLatest({a, b, c})
+export function combineLatest(sourcesObject: { [K in any]: never }): Observable<never>;
+export function combineLatest<T extends Record<string, ObservableInput<any>>>(
+ sourcesObject: T
+): Observable<{ [K in keyof T]: ObservedValueOf<T[K]> }>;
+
+/**
+ * Combines multiple Observables to create an Observable whose values are
+ * calculated from the latest values of each of its input Observables.
+ *
+ * <span class="informal">Whenever any input Observable emits a value, it
+ * computes a formula using the latest values from all the inputs, then emits
+ * the output of that formula.</span>
+ *
+ * ![](combineLatest.png)
+ *
+ * `combineLatest` combines the values from all the Observables passed in the
+ * observables array. This is done by subscribing to each Observable in order and,
+ * whenever any Observable emits, collecting an array of the most recent
+ * values from each Observable. So if you pass `n` Observables to this operator,
+ * the returned Observable will always emit an array of `n` values, in an order
+ * corresponding to the order of the passed Observables (the value from the first Observable
+ * will be at index 0 of the array and so on).
+ *
+ * Static version of `combineLatest` accepts an array of Observables. Note that an array of
+ * Observables is a good choice, if you don't know beforehand how many Observables
+ * you will combine. Passing an empty array will result in an Observable that
+ * completes immediately.
+ *
+ * To ensure the output array always has the same length, `combineLatest` will
+ * actually wait for all input Observables to emit at least once,
+ * before it starts emitting results. This means if some Observable emits
+ * values before other Observables started emitting, all these values but the last
+ * will be lost. On the other hand, if some Observable does not emit a value but
+ * completes, resulting Observable will complete at the same moment without
+ * emitting anything, since it will now be impossible to include a value from the
+ * completed Observable in the resulting array. Also, if some input Observable does
+ * not emit any value and never completes, `combineLatest` will also never emit
+ * and never complete, since, again, it will wait for all streams to emit some
+ * value.
+ *
+ * If at least one Observable was passed to `combineLatest` and all passed Observables
+ * emitted something, the resulting Observable will complete when all combined
+ * streams complete. So even if some Observable completes, the result of
+ * `combineLatest` will still emit values when other Observables do. In case
+ * of a completed Observable, its value from now on will always be the last
+ * emitted value. On the other hand, if any Observable errors, `combineLatest`
+ * will error immediately as well, and all other Observables will be unsubscribed.
+ *
+ * ## Examples
+ *
+ * Combine two timer Observables
+ *
+ * ```ts
+ * import { timer, combineLatest } from 'rxjs';
+ *
+ * const firstTimer = timer(0, 1000); // emit 0, 1, 2... after every second, starting from now
+ * const secondTimer = timer(500, 1000); // emit 0, 1, 2... after every second, starting 0,5s from now
+ * const combinedTimers = combineLatest([firstTimer, secondTimer]);
+ * combinedTimers.subscribe(value => console.log(value));
+ * // Logs
+ * // [0, 0] after 0.5s
+ * // [1, 0] after 1s
+ * // [1, 1] after 1.5s
+ * // [2, 1] after 2s
+ * ```
+ *
+ * Combine a dictionary of Observables
+ *
+ * ```ts
+ * import { of, delay, startWith, combineLatest } from 'rxjs';
+ *
+ * const observables = {
+ * a: of(1).pipe(delay(1000), startWith(0)),
+ * b: of(5).pipe(delay(5000), startWith(0)),
+ * c: of(10).pipe(delay(10000), startWith(0))
+ * };
+ * const combined = combineLatest(observables);
+ * combined.subscribe(value => console.log(value));
+ * // Logs
+ * // { a: 0, b: 0, c: 0 } immediately
+ * // { a: 1, b: 0, c: 0 } after 1s
+ * // { a: 1, b: 5, c: 0 } after 5s
+ * // { a: 1, b: 5, c: 10 } after 10s
+ * ```
+ *
+ * Combine an array of Observables
+ *
+ * ```ts
+ * import { of, delay, startWith, combineLatest } from 'rxjs';
+ *
+ * const observables = [1, 5, 10].map(
+ * n => of(n).pipe(
+ * delay(n * 1000), // emit 0 and then emit n after n seconds
+ * startWith(0)
+ * )
+ * );
+ * const combined = combineLatest(observables);
+ * combined.subscribe(value => console.log(value));
+ * // Logs
+ * // [0, 0, 0] immediately
+ * // [1, 0, 0] after 1s
+ * // [1, 5, 0] after 5s
+ * // [1, 5, 10] after 10s
+ * ```
+ *
+ * Use map operator to dynamically calculate the Body-Mass Index
+ *
+ * ```ts
+ * import { of, combineLatest, map } from 'rxjs';
+ *
+ * const weight = of(70, 72, 76, 79, 75);
+ * const height = of(1.76, 1.77, 1.78);
+ * const bmi = combineLatest([weight, height]).pipe(
+ * map(([w, h]) => w / (h * h)),
+ * );
+ * bmi.subscribe(x => console.log('BMI is ' + x));
+ *
+ * // With output to console:
+ * // BMI is 24.212293388429753
+ * // BMI is 23.93948099205209
+ * // BMI is 23.671253629592222
+ * ```
+ *
+ * @see {@link combineLatestAll}
+ * @see {@link merge}
+ * @see {@link withLatestFrom}
+ *
+ * @param args Any number of `ObservableInput`s provided either as an array or as an object
+ * to combine with each other. If the last parameter is the function, it will be used to project the
+ * values from the combined latest values into a new value on the output Observable.
+ * @return An Observable of projected values from the most recent values from each `ObservableInput`,
+ * or an array of the most recent values from each `ObservableInput`.
+ */
+export function combineLatest<O extends ObservableInput<any>, R>(...args: any[]): Observable<R> | Observable<ObservedValueOf<O>[]> {
+ const scheduler = popScheduler(args);
+ const resultSelector = popResultSelector(args);
+
+ const { args: observables, keys } = argsArgArrayOrObject(args);
+
+ if (observables.length === 0) {
+ // If no observables are passed, or someone has passed an empty array
+ // of observables, or even an empty object POJO, we need to just
+ // complete (EMPTY), but we have to honor the scheduler provided if any.
+ return from([], scheduler as any);
+ }
+
+ const result = new Observable<ObservedValueOf<O>[]>(
+ combineLatestInit(
+ observables as ObservableInput<ObservedValueOf<O>>[],
+ scheduler,
+ keys
+ ? // A handler for scrubbing the array of args into a dictionary.
+ (values) => createObject(keys, values)
+ : // A passthrough to just return the array
+ identity
+ )
+ );
+
+ return resultSelector ? (result.pipe(mapOneOrManyArgs(resultSelector)) as Observable<R>) : result;
+}
+
+export function combineLatestInit(
+ observables: ObservableInput<any>[],
+ scheduler?: SchedulerLike,
+ valueTransform: (values: any[]) => any = identity
+) {
+ return (subscriber: Subscriber<any>) => {
+ // The outer subscription. We're capturing this in a function
+ // because we may have to schedule it.
+ maybeSchedule(
+ scheduler,
+ () => {
+ const { length } = observables;
+ // A store for the values each observable has emitted so far. We match observable to value on index.
+ const values = new Array(length);
+ // The number of currently active subscriptions, as they complete, we decrement this number to see if
+ // we are all done combining values, so we can complete the result.
+ let active = length;
+ // The number of inner sources that still haven't emitted the first value
+ // We need to track this because all sources need to emit one value in order
+ // to start emitting values.
+ let remainingFirstValues = length;
+ // The loop to kick off subscription. We're keying everything on index `i` to relate the observables passed
+ // in to the slot in the output array or the key in the array of keys in the output dictionary.
+ for (let i = 0; i < length; i++) {
+ maybeSchedule(
+ scheduler,
+ () => {
+ const source = from(observables[i], scheduler as any);
+ let hasFirstValue = false;
+ source.subscribe(
+ createOperatorSubscriber(
+ subscriber,
+ (value) => {
+ // When we get a value, record it in our set of values.
+ values[i] = value;
+ if (!hasFirstValue) {
+ // If this is our first value, record that.
+ hasFirstValue = true;
+ remainingFirstValues--;
+ }
+ if (!remainingFirstValues) {
+ // We're not waiting for any more
+ // first values, so we can emit!
+ subscriber.next(valueTransform(values.slice()));
+ }
+ },
+ () => {
+ if (!--active) {
+ // We only complete the result if we have no more active
+ // inner observables.
+ subscriber.complete();
+ }
+ }
+ )
+ );
+ },
+ subscriber
+ );
+ }
+ },
+ subscriber
+ );
+ };
+}
+
+/**
+ * A small utility to handle the couple of locations where we want to schedule if a scheduler was provided,
+ * but we don't if there was no scheduler.
+ */
+function maybeSchedule(scheduler: SchedulerLike | undefined, execute: () => void, subscription: Subscription) {
+ if (scheduler) {
+ executeSchedule(subscription, scheduler, execute);
+ } else {
+ execute();
+ }
+}
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage