diff options
Diffstat (limited to 'node_modules/rxjs/src/internal/operators/window.ts')
| -rw-r--r-- | node_modules/rxjs/src/internal/operators/window.ts | 98 |
1 files changed, 0 insertions, 98 deletions
diff --git a/node_modules/rxjs/src/internal/operators/window.ts b/node_modules/rxjs/src/internal/operators/window.ts deleted file mode 100644 index b8250cb..0000000 --- a/node_modules/rxjs/src/internal/operators/window.ts +++ /dev/null @@ -1,98 +0,0 @@ -import { Observable } from '../Observable'; -import { OperatorFunction, ObservableInput } from '../types'; -import { Subject } from '../Subject'; -import { operate } from '../util/lift'; -import { createOperatorSubscriber } from './OperatorSubscriber'; -import { noop } from '../util/noop'; -import { innerFrom } from '../observable/innerFrom'; - -/** - * Branch out the source Observable values as a nested Observable whenever - * `windowBoundaries` emits. - * - * <span class="informal">It's like {@link buffer}, but emits a nested Observable - * instead of an array.</span> - * - *  - * - * Returns an Observable that emits windows of items it collects from the source - * Observable. The output Observable emits connected, non-overlapping - * windows. It emits the current window and opens a new one whenever the - * `windowBoundaries` emits an item. `windowBoundaries` can be any type that - * `ObservableInput` accepts. It internally gets converted to an Observable. - * Because each window is an Observable, the output is a higher-order Observable. - * - * ## Example - * - * In every window of 1 second each, emit at most 2 click events - * - * ```ts - * import { fromEvent, interval, window, map, take, mergeAll } from 'rxjs'; - * - * const clicks = fromEvent(document, 'click'); - * const sec = interval(1000); - * const result = clicks.pipe( - * window(sec), - * map(win => win.pipe(take(2))), // take at most 2 emissions from each window - * mergeAll() // flatten the Observable-of-Observables - * ); - * result.subscribe(x => console.log(x)); - * ``` - * - * @see {@link windowCount} - * @see {@link windowTime} - * @see {@link windowToggle} - * @see {@link windowWhen} - * @see {@link buffer} - * - * @param windowBoundaries An `ObservableInput` that completes the - * previous window and starts a new window. - * @return A function that returns an Observable of windows, which are - * Observables emitting values of the source Observable. - */ -export function window<T>(windowBoundaries: ObservableInput<any>): OperatorFunction<T, Observable<T>> { - return operate((source, subscriber) => { - let windowSubject: Subject<T> = new Subject<T>(); - - subscriber.next(windowSubject.asObservable()); - - const errorHandler = (err: any) => { - windowSubject.error(err); - subscriber.error(err); - }; - - // Subscribe to our source - source.subscribe( - createOperatorSubscriber( - subscriber, - (value) => windowSubject?.next(value), - () => { - windowSubject.complete(); - subscriber.complete(); - }, - errorHandler - ) - ); - - // Subscribe to the window boundaries. - innerFrom(windowBoundaries).subscribe( - createOperatorSubscriber( - subscriber, - () => { - windowSubject.complete(); - subscriber.next((windowSubject = new Subject())); - }, - noop, - errorHandler - ) - ); - - return () => { - // Unsubscribing the subject ensures that anyone who has captured - // a reference to this window that tries to use it after it can - // no longer get values from the source will get an ObjectUnsubscribedError. - windowSubject?.unsubscribe(); - windowSubject = null!; - }; - }); -} |
