diff options
Diffstat (limited to 'node_modules/rxjs/src/internal/operators/window.ts')
| -rw-r--r-- | node_modules/rxjs/src/internal/operators/window.ts | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/operators/window.ts b/node_modules/rxjs/src/internal/operators/window.ts new file mode 100644 index 0000000..b8250cb --- /dev/null +++ b/node_modules/rxjs/src/internal/operators/window.ts @@ -0,0 +1,98 @@ +import { Observable } from '../Observable'; +import { OperatorFunction, ObservableInput } from '../types'; +import { Subject } from '../Subject'; +import { operate } from '../util/lift'; +import { createOperatorSubscriber } from './OperatorSubscriber'; +import { noop } from '../util/noop'; +import { innerFrom } from '../observable/innerFrom'; + +/** + * Branch out the source Observable values as a nested Observable whenever + * `windowBoundaries` emits. + * + * <span class="informal">It's like {@link buffer}, but emits a nested Observable + * instead of an array.</span> + * + *  + * + * Returns an Observable that emits windows of items it collects from the source + * Observable. The output Observable emits connected, non-overlapping + * windows. It emits the current window and opens a new one whenever the + * `windowBoundaries` emits an item. `windowBoundaries` can be any type that + * `ObservableInput` accepts. It internally gets converted to an Observable. + * Because each window is an Observable, the output is a higher-order Observable. + * + * ## Example + * + * In every window of 1 second each, emit at most 2 click events + * + * ```ts + * import { fromEvent, interval, window, map, take, mergeAll } from 'rxjs'; + * + * const clicks = fromEvent(document, 'click'); + * const sec = interval(1000); + * const result = clicks.pipe( + * window(sec), + * map(win => win.pipe(take(2))), // take at most 2 emissions from each window + * mergeAll() // flatten the Observable-of-Observables + * ); + * result.subscribe(x => console.log(x)); + * ``` + * + * @see {@link windowCount} + * @see {@link windowTime} + * @see {@link windowToggle} + * @see {@link windowWhen} + * @see {@link buffer} + * + * @param windowBoundaries An `ObservableInput` that completes the + * previous window and starts a new window. + * @return A function that returns an Observable of windows, which are + * Observables emitting values of the source Observable. + */ +export function window<T>(windowBoundaries: ObservableInput<any>): OperatorFunction<T, Observable<T>> { + return operate((source, subscriber) => { + let windowSubject: Subject<T> = new Subject<T>(); + + subscriber.next(windowSubject.asObservable()); + + const errorHandler = (err: any) => { + windowSubject.error(err); + subscriber.error(err); + }; + + // Subscribe to our source + source.subscribe( + createOperatorSubscriber( + subscriber, + (value) => windowSubject?.next(value), + () => { + windowSubject.complete(); + subscriber.complete(); + }, + errorHandler + ) + ); + + // Subscribe to the window boundaries. + innerFrom(windowBoundaries).subscribe( + createOperatorSubscriber( + subscriber, + () => { + windowSubject.complete(); + subscriber.next((windowSubject = new Subject())); + }, + noop, + errorHandler + ) + ); + + return () => { + // Unsubscribing the subject ensures that anyone who has captured + // a reference to this window that tries to use it after it can + // no longer get values from the source will get an ObjectUnsubscribedError. + windowSubject?.unsubscribe(); + windowSubject = null!; + }; + }); +} |
