aboutsummaryrefslogtreecommitdiffstats
path: root/node_modules/rxjs/src/internal/operators/windowToggle.ts
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/rxjs/src/internal/operators/windowToggle.ts')
-rw-r--r--node_modules/rxjs/src/internal/operators/windowToggle.ts132
1 files changed, 132 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/operators/windowToggle.ts b/node_modules/rxjs/src/internal/operators/windowToggle.ts
new file mode 100644
index 0000000..a17bef5
--- /dev/null
+++ b/node_modules/rxjs/src/internal/operators/windowToggle.ts
@@ -0,0 +1,132 @@
+import { Observable } from '../Observable';
+import { Subject } from '../Subject';
+import { Subscription } from '../Subscription';
+import { ObservableInput, OperatorFunction } from '../types';
+import { operate } from '../util/lift';
+import { innerFrom } from '../observable/innerFrom';
+import { createOperatorSubscriber } from './OperatorSubscriber';
+import { noop } from '../util/noop';
+import { arrRemove } from '../util/arrRemove';
+
+/**
+ * Branch out the source Observable values as a nested Observable starting from
+ * an emission from `openings` and ending when the output of `closingSelector`
+ * emits.
+ *
+ * <span class="informal">It's like {@link bufferToggle}, but emits a nested
+ * Observable instead of an array.</span>
+ *
+ * ![](windowToggle.png)
+ *
+ * Returns an Observable that emits windows of items it collects from the source
+ * Observable. The output Observable emits windows that contain those items
+ * emitted by the source Observable between the time when the `openings`
+ * Observable emits an item and when the Observable returned by
+ * `closingSelector` emits an item.
+ *
+ * ## Example
+ *
+ * Every other second, emit the click events from the next 500ms
+ *
+ * ```ts
+ * import { fromEvent, interval, windowToggle, EMPTY, mergeAll } from 'rxjs';
+ *
+ * const clicks = fromEvent(document, 'click');
+ * const openings = interval(1000);
+ * const result = clicks.pipe(
+ * windowToggle(openings, i => i % 2 ? interval(500) : EMPTY),
+ * mergeAll()
+ * );
+ * result.subscribe(x => console.log(x));
+ * ```
+ *
+ * @see {@link window}
+ * @see {@link windowCount}
+ * @see {@link windowTime}
+ * @see {@link windowWhen}
+ * @see {@link bufferToggle}
+ *
+ * @param openings An observable of notifications to start new windows.
+ * @param closingSelector A function that takes the value emitted by the
+ * `openings` observable and returns an Observable, which, when it emits a next
+ * notification, signals that the associated window should complete.
+ * @return A function that returns an Observable of windows, which in turn are
+ * Observables.
+ */
+export function windowToggle<T, O>(
+ openings: ObservableInput<O>,
+ closingSelector: (openValue: O) => ObservableInput<any>
+): OperatorFunction<T, Observable<T>> {
+ return operate((source, subscriber) => {
+ const windows: Subject<T>[] = [];
+
+ const handleError = (err: any) => {
+ while (0 < windows.length) {
+ windows.shift()!.error(err);
+ }
+ subscriber.error(err);
+ };
+
+ innerFrom(openings).subscribe(
+ createOperatorSubscriber(
+ subscriber,
+ (openValue) => {
+ const window = new Subject<T>();
+ windows.push(window);
+ const closingSubscription = new Subscription();
+ const closeWindow = () => {
+ arrRemove(windows, window);
+ window.complete();
+ closingSubscription.unsubscribe();
+ };
+
+ let closingNotifier: Observable<any>;
+ try {
+ closingNotifier = innerFrom(closingSelector(openValue));
+ } catch (err) {
+ handleError(err);
+ return;
+ }
+
+ subscriber.next(window.asObservable());
+
+ closingSubscription.add(closingNotifier.subscribe(createOperatorSubscriber(subscriber, closeWindow, noop, handleError)));
+ },
+ noop
+ )
+ );
+
+ // Subscribe to the source to get things started.
+ source.subscribe(
+ createOperatorSubscriber(
+ subscriber,
+ (value: T) => {
+ // Copy the windows array before we emit to
+ // make sure we don't have issues with reentrant code.
+ const windowsCopy = windows.slice();
+ for (const window of windowsCopy) {
+ window.next(value);
+ }
+ },
+ () => {
+ // Complete all of our windows before we complete.
+ while (0 < windows.length) {
+ windows.shift()!.complete();
+ }
+ subscriber.complete();
+ },
+ handleError,
+ () => {
+ // Add this finalization so that all window subjects are
+ // disposed of. This way, if a user tries to subscribe
+ // to a window *after* the outer subscription has been unsubscribed,
+ // they will get an error, instead of waiting forever to
+ // see if a value arrives.
+ while (0 < windows.length) {
+ windows.shift()!.unsubscribe();
+ }
+ }
+ )
+ );
+ });
+}
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage