aboutsummaryrefslogtreecommitdiffstats
path: root/node_modules/rxjs/src/internal/ReplaySubject.ts
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2025-06-28 17:26:46 -0700
committerPinapelz <yukais@pinapelz.com>2025-06-28 17:43:56 -0700
commite4fa1e69e7ebfb627c7198fd1a9881e9327ec4d4 (patch)
tree06284a538a6008eca75051399e47db4e5d50301c /node_modules/rxjs/src/internal/ReplaySubject.ts
initial commit: scaffolding
Diffstat (limited to 'node_modules/rxjs/src/internal/ReplaySubject.ts')
-rw-r--r--node_modules/rxjs/src/internal/ReplaySubject.ts110
1 files changed, 110 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/ReplaySubject.ts b/node_modules/rxjs/src/internal/ReplaySubject.ts
new file mode 100644
index 0000000..67394b7
--- /dev/null
+++ b/node_modules/rxjs/src/internal/ReplaySubject.ts
@@ -0,0 +1,110 @@
+import { Subject } from './Subject';
+import { TimestampProvider } from './types';
+import { Subscriber } from './Subscriber';
+import { Subscription } from './Subscription';
+import { dateTimestampProvider } from './scheduler/dateTimestampProvider';
+
+/**
+ * A variant of {@link Subject} that "replays" old values to new subscribers by emitting them when they first subscribe.
+ *
+ * `ReplaySubject` has an internal buffer that will store a specified number of values that it has observed. Like `Subject`,
+ * `ReplaySubject` "observes" values by having them passed to its `next` method. When it observes a value, it will store that
+ * value for a time determined by the configuration of the `ReplaySubject`, as passed to its constructor.
+ *
+ * When a new subscriber subscribes to the `ReplaySubject` instance, it will synchronously emit all values in its buffer in
+ * a First-In-First-Out (FIFO) manner. The `ReplaySubject` will also complete, if it has observed completion; and it will
+ * error if it has observed an error.
+ *
+ * There are two main configuration items to be concerned with:
+ *
+ * 1. `bufferSize` - This will determine how many items are stored in the buffer, defaults to infinite.
+ * 2. `windowTime` - The amount of time to hold a value in the buffer before removing it from the buffer.
+ *
+ * Both configurations may exist simultaneously. So if you would like to buffer a maximum of 3 values, as long as the values
+ * are less than 2 seconds old, you could do so with a `new ReplaySubject(3, 2000)`.
+ *
+ * ### Differences with BehaviorSubject
+ *
+ * `BehaviorSubject` is similar to `new ReplaySubject(1)`, with a couple of exceptions:
+ *
+ * 1. `BehaviorSubject` comes "primed" with a single value upon construction.
+ * 2. `ReplaySubject` will replay values, even after observing an error, where `BehaviorSubject` will not.
+ *
+ * @see {@link Subject}
+ * @see {@link BehaviorSubject}
+ * @see {@link shareReplay}
+ */
+export class ReplaySubject<T> extends Subject<T> {
+ private _buffer: (T | number)[] = [];
+ private _infiniteTimeWindow = true;
+
+ /**
+ * @param _bufferSize The size of the buffer to replay on subscription
+ * @param _windowTime The amount of time the buffered items will stay buffered
+ * @param _timestampProvider An object with a `now()` method that provides the current timestamp. This is used to
+ * calculate the amount of time something has been buffered.
+ */
+ constructor(
+ private _bufferSize = Infinity,
+ private _windowTime = Infinity,
+ private _timestampProvider: TimestampProvider = dateTimestampProvider
+ ) {
+ super();
+ this._infiniteTimeWindow = _windowTime === Infinity;
+ this._bufferSize = Math.max(1, _bufferSize);
+ this._windowTime = Math.max(1, _windowTime);
+ }
+
+ next(value: T): void {
+ const { isStopped, _buffer, _infiniteTimeWindow, _timestampProvider, _windowTime } = this;
+ if (!isStopped) {
+ _buffer.push(value);
+ !_infiniteTimeWindow && _buffer.push(_timestampProvider.now() + _windowTime);
+ }
+ this._trimBuffer();
+ super.next(value);
+ }
+
+ /** @internal */
+ protected _subscribe(subscriber: Subscriber<T>): Subscription {
+ this._throwIfClosed();
+ this._trimBuffer();
+
+ const subscription = this._innerSubscribe(subscriber);
+
+ const { _infiniteTimeWindow, _buffer } = this;
+ // We use a copy here, so reentrant code does not mutate our array while we're
+ // emitting it to a new subscriber.
+ const copy = _buffer.slice();
+ for (let i = 0; i < copy.length && !subscriber.closed; i += _infiniteTimeWindow ? 1 : 2) {
+ subscriber.next(copy[i] as T);
+ }
+
+ this._checkFinalizedStatuses(subscriber);
+
+ return subscription;
+ }
+
+ private _trimBuffer() {
+ const { _bufferSize, _timestampProvider, _buffer, _infiniteTimeWindow } = this;
+ // If we don't have an infinite buffer size, and we're over the length,
+ // use splice to truncate the old buffer values off. Note that we have to
+ // double the size for instances where we're not using an infinite time window
+ // because we're storing the values and the timestamps in the same array.
+ const adjustedBufferSize = (_infiniteTimeWindow ? 1 : 2) * _bufferSize;
+ _bufferSize < Infinity && adjustedBufferSize < _buffer.length && _buffer.splice(0, _buffer.length - adjustedBufferSize);
+
+ // Now, if we're not in an infinite time window, remove all values where the time is
+ // older than what is allowed.
+ if (!_infiniteTimeWindow) {
+ const now = _timestampProvider.now();
+ let last = 0;
+ // Search the array for the first timestamp that isn't expired and
+ // truncate the buffer up to that point.
+ for (let i = 1; i < _buffer.length && (_buffer[i] as number) <= now; i += 2) {
+ last = i;
+ }
+ last && _buffer.splice(0, last + 1);
+ }
+ }
+}
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage