aboutsummaryrefslogtreecommitdiffstats
path: root/node_modules/rxjs/src/internal/operators/buffer.ts
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/rxjs/src/internal/operators/buffer.ts')
-rw-r--r--node_modules/rxjs/src/internal/operators/buffer.ts81
1 files changed, 81 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/operators/buffer.ts b/node_modules/rxjs/src/internal/operators/buffer.ts
new file mode 100644
index 0000000..2ca2fde
--- /dev/null
+++ b/node_modules/rxjs/src/internal/operators/buffer.ts
@@ -0,0 +1,81 @@
+import { OperatorFunction, ObservableInput } from '../types';
+import { operate } from '../util/lift';
+import { noop } from '../util/noop';
+import { createOperatorSubscriber } from './OperatorSubscriber';
+import { innerFrom } from '../observable/innerFrom';
+
+/**
+ * Buffers the source Observable values until `closingNotifier` emits.
+ *
+ * <span class="informal">Collects values from the past as an array, and emits
+ * that array only when another Observable emits.</span>
+ *
+ * ![](buffer.png)
+ *
+ * Buffers the incoming Observable values until the given `closingNotifier`
+ * `ObservableInput` (that internally gets converted to an Observable)
+ * emits a value, at which point it emits the buffer on the output
+ * Observable and starts a new buffer internally, awaiting the next time
+ * `closingNotifier` emits.
+ *
+ * ## Example
+ *
+ * On every click, emit array of most recent interval events
+ *
+ * ```ts
+ * import { fromEvent, interval, buffer } from 'rxjs';
+ *
+ * const clicks = fromEvent(document, 'click');
+ * const intervalEvents = interval(1000);
+ * const buffered = intervalEvents.pipe(buffer(clicks));
+ * buffered.subscribe(x => console.log(x));
+ * ```
+ *
+ * @see {@link bufferCount}
+ * @see {@link bufferTime}
+ * @see {@link bufferToggle}
+ * @see {@link bufferWhen}
+ * @see {@link window}
+ *
+ * @param closingNotifier An `ObservableInput` that signals the
+ * buffer to be emitted on the output Observable.
+ * @return A function that returns an Observable of buffers, which are arrays
+ * of values.
+ */
+export function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFunction<T, T[]> {
+ return operate((source, subscriber) => {
+ // The current buffered values.
+ let currentBuffer: T[] = [];
+
+ // Subscribe to our source.
+ source.subscribe(
+ createOperatorSubscriber(
+ subscriber,
+ (value) => currentBuffer.push(value),
+ () => {
+ subscriber.next(currentBuffer);
+ subscriber.complete();
+ }
+ )
+ );
+
+ // Subscribe to the closing notifier.
+ innerFrom(closingNotifier).subscribe(
+ createOperatorSubscriber(
+ subscriber,
+ () => {
+ // Start a new buffer and emit the previous one.
+ const b = currentBuffer;
+ currentBuffer = [];
+ subscriber.next(b);
+ },
+ noop
+ )
+ );
+
+ return () => {
+ // Ensure buffered values are released on finalization.
+ currentBuffer = null!;
+ };
+ });
+}
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage