aboutsummaryrefslogtreecommitdiffstats
path: root/node_modules/rxjs/src/internal/operators/sequenceEqual.ts
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2025-06-28 17:26:46 -0700
committerPinapelz <yukais@pinapelz.com>2025-06-28 17:43:56 -0700
commite4fa1e69e7ebfb627c7198fd1a9881e9327ec4d4 (patch)
tree06284a538a6008eca75051399e47db4e5d50301c /node_modules/rxjs/src/internal/operators/sequenceEqual.ts
initial commit: scaffolding
Diffstat (limited to 'node_modules/rxjs/src/internal/operators/sequenceEqual.ts')
-rw-r--r--node_modules/rxjs/src/internal/operators/sequenceEqual.ts146
1 files changed, 146 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/operators/sequenceEqual.ts b/node_modules/rxjs/src/internal/operators/sequenceEqual.ts
new file mode 100644
index 0000000..a6f9bec
--- /dev/null
+++ b/node_modules/rxjs/src/internal/operators/sequenceEqual.ts
@@ -0,0 +1,146 @@
+import { OperatorFunction, ObservableInput } from '../types';
+import { operate } from '../util/lift';
+import { createOperatorSubscriber } from './OperatorSubscriber';
+import { innerFrom } from '../observable/innerFrom';
+
+/**
+ * Compares all values of two observables in sequence using an optional comparator function
+ * and returns an observable of a single boolean value representing whether or not the two sequences
+ * are equal.
+ *
+ * <span class="informal">Checks to see of all values emitted by both observables are equal, in order.</span>
+ *
+ * ![](sequenceEqual.png)
+ *
+ * `sequenceEqual` subscribes to source observable and `compareTo` `ObservableInput` (that internally
+ * gets converted to an observable) and buffers incoming values from each observable. Whenever either
+ * observable emits a value, the value is buffered and the buffers are shifted and compared from the bottom
+ * up; If any value pair doesn't match, the returned observable will emit `false` and complete. If one of the
+ * observables completes, the operator will wait for the other observable to complete; If the other
+ * observable emits before completing, the returned observable will emit `false` and complete. If one observable never
+ * completes or emits after the other completes, the returned observable will never complete.
+ *
+ * ## Example
+ *
+ * Figure out if the Konami code matches
+ *
+ * ```ts
+ * import { from, fromEvent, map, bufferCount, mergeMap, sequenceEqual } from 'rxjs';
+ *
+ * const codes = from([
+ * 'ArrowUp',
+ * 'ArrowUp',
+ * 'ArrowDown',
+ * 'ArrowDown',
+ * 'ArrowLeft',
+ * 'ArrowRight',
+ * 'ArrowLeft',
+ * 'ArrowRight',
+ * 'KeyB',
+ * 'KeyA',
+ * 'Enter', // no start key, clearly.
+ * ]);
+ *
+ * const keys = fromEvent<KeyboardEvent>(document, 'keyup').pipe(map(e => e.code));
+ * const matches = keys.pipe(
+ * bufferCount(11, 1),
+ * mergeMap(last11 => from(last11).pipe(sequenceEqual(codes)))
+ * );
+ * matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));
+ * ```
+ *
+ * @see {@link combineLatest}
+ * @see {@link zip}
+ * @see {@link withLatestFrom}
+ *
+ * @param compareTo The `ObservableInput` sequence to compare the source sequence to.
+ * @param comparator An optional function to compare each value pair.
+ *
+ * @return A function that returns an Observable that emits a single boolean
+ * value representing whether or not the values emitted by the source
+ * Observable and provided `ObservableInput` were equal in sequence.
+ */
+export function sequenceEqual<T>(
+ compareTo: ObservableInput<T>,
+ comparator: (a: T, b: T) => boolean = (a, b) => a === b
+): OperatorFunction<T, boolean> {
+ return operate((source, subscriber) => {
+ // The state for the source observable
+ const aState = createState<T>();
+ // The state for the compareTo observable;
+ const bState = createState<T>();
+
+ /** A utility to emit and complete */
+ const emit = (isEqual: boolean) => {
+ subscriber.next(isEqual);
+ subscriber.complete();
+ };
+
+ /**
+ * Creates a subscriber that subscribes to one of the sources, and compares its collected
+ * state -- `selfState` -- to the other source's collected state -- `otherState`. This
+ * is used for both streams.
+ */
+ const createSubscriber = (selfState: SequenceState<T>, otherState: SequenceState<T>) => {
+ const sequenceEqualSubscriber = createOperatorSubscriber(
+ subscriber,
+ (a: T) => {
+ const { buffer, complete } = otherState;
+ if (buffer.length === 0) {
+ // If there's no values in the other buffer
+ // and the other stream is complete, we know
+ // this isn't a match, because we got one more value.
+ // Otherwise, we push onto our buffer, so when the other
+ // stream emits, it can pull this value off our buffer and check it
+ // at the appropriate time.
+ complete ? emit(false) : selfState.buffer.push(a);
+ } else {
+ // If the other stream *does* have values in its buffer,
+ // pull the oldest one off so we can compare it to what we
+ // just got. If it wasn't a match, emit `false` and complete.
+ !comparator(a, buffer.shift()!) && emit(false);
+ }
+ },
+ () => {
+ // Or observable completed
+ selfState.complete = true;
+ const { complete, buffer } = otherState;
+ // If the other observable is also complete, and there's
+ // still stuff left in their buffer, it doesn't match, if their
+ // buffer is empty, then it does match. This is because we can't
+ // possibly get more values here anymore.
+ complete && emit(buffer.length === 0);
+ // Be sure to clean up our stream as soon as possible if we can.
+ sequenceEqualSubscriber?.unsubscribe();
+ }
+ );
+
+ return sequenceEqualSubscriber;
+ };
+
+ // Subscribe to each source.
+ source.subscribe(createSubscriber(aState, bState));
+ innerFrom(compareTo).subscribe(createSubscriber(bState, aState));
+ });
+}
+
+/**
+ * A simple structure for the data used to test each sequence
+ */
+interface SequenceState<T> {
+ /** A temporary store for arrived values before they are checked */
+ buffer: T[];
+ /** Whether or not the sequence source has completed. */
+ complete: boolean;
+}
+
+/**
+ * Creates a simple structure that is used to represent
+ * data used to test each sequence.
+ */
+function createState<T>(): SequenceState<T> {
+ return {
+ buffer: [],
+ complete: false,
+ };
+}
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage