diff options
| author | Pinapelz <yukais@pinapelz.com> | 2025-06-28 17:26:46 -0700 |
|---|---|---|
| committer | Pinapelz <yukais@pinapelz.com> | 2025-06-28 17:43:56 -0700 |
| commit | e4fa1e69e7ebfb627c7198fd1a9881e9327ec4d4 (patch) | |
| tree | 06284a538a6008eca75051399e47db4e5d50301c /node_modules/rxjs/src/internal/observable/race.ts | |
initial commit: scaffolding
Diffstat (limited to 'node_modules/rxjs/src/internal/observable/race.ts')
| -rw-r--r-- | node_modules/rxjs/src/internal/observable/race.ts | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/observable/race.ts b/node_modules/rxjs/src/internal/observable/race.ts new file mode 100644 index 0000000..d87293f --- /dev/null +++ b/node_modules/rxjs/src/internal/observable/race.ts @@ -0,0 +1,88 @@ +import { Observable } from '../Observable'; +import { innerFrom } from './innerFrom'; +import { Subscription } from '../Subscription'; +import { ObservableInput, ObservableInputTuple } from '../types'; +import { argsOrArgArray } from '../util/argsOrArgArray'; +import { createOperatorSubscriber } from '../operators/OperatorSubscriber'; +import { Subscriber } from '../Subscriber'; + +export function race<T extends readonly unknown[]>(inputs: [...ObservableInputTuple<T>]): Observable<T[number]>; +export function race<T extends readonly unknown[]>(...inputs: [...ObservableInputTuple<T>]): Observable<T[number]>; + +/** + * Returns an observable that mirrors the first source observable to emit an item. + * + *  + * + * `race` returns an observable, that when subscribed to, subscribes to all source observables immediately. + * As soon as one of the source observables emits a value, the result unsubscribes from the other sources. + * The resulting observable will forward all notifications, including error and completion, from the "winning" + * source observable. + * + * If one of the used source observable throws an errors before a first notification + * the race operator will also throw an error, no matter if another source observable + * could potentially win the race. + * + * `race` can be useful for selecting the response from the fastest network connection for + * HTTP or WebSockets. `race` can also be useful for switching observable context based on user + * input. + * + * ## Example + * + * Subscribes to the observable that was the first to start emitting. + * + * ```ts + * import { interval, map, race } from 'rxjs'; + * + * const obs1 = interval(7000).pipe(map(() => 'slow one')); + * const obs2 = interval(3000).pipe(map(() => 'fast one')); + * const obs3 = interval(5000).pipe(map(() => 'medium one')); + * + * race(obs1, obs2, obs3) + * .subscribe(winner => console.log(winner)); + * + * // Outputs + * // a series of 'fast one' + * ``` + * + * @param sources Used to race for which `ObservableInput` emits first. + * @return An Observable that mirrors the output of the first Observable to emit an item. + */ +export function race<T>(...sources: (ObservableInput<T> | ObservableInput<T>[])[]): Observable<any> { + sources = argsOrArgArray(sources); + // If only one source was passed, just return it. Otherwise return the race. + return sources.length === 1 ? innerFrom(sources[0] as ObservableInput<T>) : new Observable<T>(raceInit(sources as ObservableInput<T>[])); +} + +/** + * An observable initializer function for both the static version and the + * operator version of race. + * @param sources The sources to race + */ +export function raceInit<T>(sources: ObservableInput<T>[]) { + return (subscriber: Subscriber<T>) => { + let subscriptions: Subscription[] = []; + + // Subscribe to all of the sources. Note that we are checking `subscriptions` here + // Is is an array of all actively "racing" subscriptions, and it is `null` after the + // race has been won. So, if we have racer that synchronously "wins", this loop will + // stop before it subscribes to any more. + for (let i = 0; subscriptions && !subscriber.closed && i < sources.length; i++) { + subscriptions.push( + innerFrom(sources[i] as ObservableInput<T>).subscribe( + createOperatorSubscriber(subscriber, (value) => { + if (subscriptions) { + // We're still racing, but we won! So unsubscribe + // all other subscriptions that we have, except this one. + for (let s = 0; s < subscriptions.length; s++) { + s !== i && subscriptions[s].unsubscribe(); + } + subscriptions = null!; + } + subscriber.next(value); + }) + ) + ); + } + }; +} |
