diff options
Diffstat (limited to 'node_modules/rxjs/src/internal/observable/innerFrom.ts')
| -rw-r--r-- | node_modules/rxjs/src/internal/observable/innerFrom.ts | 132 |
1 files changed, 132 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/observable/innerFrom.ts b/node_modules/rxjs/src/internal/observable/innerFrom.ts new file mode 100644 index 0000000..c3852c1 --- /dev/null +++ b/node_modules/rxjs/src/internal/observable/innerFrom.ts @@ -0,0 +1,132 @@ +import { isArrayLike } from '../util/isArrayLike'; +import { isPromise } from '../util/isPromise'; +import { Observable } from '../Observable'; +import { ObservableInput, ObservedValueOf, ReadableStreamLike } from '../types'; +import { isInteropObservable } from '../util/isInteropObservable'; +import { isAsyncIterable } from '../util/isAsyncIterable'; +import { createInvalidObservableTypeError } from '../util/throwUnobservableError'; +import { isIterable } from '../util/isIterable'; +import { isReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike'; +import { Subscriber } from '../Subscriber'; +import { isFunction } from '../util/isFunction'; +import { reportUnhandledError } from '../util/reportUnhandledError'; +import { observable as Symbol_observable } from '../symbol/observable'; + +export function innerFrom<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>; +export function innerFrom<T>(input: ObservableInput<T>): Observable<T> { + if (input instanceof Observable) { + return input; + } + if (input != null) { + if (isInteropObservable(input)) { + return fromInteropObservable(input); + } + if (isArrayLike(input)) { + return fromArrayLike(input); + } + if (isPromise(input)) { + return fromPromise(input); + } + if (isAsyncIterable(input)) { + return fromAsyncIterable(input); + } + if (isIterable(input)) { + return fromIterable(input); + } + if (isReadableStreamLike(input)) { + return fromReadableStreamLike(input); + } + } + + throw createInvalidObservableTypeError(input); +} + +/** + * Creates an RxJS Observable from an object that implements `Symbol.observable`. + * @param obj An object that properly implements `Symbol.observable`. + */ +export function fromInteropObservable<T>(obj: any) { + return new Observable((subscriber: Subscriber<T>) => { + const obs = obj[Symbol_observable](); + if (isFunction(obs.subscribe)) { + return obs.subscribe(subscriber); + } + // Should be caught by observable subscribe function error handling. + throw new TypeError('Provided object does not correctly implement Symbol.observable'); + }); +} + +/** + * Synchronously emits the values of an array like and completes. + * This is exported because there are creation functions and operators that need to + * make direct use of the same logic, and there's no reason to make them run through + * `from` conditionals because we *know* they're dealing with an array. + * @param array The array to emit values from + */ +export function fromArrayLike<T>(array: ArrayLike<T>) { + return new Observable((subscriber: Subscriber<T>) => { + // Loop over the array and emit each value. Note two things here: + // 1. We're making sure that the subscriber is not closed on each loop. + // This is so we don't continue looping over a very large array after + // something like a `take`, `takeWhile`, or other synchronous unsubscription + // has already unsubscribed. + // 2. In this form, reentrant code can alter that array we're looping over. + // This is a known issue, but considered an edge case. The alternative would + // be to copy the array before executing the loop, but this has + // performance implications. + for (let i = 0; i < array.length && !subscriber.closed; i++) { + subscriber.next(array[i]); + } + subscriber.complete(); + }); +} + +export function fromPromise<T>(promise: PromiseLike<T>) { + return new Observable((subscriber: Subscriber<T>) => { + promise + .then( + (value) => { + if (!subscriber.closed) { + subscriber.next(value); + subscriber.complete(); + } + }, + (err: any) => subscriber.error(err) + ) + .then(null, reportUnhandledError); + }); +} + +export function fromIterable<T>(iterable: Iterable<T>) { + return new Observable((subscriber: Subscriber<T>) => { + for (const value of iterable) { + subscriber.next(value); + if (subscriber.closed) { + return; + } + } + subscriber.complete(); + }); +} + +export function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T>) { + return new Observable((subscriber: Subscriber<T>) => { + process(asyncIterable, subscriber).catch((err) => subscriber.error(err)); + }); +} + +export function fromReadableStreamLike<T>(readableStream: ReadableStreamLike<T>) { + return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream)); +} + +async function process<T>(asyncIterable: AsyncIterable<T>, subscriber: Subscriber<T>) { + for await (const value of asyncIterable) { + subscriber.next(value); + // A side-effect may have closed our subscriber, + // check before the next iteration. + if (subscriber.closed) { + return; + } + } + subscriber.complete(); +} |
