diff options
| author | Pinapelz <yukais@pinapelz.com> | 2025-06-29 11:49:28 -0700 |
|---|---|---|
| committer | Pinapelz <yukais@pinapelz.com> | 2025-06-29 11:49:28 -0700 |
| commit | d55b767039605256c736166a942a9138e3eacfd7 (patch) | |
| tree | 947063b634c50d438a794325f13275e134aa5993 /node_modules/rxjs/src/internal/observable/innerFrom.ts | |
| parent | 864ce67d89c77d8ef9c3361f80d619853abcf91c (diff) | |
remove dev node_modules (oops)
Diffstat (limited to 'node_modules/rxjs/src/internal/observable/innerFrom.ts')
| -rw-r--r-- | node_modules/rxjs/src/internal/observable/innerFrom.ts | 132 |
1 files changed, 0 insertions, 132 deletions
diff --git a/node_modules/rxjs/src/internal/observable/innerFrom.ts b/node_modules/rxjs/src/internal/observable/innerFrom.ts deleted file mode 100644 index c3852c1..0000000 --- a/node_modules/rxjs/src/internal/observable/innerFrom.ts +++ /dev/null @@ -1,132 +0,0 @@ -import { isArrayLike } from '../util/isArrayLike'; -import { isPromise } from '../util/isPromise'; -import { Observable } from '../Observable'; -import { ObservableInput, ObservedValueOf, ReadableStreamLike } from '../types'; -import { isInteropObservable } from '../util/isInteropObservable'; -import { isAsyncIterable } from '../util/isAsyncIterable'; -import { createInvalidObservableTypeError } from '../util/throwUnobservableError'; -import { isIterable } from '../util/isIterable'; -import { isReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike'; -import { Subscriber } from '../Subscriber'; -import { isFunction } from '../util/isFunction'; -import { reportUnhandledError } from '../util/reportUnhandledError'; -import { observable as Symbol_observable } from '../symbol/observable'; - -export function innerFrom<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>; -export function innerFrom<T>(input: ObservableInput<T>): Observable<T> { - if (input instanceof Observable) { - return input; - } - if (input != null) { - if (isInteropObservable(input)) { - return fromInteropObservable(input); - } - if (isArrayLike(input)) { - return fromArrayLike(input); - } - if (isPromise(input)) { - return fromPromise(input); - } - if (isAsyncIterable(input)) { - return fromAsyncIterable(input); - } - if (isIterable(input)) { - return fromIterable(input); - } - if (isReadableStreamLike(input)) { - return fromReadableStreamLike(input); - } - } - - throw createInvalidObservableTypeError(input); -} - -/** - * Creates an RxJS Observable from an object that implements `Symbol.observable`. - * @param obj An object that properly implements `Symbol.observable`. - */ -export function fromInteropObservable<T>(obj: any) { - return new Observable((subscriber: Subscriber<T>) => { - const obs = obj[Symbol_observable](); - if (isFunction(obs.subscribe)) { - return obs.subscribe(subscriber); - } - // Should be caught by observable subscribe function error handling. - throw new TypeError('Provided object does not correctly implement Symbol.observable'); - }); -} - -/** - * Synchronously emits the values of an array like and completes. - * This is exported because there are creation functions and operators that need to - * make direct use of the same logic, and there's no reason to make them run through - * `from` conditionals because we *know* they're dealing with an array. - * @param array The array to emit values from - */ -export function fromArrayLike<T>(array: ArrayLike<T>) { - return new Observable((subscriber: Subscriber<T>) => { - // Loop over the array and emit each value. Note two things here: - // 1. We're making sure that the subscriber is not closed on each loop. - // This is so we don't continue looping over a very large array after - // something like a `take`, `takeWhile`, or other synchronous unsubscription - // has already unsubscribed. - // 2. In this form, reentrant code can alter that array we're looping over. - // This is a known issue, but considered an edge case. The alternative would - // be to copy the array before executing the loop, but this has - // performance implications. - for (let i = 0; i < array.length && !subscriber.closed; i++) { - subscriber.next(array[i]); - } - subscriber.complete(); - }); -} - -export function fromPromise<T>(promise: PromiseLike<T>) { - return new Observable((subscriber: Subscriber<T>) => { - promise - .then( - (value) => { - if (!subscriber.closed) { - subscriber.next(value); - subscriber.complete(); - } - }, - (err: any) => subscriber.error(err) - ) - .then(null, reportUnhandledError); - }); -} - -export function fromIterable<T>(iterable: Iterable<T>) { - return new Observable((subscriber: Subscriber<T>) => { - for (const value of iterable) { - subscriber.next(value); - if (subscriber.closed) { - return; - } - } - subscriber.complete(); - }); -} - -export function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T>) { - return new Observable((subscriber: Subscriber<T>) => { - process(asyncIterable, subscriber).catch((err) => subscriber.error(err)); - }); -} - -export function fromReadableStreamLike<T>(readableStream: ReadableStreamLike<T>) { - return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream)); -} - -async function process<T>(asyncIterable: AsyncIterable<T>, subscriber: Subscriber<T>) { - for await (const value of asyncIterable) { - subscriber.next(value); - // A side-effect may have closed our subscriber, - // check before the next iteration. - if (subscriber.closed) { - return; - } - } - subscriber.complete(); -} |
