aboutsummaryrefslogtreecommitdiffstats
path: root/node_modules/rxjs/src/internal/operators/windowToggle.ts
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/rxjs/src/internal/operators/windowToggle.ts')
-rw-r--r--node_modules/rxjs/src/internal/operators/windowToggle.ts132
1 files changed, 0 insertions, 132 deletions
diff --git a/node_modules/rxjs/src/internal/operators/windowToggle.ts b/node_modules/rxjs/src/internal/operators/windowToggle.ts
deleted file mode 100644
index a17bef5..0000000
--- a/node_modules/rxjs/src/internal/operators/windowToggle.ts
+++ /dev/null
@@ -1,132 +0,0 @@
-import { Observable } from '../Observable';
-import { Subject } from '../Subject';
-import { Subscription } from '../Subscription';
-import { ObservableInput, OperatorFunction } from '../types';
-import { operate } from '../util/lift';
-import { innerFrom } from '../observable/innerFrom';
-import { createOperatorSubscriber } from './OperatorSubscriber';
-import { noop } from '../util/noop';
-import { arrRemove } from '../util/arrRemove';
-
-/**
- * Branch out the source Observable values as a nested Observable starting from
- * an emission from `openings` and ending when the output of `closingSelector`
- * emits.
- *
- * <span class="informal">It's like {@link bufferToggle}, but emits a nested
- * Observable instead of an array.</span>
- *
- * ![](windowToggle.png)
- *
- * Returns an Observable that emits windows of items it collects from the source
- * Observable. The output Observable emits windows that contain those items
- * emitted by the source Observable between the time when the `openings`
- * Observable emits an item and when the Observable returned by
- * `closingSelector` emits an item.
- *
- * ## Example
- *
- * Every other second, emit the click events from the next 500ms
- *
- * ```ts
- * import { fromEvent, interval, windowToggle, EMPTY, mergeAll } from 'rxjs';
- *
- * const clicks = fromEvent(document, 'click');
- * const openings = interval(1000);
- * const result = clicks.pipe(
- * windowToggle(openings, i => i % 2 ? interval(500) : EMPTY),
- * mergeAll()
- * );
- * result.subscribe(x => console.log(x));
- * ```
- *
- * @see {@link window}
- * @see {@link windowCount}
- * @see {@link windowTime}
- * @see {@link windowWhen}
- * @see {@link bufferToggle}
- *
- * @param openings An observable of notifications to start new windows.
- * @param closingSelector A function that takes the value emitted by the
- * `openings` observable and returns an Observable, which, when it emits a next
- * notification, signals that the associated window should complete.
- * @return A function that returns an Observable of windows, which in turn are
- * Observables.
- */
-export function windowToggle<T, O>(
- openings: ObservableInput<O>,
- closingSelector: (openValue: O) => ObservableInput<any>
-): OperatorFunction<T, Observable<T>> {
- return operate((source, subscriber) => {
- const windows: Subject<T>[] = [];
-
- const handleError = (err: any) => {
- while (0 < windows.length) {
- windows.shift()!.error(err);
- }
- subscriber.error(err);
- };
-
- innerFrom(openings).subscribe(
- createOperatorSubscriber(
- subscriber,
- (openValue) => {
- const window = new Subject<T>();
- windows.push(window);
- const closingSubscription = new Subscription();
- const closeWindow = () => {
- arrRemove(windows, window);
- window.complete();
- closingSubscription.unsubscribe();
- };
-
- let closingNotifier: Observable<any>;
- try {
- closingNotifier = innerFrom(closingSelector(openValue));
- } catch (err) {
- handleError(err);
- return;
- }
-
- subscriber.next(window.asObservable());
-
- closingSubscription.add(closingNotifier.subscribe(createOperatorSubscriber(subscriber, closeWindow, noop, handleError)));
- },
- noop
- )
- );
-
- // Subscribe to the source to get things started.
- source.subscribe(
- createOperatorSubscriber(
- subscriber,
- (value: T) => {
- // Copy the windows array before we emit to
- // make sure we don't have issues with reentrant code.
- const windowsCopy = windows.slice();
- for (const window of windowsCopy) {
- window.next(value);
- }
- },
- () => {
- // Complete all of our windows before we complete.
- while (0 < windows.length) {
- windows.shift()!.complete();
- }
- subscriber.complete();
- },
- handleError,
- () => {
- // Add this finalization so that all window subjects are
- // disposed of. This way, if a user tries to subscribe
- // to a window *after* the outer subscription has been unsubscribed,
- // they will get an error, instead of waiting forever to
- // see if a value arrives.
- while (0 < windows.length) {
- windows.shift()!.unsubscribe();
- }
- }
- )
- );
- });
-}
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage