diff options
Diffstat (limited to 'node_modules/rxjs/src/internal/operators/windowToggle.ts')
| -rw-r--r-- | node_modules/rxjs/src/internal/operators/windowToggle.ts | 132 |
1 files changed, 0 insertions, 132 deletions
diff --git a/node_modules/rxjs/src/internal/operators/windowToggle.ts b/node_modules/rxjs/src/internal/operators/windowToggle.ts deleted file mode 100644 index a17bef5..0000000 --- a/node_modules/rxjs/src/internal/operators/windowToggle.ts +++ /dev/null @@ -1,132 +0,0 @@ -import { Observable } from '../Observable'; -import { Subject } from '../Subject'; -import { Subscription } from '../Subscription'; -import { ObservableInput, OperatorFunction } from '../types'; -import { operate } from '../util/lift'; -import { innerFrom } from '../observable/innerFrom'; -import { createOperatorSubscriber } from './OperatorSubscriber'; -import { noop } from '../util/noop'; -import { arrRemove } from '../util/arrRemove'; - -/** - * Branch out the source Observable values as a nested Observable starting from - * an emission from `openings` and ending when the output of `closingSelector` - * emits. - * - * <span class="informal">It's like {@link bufferToggle}, but emits a nested - * Observable instead of an array.</span> - * - *  - * - * Returns an Observable that emits windows of items it collects from the source - * Observable. The output Observable emits windows that contain those items - * emitted by the source Observable between the time when the `openings` - * Observable emits an item and when the Observable returned by - * `closingSelector` emits an item. - * - * ## Example - * - * Every other second, emit the click events from the next 500ms - * - * ```ts - * import { fromEvent, interval, windowToggle, EMPTY, mergeAll } from 'rxjs'; - * - * const clicks = fromEvent(document, 'click'); - * const openings = interval(1000); - * const result = clicks.pipe( - * windowToggle(openings, i => i % 2 ? interval(500) : EMPTY), - * mergeAll() - * ); - * result.subscribe(x => console.log(x)); - * ``` - * - * @see {@link window} - * @see {@link windowCount} - * @see {@link windowTime} - * @see {@link windowWhen} - * @see {@link bufferToggle} - * - * @param openings An observable of notifications to start new windows. - * @param closingSelector A function that takes the value emitted by the - * `openings` observable and returns an Observable, which, when it emits a next - * notification, signals that the associated window should complete. - * @return A function that returns an Observable of windows, which in turn are - * Observables. - */ -export function windowToggle<T, O>( - openings: ObservableInput<O>, - closingSelector: (openValue: O) => ObservableInput<any> -): OperatorFunction<T, Observable<T>> { - return operate((source, subscriber) => { - const windows: Subject<T>[] = []; - - const handleError = (err: any) => { - while (0 < windows.length) { - windows.shift()!.error(err); - } - subscriber.error(err); - }; - - innerFrom(openings).subscribe( - createOperatorSubscriber( - subscriber, - (openValue) => { - const window = new Subject<T>(); - windows.push(window); - const closingSubscription = new Subscription(); - const closeWindow = () => { - arrRemove(windows, window); - window.complete(); - closingSubscription.unsubscribe(); - }; - - let closingNotifier: Observable<any>; - try { - closingNotifier = innerFrom(closingSelector(openValue)); - } catch (err) { - handleError(err); - return; - } - - subscriber.next(window.asObservable()); - - closingSubscription.add(closingNotifier.subscribe(createOperatorSubscriber(subscriber, closeWindow, noop, handleError))); - }, - noop - ) - ); - - // Subscribe to the source to get things started. - source.subscribe( - createOperatorSubscriber( - subscriber, - (value: T) => { - // Copy the windows array before we emit to - // make sure we don't have issues with reentrant code. - const windowsCopy = windows.slice(); - for (const window of windowsCopy) { - window.next(value); - } - }, - () => { - // Complete all of our windows before we complete. - while (0 < windows.length) { - windows.shift()!.complete(); - } - subscriber.complete(); - }, - handleError, - () => { - // Add this finalization so that all window subjects are - // disposed of. This way, if a user tries to subscribe - // to a window *after* the outer subscription has been unsubscribed, - // they will get an error, instead of waiting forever to - // see if a value arrives. - while (0 < windows.length) { - windows.shift()!.unsubscribe(); - } - } - ) - ); - }); -} |
