diff options
Diffstat (limited to 'node_modules/rxjs/src/internal/scheduled')
7 files changed, 192 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/scheduled/scheduleArray.ts b/node_modules/rxjs/src/internal/scheduled/scheduleArray.ts new file mode 100644 index 0000000..ea8fa24 --- /dev/null +++ b/node_modules/rxjs/src/internal/scheduled/scheduleArray.ts @@ -0,0 +1,27 @@ +import { Observable } from '../Observable'; +import { SchedulerLike } from '../types'; + +export function scheduleArray<T>(input: ArrayLike<T>, scheduler: SchedulerLike) { + return new Observable<T>((subscriber) => { + // The current array index. + let i = 0; + // Start iterating over the array like on a schedule. + return scheduler.schedule(function () { + if (i === input.length) { + // If we have hit the end of the array like in the + // previous job, we can complete. + subscriber.complete(); + } else { + // Otherwise let's next the value at the current index, + // then increment our index. + subscriber.next(input[i++]); + // If the last emission didn't cause us to close the subscriber + // (via take or some side effect), reschedule the job and we'll + // make another pass. + if (!subscriber.closed) { + this.schedule(); + } + } + }); + }); +} diff --git a/node_modules/rxjs/src/internal/scheduled/scheduleAsyncIterable.ts b/node_modules/rxjs/src/internal/scheduled/scheduleAsyncIterable.ts new file mode 100644 index 0000000..daa0346 --- /dev/null +++ b/node_modules/rxjs/src/internal/scheduled/scheduleAsyncIterable.ts @@ -0,0 +1,31 @@ +import { SchedulerLike } from '../types'; +import { Observable } from '../Observable'; +import { executeSchedule } from '../util/executeSchedule'; + +export function scheduleAsyncIterable<T>(input: AsyncIterable<T>, scheduler: SchedulerLike) { + if (!input) { + throw new Error('Iterable cannot be null'); + } + return new Observable<T>((subscriber) => { + executeSchedule(subscriber, scheduler, () => { + const iterator = input[Symbol.asyncIterator](); + executeSchedule( + subscriber, + scheduler, + () => { + iterator.next().then((result) => { + if (result.done) { + // This will remove the subscriptions from + // the parent subscription. + subscriber.complete(); + } else { + subscriber.next(result.value); + } + }); + }, + 0, + true + ); + }); + }); +} diff --git a/node_modules/rxjs/src/internal/scheduled/scheduleIterable.ts b/node_modules/rxjs/src/internal/scheduled/scheduleIterable.ts new file mode 100644 index 0000000..aa1459d --- /dev/null +++ b/node_modules/rxjs/src/internal/scheduled/scheduleIterable.ts @@ -0,0 +1,60 @@ +import { Observable } from '../Observable'; +import { SchedulerLike } from '../types'; +import { iterator as Symbol_iterator } from '../symbol/iterator'; +import { isFunction } from '../util/isFunction'; +import { executeSchedule } from '../util/executeSchedule'; + +/** + * Used in {@link scheduled} to create an observable from an Iterable. + * @param input The iterable to create an observable from + * @param scheduler The scheduler to use + */ +export function scheduleIterable<T>(input: Iterable<T>, scheduler: SchedulerLike) { + return new Observable<T>((subscriber) => { + let iterator: Iterator<T, T>; + + // Schedule the initial creation of the iterator from + // the iterable. This is so the code in the iterable is + // not called until the scheduled job fires. + executeSchedule(subscriber, scheduler, () => { + // Create the iterator. + iterator = (input as any)[Symbol_iterator](); + + executeSchedule( + subscriber, + scheduler, + () => { + let value: T; + let done: boolean | undefined; + try { + // Pull the value out of the iterator + ({ value, done } = iterator.next()); + } catch (err) { + // We got an error while pulling from the iterator + subscriber.error(err); + return; + } + + if (done) { + // If it is "done" we just complete. This mimics the + // behavior of JavaScript's `for..of` consumption of + // iterables, which will not emit the value from an iterator + // result of `{ done: true: value: 'here' }`. + subscriber.complete(); + } else { + // The iterable is not done, emit the value. + subscriber.next(value); + } + }, + 0, + true + ); + }); + + // During finalization, if we see this iterator has a `return` method, + // then we know it is a Generator, and not just an Iterator. So we call + // the `return()` function. This will ensure that any `finally { }` blocks + // inside of the generator we can hit will be hit properly. + return () => isFunction(iterator?.return) && iterator.return(); + }); +} diff --git a/node_modules/rxjs/src/internal/scheduled/scheduleObservable.ts b/node_modules/rxjs/src/internal/scheduled/scheduleObservable.ts new file mode 100644 index 0000000..29ba3b5 --- /dev/null +++ b/node_modules/rxjs/src/internal/scheduled/scheduleObservable.ts @@ -0,0 +1,8 @@ +import { innerFrom } from '../observable/innerFrom'; +import { observeOn } from '../operators/observeOn'; +import { subscribeOn } from '../operators/subscribeOn'; +import { InteropObservable, SchedulerLike } from '../types'; + +export function scheduleObservable<T>(input: InteropObservable<T>, scheduler: SchedulerLike) { + return innerFrom(input).pipe(subscribeOn(scheduler), observeOn(scheduler)); +} diff --git a/node_modules/rxjs/src/internal/scheduled/schedulePromise.ts b/node_modules/rxjs/src/internal/scheduled/schedulePromise.ts new file mode 100644 index 0000000..f1211d0 --- /dev/null +++ b/node_modules/rxjs/src/internal/scheduled/schedulePromise.ts @@ -0,0 +1,8 @@ +import { innerFrom } from '../observable/innerFrom'; +import { observeOn } from '../operators/observeOn'; +import { subscribeOn } from '../operators/subscribeOn'; +import { SchedulerLike } from '../types'; + +export function schedulePromise<T>(input: PromiseLike<T>, scheduler: SchedulerLike) { + return innerFrom(input).pipe(subscribeOn(scheduler), observeOn(scheduler)); +} diff --git a/node_modules/rxjs/src/internal/scheduled/scheduleReadableStreamLike.ts b/node_modules/rxjs/src/internal/scheduled/scheduleReadableStreamLike.ts new file mode 100644 index 0000000..d742f10 --- /dev/null +++ b/node_modules/rxjs/src/internal/scheduled/scheduleReadableStreamLike.ts @@ -0,0 +1,8 @@ +import { SchedulerLike, ReadableStreamLike } from '../types'; +import { Observable } from '../Observable'; +import { scheduleAsyncIterable } from './scheduleAsyncIterable'; +import { readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike'; + +export function scheduleReadableStreamLike<T>(input: ReadableStreamLike<T>, scheduler: SchedulerLike): Observable<T> { + return scheduleAsyncIterable(readableStreamLikeToAsyncGenerator(input), scheduler); +} diff --git a/node_modules/rxjs/src/internal/scheduled/scheduled.ts b/node_modules/rxjs/src/internal/scheduled/scheduled.ts new file mode 100644 index 0000000..bb2e425 --- /dev/null +++ b/node_modules/rxjs/src/internal/scheduled/scheduled.ts @@ -0,0 +1,50 @@ +import { scheduleObservable } from './scheduleObservable'; +import { schedulePromise } from './schedulePromise'; +import { scheduleArray } from './scheduleArray'; +import { scheduleIterable } from './scheduleIterable'; +import { scheduleAsyncIterable } from './scheduleAsyncIterable'; +import { isInteropObservable } from '../util/isInteropObservable'; +import { isPromise } from '../util/isPromise'; +import { isArrayLike } from '../util/isArrayLike'; +import { isIterable } from '../util/isIterable'; +import { ObservableInput, SchedulerLike } from '../types'; +import { Observable } from '../Observable'; +import { isAsyncIterable } from '../util/isAsyncIterable'; +import { createInvalidObservableTypeError } from '../util/throwUnobservableError'; +import { isReadableStreamLike } from '../util/isReadableStreamLike'; +import { scheduleReadableStreamLike } from './scheduleReadableStreamLike'; + +/** + * Converts from a common {@link ObservableInput} type to an observable where subscription and emissions + * are scheduled on the provided scheduler. + * + * @see {@link from} + * @see {@link of} + * + * @param input The observable, array, promise, iterable, etc you would like to schedule + * @param scheduler The scheduler to use to schedule the subscription and emissions from + * the returned observable. + */ +export function scheduled<T>(input: ObservableInput<T>, scheduler: SchedulerLike): Observable<T> { + if (input != null) { + if (isInteropObservable(input)) { + return scheduleObservable(input, scheduler); + } + if (isArrayLike(input)) { + return scheduleArray(input, scheduler); + } + if (isPromise(input)) { + return schedulePromise(input, scheduler); + } + if (isAsyncIterable(input)) { + return scheduleAsyncIterable(input, scheduler); + } + if (isIterable(input)) { + return scheduleIterable(input, scheduler); + } + if (isReadableStreamLike(input)) { + return scheduleReadableStreamLike(input, scheduler); + } + } + throw createInvalidObservableTypeError(input); +} |
