aboutsummaryrefslogtreecommitdiffstats
path: root/node_modules/rxjs/src/internal/observable/dom
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2025-06-28 17:26:46 -0700
committerPinapelz <yukais@pinapelz.com>2025-06-28 17:43:56 -0700
commite4fa1e69e7ebfb627c7198fd1a9881e9327ec4d4 (patch)
tree06284a538a6008eca75051399e47db4e5d50301c /node_modules/rxjs/src/internal/observable/dom
initial commit: scaffolding
Diffstat (limited to 'node_modules/rxjs/src/internal/observable/dom')
-rw-r--r--node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts397
-rw-r--r--node_modules/rxjs/src/internal/observable/dom/animationFrames.ts132
-rw-r--r--node_modules/rxjs/src/internal/observable/dom/fetch.ts180
-rw-r--r--node_modules/rxjs/src/internal/observable/dom/webSocket.ts161
4 files changed, 870 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts
new file mode 100644
index 0000000..9eecbf5
--- /dev/null
+++ b/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts
@@ -0,0 +1,397 @@
+import { Subject, AnonymousSubject } from '../../Subject';
+import { Subscriber } from '../../Subscriber';
+import { Observable } from '../../Observable';
+import { Subscription } from '../../Subscription';
+import { Operator } from '../../Operator';
+import { ReplaySubject } from '../../ReplaySubject';
+import { Observer, NextObserver } from '../../types';
+
+/**
+ * WebSocketSubjectConfig is a plain Object that allows us to make our
+ * webSocket configurable.
+ *
+ * <span class="informal">Provides flexibility to {@link webSocket}</span>
+ *
+ * It defines a set of properties to provide custom behavior in specific
+ * moments of the socket's lifecycle. When the connection opens we can
+ * use `openObserver`, when the connection is closed `closeObserver`, if we
+ * are interested in listening for data coming from server: `deserializer`,
+ * which allows us to customize the deserialization strategy of data before passing it
+ * to the socket client. By default, `deserializer` is going to apply `JSON.parse` to each message coming
+ * from the Server.
+ *
+ * ## Examples
+ *
+ * **deserializer**, the default for this property is `JSON.parse` but since there are just two options
+ * for incoming data, either be text or binary data. We can apply a custom deserialization strategy
+ * or just simply skip the default behaviour.
+ *
+ * ```ts
+ * import { webSocket } from 'rxjs/webSocket';
+ *
+ * const wsSubject = webSocket({
+ * url: 'ws://localhost:8081',
+ * //Apply any transformation of your choice.
+ * deserializer: ({ data }) => data
+ * });
+ *
+ * wsSubject.subscribe(console.log);
+ *
+ * // Let's suppose we have this on the Server: ws.send('This is a msg from the server')
+ * //output
+ * //
+ * // This is a msg from the server
+ * ```
+ *
+ * **serializer** allows us to apply custom serialization strategy but for the outgoing messages.
+ *
+ * ```ts
+ * import { webSocket } from 'rxjs/webSocket';
+ *
+ * const wsSubject = webSocket({
+ * url: 'ws://localhost:8081',
+ * // Apply any transformation of your choice.
+ * serializer: msg => JSON.stringify({ channel: 'webDevelopment', msg: msg })
+ * });
+ *
+ * wsSubject.subscribe(() => subject.next('msg to the server'));
+ *
+ * // Let's suppose we have this on the Server:
+ * // ws.on('message', msg => console.log);
+ * // ws.send('This is a msg from the server');
+ * // output at server side:
+ * //
+ * // {"channel":"webDevelopment","msg":"msg to the server"}
+ * ```
+ *
+ * **closeObserver** allows us to set a custom error when an error raises up.
+ *
+ * ```ts
+ * import { webSocket } from 'rxjs/webSocket';
+ *
+ * const wsSubject = webSocket({
+ * url: 'ws://localhost:8081',
+ * closeObserver: {
+ * next() {
+ * const customError = { code: 6666, reason: 'Custom evil reason' }
+ * console.log(`code: ${ customError.code }, reason: ${ customError.reason }`);
+ * }
+ * }
+ * });
+ *
+ * // output
+ * // code: 6666, reason: Custom evil reason
+ * ```
+ *
+ * **openObserver**, Let's say we need to make some kind of init task before sending/receiving msgs to the
+ * webSocket or sending notification that the connection was successful, this is when
+ * openObserver is useful for.
+ *
+ * ```ts
+ * import { webSocket } from 'rxjs/webSocket';
+ *
+ * const wsSubject = webSocket({
+ * url: 'ws://localhost:8081',
+ * openObserver: {
+ * next: () => {
+ * console.log('Connection ok');
+ * }
+ * }
+ * });
+ *
+ * // output
+ * // Connection ok
+ * ```
+ */
+export interface WebSocketSubjectConfig<T> {
+ /** The url of the socket server to connect to */
+ url: string;
+ /** The protocol to use to connect */
+ protocol?: string | Array<string>;
+ /** @deprecated Will be removed in v8. Use {@link deserializer} instead. */
+ resultSelector?: (e: MessageEvent) => T;
+ /**
+ * A serializer used to create messages from passed values before the
+ * messages are sent to the server. Defaults to JSON.stringify.
+ */
+ serializer?: (value: T) => WebSocketMessage;
+ /**
+ * A deserializer used for messages arriving on the socket from the
+ * server. Defaults to JSON.parse.
+ */
+ deserializer?: (e: MessageEvent) => T;
+ /**
+ * An Observer that watches when open events occur on the underlying web socket.
+ */
+ openObserver?: NextObserver<Event>;
+ /**
+ * An Observer that watches when close events occur on the underlying web socket
+ */
+ closeObserver?: NextObserver<CloseEvent>;
+ /**
+ * An Observer that watches when a close is about to occur due to
+ * unsubscription.
+ */
+ closingObserver?: NextObserver<void>;
+ /**
+ * A WebSocket constructor to use. This is useful for situations like using a
+ * WebSocket impl in Node (WebSocket is a DOM API), or for mocking a WebSocket
+ * for testing purposes
+ */
+ WebSocketCtor?: { new (url: string, protocols?: string | string[]): WebSocket };
+ /** Sets the `binaryType` property of the underlying WebSocket. */
+ binaryType?: 'blob' | 'arraybuffer';
+}
+
+const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = {
+ url: '',
+ deserializer: (e: MessageEvent) => JSON.parse(e.data),
+ serializer: (value: any) => JSON.stringify(value),
+};
+
+const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT =
+ 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
+
+export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView;
+
+export class WebSocketSubject<T> extends AnonymousSubject<T> {
+ // @ts-ignore: Property has no initializer and is not definitely assigned
+ private _config: WebSocketSubjectConfig<T>;
+
+ /** @internal */
+ // @ts-ignore: Property has no initializer and is not definitely assigned
+ _output: Subject<T>;
+
+ private _socket: WebSocket | null = null;
+
+ constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) {
+ super();
+ if (urlConfigOrSource instanceof Observable) {
+ this.destination = destination;
+ this.source = urlConfigOrSource as Observable<T>;
+ } else {
+ const config = (this._config = { ...DEFAULT_WEBSOCKET_CONFIG });
+ this._output = new Subject<T>();
+ if (typeof urlConfigOrSource === 'string') {
+ config.url = urlConfigOrSource;
+ } else {
+ for (const key in urlConfigOrSource) {
+ if (urlConfigOrSource.hasOwnProperty(key)) {
+ (config as any)[key] = (urlConfigOrSource as any)[key];
+ }
+ }
+ }
+
+ if (!config.WebSocketCtor && WebSocket) {
+ config.WebSocketCtor = WebSocket;
+ } else if (!config.WebSocketCtor) {
+ throw new Error('no WebSocket constructor can be found');
+ }
+ this.destination = new ReplaySubject();
+ }
+ }
+
+ /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
+ lift<R>(operator: Operator<T, R>): WebSocketSubject<R> {
+ const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, this.destination as any);
+ sock.operator = operator;
+ sock.source = this;
+ return sock;
+ }
+
+ private _resetState() {
+ this._socket = null;
+ if (!this.source) {
+ this.destination = new ReplaySubject();
+ }
+ this._output = new Subject<T>();
+ }
+
+ /**
+ * Creates an {@link Observable}, that when subscribed to, sends a message,
+ * defined by the `subMsg` function, to the server over the socket to begin a
+ * subscription to data over that socket. Once data arrives, the
+ * `messageFilter` argument will be used to select the appropriate data for
+ * the resulting Observable. When finalization occurs, either due to
+ * unsubscription, completion, or error, a message defined by the `unsubMsg`
+ * argument will be sent to the server over the WebSocketSubject.
+ *
+ * @param subMsg A function to generate the subscription message to be sent to
+ * the server. This will still be processed by the serializer in the
+ * WebSocketSubject's config. (Which defaults to JSON serialization)
+ * @param unsubMsg A function to generate the unsubscription message to be
+ * sent to the server at finalization. This will still be processed by the
+ * serializer in the WebSocketSubject's config.
+ * @param messageFilter A predicate for selecting the appropriate messages
+ * from the server for the output stream.
+ */
+ multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
+ const self = this;
+ return new Observable((observer: Observer<T>) => {
+ try {
+ self.next(subMsg());
+ } catch (err) {
+ observer.error(err);
+ }
+
+ const subscription = self.subscribe({
+ next: (x) => {
+ try {
+ if (messageFilter(x)) {
+ observer.next(x);
+ }
+ } catch (err) {
+ observer.error(err);
+ }
+ },
+ error: (err) => observer.error(err),
+ complete: () => observer.complete(),
+ });
+
+ return () => {
+ try {
+ self.next(unsubMsg());
+ } catch (err) {
+ observer.error(err);
+ }
+ subscription.unsubscribe();
+ };
+ });
+ }
+
+ private _connectSocket() {
+ const { WebSocketCtor, protocol, url, binaryType } = this._config;
+ const observer = this._output;
+
+ let socket: WebSocket | null = null;
+ try {
+ socket = protocol ? new WebSocketCtor!(url, protocol) : new WebSocketCtor!(url);
+ this._socket = socket;
+ if (binaryType) {
+ this._socket.binaryType = binaryType;
+ }
+ } catch (e) {
+ observer.error(e);
+ return;
+ }
+
+ const subscription = new Subscription(() => {
+ this._socket = null;
+ if (socket && socket.readyState === 1) {
+ socket.close();
+ }
+ });
+
+ socket.onopen = (evt: Event) => {
+ const { _socket } = this;
+ if (!_socket) {
+ socket!.close();
+ this._resetState();
+ return;
+ }
+ const { openObserver } = this._config;
+ if (openObserver) {
+ openObserver.next(evt);
+ }
+
+ const queue = this.destination;
+
+ this.destination = Subscriber.create<T>(
+ (x) => {
+ if (socket!.readyState === 1) {
+ try {
+ const { serializer } = this._config;
+ socket!.send(serializer!(x!));
+ } catch (e) {
+ this.destination!.error(e);
+ }
+ }
+ },
+ (err) => {
+ const { closingObserver } = this._config;
+ if (closingObserver) {
+ closingObserver.next(undefined);
+ }
+ if (err && err.code) {
+ socket!.close(err.code, err.reason);
+ } else {
+ observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
+ }
+ this._resetState();
+ },
+ () => {
+ const { closingObserver } = this._config;
+ if (closingObserver) {
+ closingObserver.next(undefined);
+ }
+ socket!.close();
+ this._resetState();
+ }
+ ) as Subscriber<any>;
+
+ if (queue && queue instanceof ReplaySubject) {
+ subscription.add((queue as ReplaySubject<T>).subscribe(this.destination));
+ }
+ };
+
+ socket.onerror = (e: Event) => {
+ this._resetState();
+ observer.error(e);
+ };
+
+ socket.onclose = (e: CloseEvent) => {
+ if (socket === this._socket) {
+ this._resetState();
+ }
+ const { closeObserver } = this._config;
+ if (closeObserver) {
+ closeObserver.next(e);
+ }
+ if (e.wasClean) {
+ observer.complete();
+ } else {
+ observer.error(e);
+ }
+ };
+
+ socket.onmessage = (e: MessageEvent) => {
+ try {
+ const { deserializer } = this._config;
+ observer.next(deserializer!(e));
+ } catch (err) {
+ observer.error(err);
+ }
+ };
+ }
+
+ /** @internal */
+ protected _subscribe(subscriber: Subscriber<T>): Subscription {
+ const { source } = this;
+ if (source) {
+ return source.subscribe(subscriber);
+ }
+ if (!this._socket) {
+ this._connectSocket();
+ }
+ this._output.subscribe(subscriber);
+ subscriber.add(() => {
+ const { _socket } = this;
+ if (this._output.observers.length === 0) {
+ if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
+ _socket.close();
+ }
+ this._resetState();
+ }
+ });
+ return subscriber;
+ }
+
+ unsubscribe() {
+ const { _socket } = this;
+ if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
+ _socket.close();
+ }
+ this._resetState();
+ super.unsubscribe();
+ }
+}
diff --git a/node_modules/rxjs/src/internal/observable/dom/animationFrames.ts b/node_modules/rxjs/src/internal/observable/dom/animationFrames.ts
new file mode 100644
index 0000000..38b338b
--- /dev/null
+++ b/node_modules/rxjs/src/internal/observable/dom/animationFrames.ts
@@ -0,0 +1,132 @@
+import { Observable } from '../../Observable';
+import { TimestampProvider } from '../../types';
+import { performanceTimestampProvider } from '../../scheduler/performanceTimestampProvider';
+import { animationFrameProvider } from '../../scheduler/animationFrameProvider';
+
+/**
+ * An observable of animation frames
+ *
+ * Emits the amount of time elapsed since subscription and the timestamp on each animation frame.
+ * Defaults to milliseconds provided to the requestAnimationFrame's callback. Does not end on its own.
+ *
+ * Every subscription will start a separate animation loop. Since animation frames are always scheduled
+ * by the browser to occur directly before a repaint, scheduling more than one animation frame synchronously
+ * should not be much different or have more overhead than looping over an array of events during
+ * a single animation frame. However, if for some reason the developer would like to ensure the
+ * execution of animation-related handlers are all executed during the same task by the engine,
+ * the `share` operator can be used.
+ *
+ * This is useful for setting up animations with RxJS.
+ *
+ * ## Examples
+ *
+ * Tweening a div to move it on the screen
+ *
+ * ```ts
+ * import { animationFrames, map, takeWhile, endWith } from 'rxjs';
+ *
+ * function tween(start: number, end: number, duration: number) {
+ * const diff = end - start;
+ * return animationFrames().pipe(
+ * // Figure out what percentage of time has passed
+ * map(({ elapsed }) => elapsed / duration),
+ * // Take the vector while less than 100%
+ * takeWhile(v => v < 1),
+ * // Finish with 100%
+ * endWith(1),
+ * // Calculate the distance traveled between start and end
+ * map(v => v * diff + start)
+ * );
+ * }
+ *
+ * // Setup a div for us to move around
+ * const div = document.createElement('div');
+ * document.body.appendChild(div);
+ * div.style.position = 'absolute';
+ * div.style.width = '40px';
+ * div.style.height = '40px';
+ * div.style.backgroundColor = 'lime';
+ * div.style.transform = 'translate3d(10px, 0, 0)';
+ *
+ * tween(10, 200, 4000).subscribe(x => {
+ * div.style.transform = `translate3d(${ x }px, 0, 0)`;
+ * });
+ * ```
+ *
+ * Providing a custom timestamp provider
+ *
+ * ```ts
+ * import { animationFrames, TimestampProvider } from 'rxjs';
+ *
+ * // A custom timestamp provider
+ * let now = 0;
+ * const customTSProvider: TimestampProvider = {
+ * now() { return now++; }
+ * };
+ *
+ * const source$ = animationFrames(customTSProvider);
+ *
+ * // Log increasing numbers 0...1...2... on every animation frame.
+ * source$.subscribe(({ elapsed }) => console.log(elapsed));
+ * ```
+ *
+ * @param timestampProvider An object with a `now` method that provides a numeric timestamp
+ */
+export function animationFrames(timestampProvider?: TimestampProvider) {
+ return timestampProvider ? animationFramesFactory(timestampProvider) : DEFAULT_ANIMATION_FRAMES;
+}
+
+/**
+ * Does the work of creating the observable for `animationFrames`.
+ * @param timestampProvider The timestamp provider to use to create the observable
+ */
+function animationFramesFactory(timestampProvider?: TimestampProvider) {
+ return new Observable<{ timestamp: number; elapsed: number }>((subscriber) => {
+ // If no timestamp provider is specified, use performance.now() - as it
+ // will return timestamps 'compatible' with those passed to the run
+ // callback and won't be affected by NTP adjustments, etc.
+ const provider = timestampProvider || performanceTimestampProvider;
+
+ // Capture the start time upon subscription, as the run callback can remain
+ // queued for a considerable period of time and the elapsed time should
+ // represent the time elapsed since subscription - not the time since the
+ // first rendered animation frame.
+ const start = provider.now();
+
+ let id = 0;
+ const run = () => {
+ if (!subscriber.closed) {
+ id = animationFrameProvider.requestAnimationFrame((timestamp: DOMHighResTimeStamp | number) => {
+ id = 0;
+ // Use the provider's timestamp to calculate the elapsed time. Note that
+ // this means - if the caller hasn't passed a provider - that
+ // performance.now() will be used instead of the timestamp that was
+ // passed to the run callback. The reason for this is that the timestamp
+ // passed to the callback can be earlier than the start time, as it
+ // represents the time at which the browser decided it would render any
+ // queued frames - and that time can be earlier the captured start time.
+ const now = provider.now();
+ subscriber.next({
+ timestamp: timestampProvider ? now : timestamp,
+ elapsed: now - start,
+ });
+ run();
+ });
+ }
+ };
+
+ run();
+
+ return () => {
+ if (id) {
+ animationFrameProvider.cancelAnimationFrame(id);
+ }
+ };
+ });
+}
+
+/**
+ * In the common case, where the timestamp provided by the rAF API is used,
+ * we use this shared observable to reduce overhead.
+ */
+const DEFAULT_ANIMATION_FRAMES = animationFramesFactory();
diff --git a/node_modules/rxjs/src/internal/observable/dom/fetch.ts b/node_modules/rxjs/src/internal/observable/dom/fetch.ts
new file mode 100644
index 0000000..1894d24
--- /dev/null
+++ b/node_modules/rxjs/src/internal/observable/dom/fetch.ts
@@ -0,0 +1,180 @@
+import { createOperatorSubscriber } from '../../operators/OperatorSubscriber';
+import { Observable } from '../../Observable';
+import { innerFrom } from '../../observable/innerFrom';
+import { ObservableInput } from '../../types';
+
+export function fromFetch<T>(
+ input: string | Request,
+ init: RequestInit & {
+ selector: (response: Response) => ObservableInput<T>;
+ }
+): Observable<T>;
+
+export function fromFetch(input: string | Request, init?: RequestInit): Observable<Response>;
+
+/**
+ * Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
+ * make an HTTP request.
+ *
+ * **WARNING** Parts of the fetch API are still experimental. `AbortController` is
+ * required for this implementation to work and use cancellation appropriately.
+ *
+ * Will automatically set up an internal [AbortController](https://developer.mozilla.org/en-US/docs/Web/API/AbortController)
+ * in order to finalize the internal `fetch` when the subscription tears down.
+ *
+ * If a `signal` is provided via the `init` argument, it will behave like it usually does with
+ * `fetch`. If the provided `signal` aborts, the error that `fetch` normally rejects with
+ * in that scenario will be emitted as an error from the observable.
+ *
+ * ## Examples
+ *
+ * Basic use
+ *
+ * ```ts
+ * import { fromFetch } from 'rxjs/fetch';
+ * import { switchMap, of, catchError } from 'rxjs';
+ *
+ * const data$ = fromFetch('https://api.github.com/users?per_page=5').pipe(
+ * switchMap(response => {
+ * if (response.ok) {
+ * // OK return data
+ * return response.json();
+ * } else {
+ * // Server is returning a status requiring the client to try something else.
+ * return of({ error: true, message: `Error ${ response.status }` });
+ * }
+ * }),
+ * catchError(err => {
+ * // Network or other error, handle appropriately
+ * console.error(err);
+ * return of({ error: true, message: err.message })
+ * })
+ * );
+ *
+ * data$.subscribe({
+ * next: result => console.log(result),
+ * complete: () => console.log('done')
+ * });
+ * ```
+ *
+ * ### Use with Chunked Transfer Encoding
+ *
+ * With HTTP responses that use [chunked transfer encoding](https://tools.ietf.org/html/rfc7230#section-3.3.1),
+ * the promise returned by `fetch` will resolve as soon as the response's headers are
+ * received.
+ *
+ * That means the `fromFetch` observable will emit a `Response` - and will
+ * then complete - before the body is received. When one of the methods on the
+ * `Response` - like `text()` or `json()` - is called, the returned promise will not
+ * resolve until the entire body has been received. Unsubscribing from any observable
+ * that uses the promise as an observable input will not abort the request.
+ *
+ * To facilitate aborting the retrieval of responses that use chunked transfer encoding,
+ * a `selector` can be specified via the `init` parameter:
+ *
+ * ```ts
+ * import { of } from 'rxjs';
+ * import { fromFetch } from 'rxjs/fetch';
+ *
+ * const data$ = fromFetch('https://api.github.com/users?per_page=5', {
+ * selector: response => response.json()
+ * });
+ *
+ * data$.subscribe({
+ * next: result => console.log(result),
+ * complete: () => console.log('done')
+ * });
+ * ```
+ *
+ * @param input The resource you would like to fetch. Can be a url or a request object.
+ * @param initWithSelector A configuration object for the fetch.
+ * [See MDN for more details](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch#Parameters)
+ * @returns An Observable, that when subscribed to, performs an HTTP request using the native `fetch`
+ * function. The {@link Subscription} is tied to an `AbortController` for the fetch.
+ */
+export function fromFetch<T>(
+ input: string | Request,
+ initWithSelector: RequestInit & {
+ selector?: (response: Response) => ObservableInput<T>;
+ } = {}
+): Observable<Response | T> {
+ const { selector, ...init } = initWithSelector;
+ return new Observable<Response | T>((subscriber) => {
+ // Our controller for aborting this fetch.
+ // Any externally provided AbortSignal will have to call
+ // abort on this controller when signaled, because the
+ // signal from this controller is what is being passed to `fetch`.
+ const controller = new AbortController();
+ const { signal } = controller;
+ // This flag exists to make sure we don't `abort()` the fetch upon tearing down
+ // this observable after emitting a Response. Aborting in such circumstances
+ // would also abort subsequent methods - like `json()` - that could be called
+ // on the Response. Consider: `fromFetch().pipe(take(1), mergeMap(res => res.json()))`
+ let abortable = true;
+
+ // If the user provided an init configuration object,
+ // let's process it and chain our abort signals, if necessary.
+ // If a signal is provided, just have it finalized. It's a cancellation token, basically.
+ const { signal: outerSignal } = init;
+ if (outerSignal) {
+ if (outerSignal.aborted) {
+ controller.abort();
+ } else {
+ // We got an AbortSignal from the arguments passed into `fromFetch`.
+ // We need to wire up our AbortController to abort when this signal aborts.
+ const outerSignalHandler = () => {
+ if (!signal.aborted) {
+ controller.abort();
+ }
+ };
+ outerSignal.addEventListener('abort', outerSignalHandler);
+ subscriber.add(() => outerSignal.removeEventListener('abort', outerSignalHandler));
+ }
+ }
+
+ // The initialization object passed to `fetch` as the second
+ // argument. This ferries in important information, including our
+ // AbortSignal. Create a new init, so we don't accidentally mutate the
+ // passed init, or reassign it. This is because the init passed in
+ // is shared between each subscription to the result.
+ const perSubscriberInit: RequestInit = { ...init, signal };
+
+ const handleError = (err: any) => {
+ abortable = false;
+ subscriber.error(err);
+ };
+
+ fetch(input, perSubscriberInit)
+ .then((response) => {
+ if (selector) {
+ // If we have a selector function, use it to project our response.
+ // Note that any error that comes from our selector will be
+ // sent to the promise `catch` below and handled.
+ innerFrom(selector(response)).subscribe(
+ createOperatorSubscriber(
+ subscriber,
+ // Values are passed through to the subscriber
+ undefined,
+ // The projected response is complete.
+ () => {
+ abortable = false;
+ subscriber.complete();
+ },
+ handleError
+ )
+ );
+ } else {
+ abortable = false;
+ subscriber.next(response);
+ subscriber.complete();
+ }
+ })
+ .catch(handleError);
+
+ return () => {
+ if (abortable) {
+ controller.abort();
+ }
+ };
+ });
+}
diff --git a/node_modules/rxjs/src/internal/observable/dom/webSocket.ts b/node_modules/rxjs/src/internal/observable/dom/webSocket.ts
new file mode 100644
index 0000000..b10c5d8
--- /dev/null
+++ b/node_modules/rxjs/src/internal/observable/dom/webSocket.ts
@@ -0,0 +1,161 @@
+import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject';
+
+/**
+ * Wrapper around the w3c-compatible WebSocket object provided by the browser.
+ *
+ * <span class="informal">{@link Subject} that communicates with a server via WebSocket</span>
+ *
+ * `webSocket` is a factory function that produces a `WebSocketSubject`,
+ * which can be used to make WebSocket connection with an arbitrary endpoint.
+ * `webSocket` accepts as an argument either a string with url of WebSocket endpoint, or an
+ * {@link WebSocketSubjectConfig} object for providing additional configuration, as
+ * well as Observers for tracking lifecycle of WebSocket connection.
+ *
+ * When `WebSocketSubject` is subscribed, it attempts to make a socket connection,
+ * unless there is one made already. This means that many subscribers will always listen
+ * on the same socket, thus saving resources. If however, two instances are made of `WebSocketSubject`,
+ * even if these two were provided with the same url, they will attempt to make separate
+ * connections. When consumer of a `WebSocketSubject` unsubscribes, socket connection is closed,
+ * only if there are no more subscribers still listening. If after some time a consumer starts
+ * subscribing again, connection is reestablished.
+ *
+ * Once connection is made, whenever a new message comes from the server, `WebSocketSubject` will emit that
+ * message as a value in the stream. By default, a message from the socket is parsed via `JSON.parse`. If you
+ * want to customize how deserialization is handled (if at all), you can provide custom `resultSelector`
+ * function in {@link WebSocketSubject}. When connection closes, stream will complete, provided it happened without
+ * any errors. If at any point (starting, maintaining or closing a connection) there is an error,
+ * stream will also error with whatever WebSocket API has thrown.
+ *
+ * By virtue of being a {@link Subject}, `WebSocketSubject` allows for receiving and sending messages from the server. In order
+ * to communicate with a connected endpoint, use `next`, `error` and `complete` methods. `next` sends a value to the server, so bear in mind
+ * that this value will not be serialized beforehand. Because of This, `JSON.stringify` will have to be called on a value by hand,
+ * before calling `next` with a result. Note also that if at the moment of nexting value
+ * there is no socket connection (for example no one is subscribing), those values will be buffered, and sent when connection
+ * is finally established. `complete` method closes socket connection. `error` does the same,
+ * as well as notifying the server that something went wrong via status code and string with details of what happened.
+ * Since status code is required in WebSocket API, `WebSocketSubject` does not allow, like regular `Subject`,
+ * arbitrary values being passed to the `error` method. It needs to be called with an object that has `code`
+ * property with status code number and optional `reason` property with string describing details
+ * of an error.
+ *
+ * Calling `next` does not affect subscribers of `WebSocketSubject` - they have no
+ * information that something was sent to the server (unless of course the server
+ * responds somehow to a message). On the other hand, since calling `complete` triggers
+ * an attempt to close socket connection. If that connection is closed without any errors, stream will
+ * complete, thus notifying all subscribers. And since calling `error` closes
+ * socket connection as well, just with a different status code for the server, if closing itself proceeds
+ * without errors, subscribed Observable will not error, as one might expect, but complete as usual. In both cases
+ * (calling `complete` or `error`), if process of closing socket connection results in some errors, *then* stream
+ * will error.
+ *
+ * **Multiplexing**
+ *
+ * `WebSocketSubject` has an additional operator, not found in other Subjects. It is called `multiplex` and it is
+ * used to simulate opening several socket connections, while in reality maintaining only one.
+ * For example, an application has both chat panel and real-time notifications about sport news. Since these are two distinct functions,
+ * it would make sense to have two separate connections for each. Perhaps there could even be two separate services with WebSocket
+ * endpoints, running on separate machines with only GUI combining them together. Having a socket connection
+ * for each functionality could become too resource expensive. It is a common pattern to have single
+ * WebSocket endpoint that acts as a gateway for the other services (in this case chat and sport news services).
+ * Even though there is a single connection in a client app, having the ability to manipulate streams as if it
+ * were two separate sockets is desirable. This eliminates manually registering and unregistering in a gateway for
+ * given service and filter out messages of interest. This is exactly what `multiplex` method is for.
+ *
+ * Method accepts three parameters. First two are functions returning subscription and unsubscription messages
+ * respectively. These are messages that will be sent to the server, whenever consumer of resulting Observable
+ * subscribes and unsubscribes. Server can use them to verify that some kind of messages should start or stop
+ * being forwarded to the client. In case of the above example application, after getting subscription message with proper identifier,
+ * gateway server can decide that it should connect to real sport news service and start forwarding messages from it.
+ * Note that both messages will be sent as returned by the functions, they are by default serialized using JSON.stringify, just
+ * as messages pushed via `next`. Also bear in mind that these messages will be sent on *every* subscription and
+ * unsubscription. This is potentially dangerous, because one consumer of an Observable may unsubscribe and the server
+ * might stop sending messages, since it got unsubscription message. This needs to be handled
+ * on the server or using {@link publish} on a Observable returned from 'multiplex'.
+ *
+ * Last argument to `multiplex` is a `messageFilter` function which should return a boolean. It is used to filter out messages
+ * sent by the server to only those that belong to simulated WebSocket stream. For example, server might mark these
+ * messages with some kind of string identifier on a message object and `messageFilter` would return `true`
+ * if there is such identifier on an object emitted by the socket. Messages which returns `false` in `messageFilter` are simply skipped,
+ * and are not passed down the stream.
+ *
+ * Return value of `multiplex` is an Observable with messages incoming from emulated socket connection. Note that this
+ * is not a `WebSocketSubject`, so calling `next` or `multiplex` again will fail. For pushing values to the
+ * server, use root `WebSocketSubject`.
+ *
+ * ## Examples
+ *
+ * Listening for messages from the server
+ *
+ * ```ts
+ * import { webSocket } from 'rxjs/webSocket';
+ *
+ * const subject = webSocket('ws://localhost:8081');
+ *
+ * subject.subscribe({
+ * next: msg => console.log('message received: ' + msg), // Called whenever there is a message from the server.
+ * error: err => console.log(err), // Called if at any point WebSocket API signals some kind of error.
+ * complete: () => console.log('complete') // Called when connection is closed (for whatever reason).
+ * });
+ * ```
+ *
+ * Pushing messages to the server
+ *
+ * ```ts
+ * import { webSocket } from 'rxjs/webSocket';
+ *
+ * const subject = webSocket('ws://localhost:8081');
+ *
+ * subject.subscribe();
+ * // Note that at least one consumer has to subscribe to the created subject - otherwise "nexted" values will be just buffered and not sent,
+ * // since no connection was established!
+ *
+ * subject.next({ message: 'some message' });
+ * // This will send a message to the server once a connection is made. Remember value is serialized with JSON.stringify by default!
+ *
+ * subject.complete(); // Closes the connection.
+ *
+ * subject.error({ code: 4000, reason: 'I think our app just broke!' });
+ * // Also closes the connection, but let's the server know that this closing is caused by some error.
+ * ```
+ *
+ * Multiplexing WebSocket
+ *
+ * ```ts
+ * import { webSocket } from 'rxjs/webSocket';
+ *
+ * const subject = webSocket('ws://localhost:8081');
+ *
+ * const observableA = subject.multiplex(
+ * () => ({ subscribe: 'A' }), // When server gets this message, it will start sending messages for 'A'...
+ * () => ({ unsubscribe: 'A' }), // ...and when gets this one, it will stop.
+ * message => message.type === 'A' // If the function returns `true` message is passed down the stream. Skipped if the function returns false.
+ * );
+ *
+ * const observableB = subject.multiplex( // And the same goes for 'B'.
+ * () => ({ subscribe: 'B' }),
+ * () => ({ unsubscribe: 'B' }),
+ * message => message.type === 'B'
+ * );
+ *
+ * const subA = observableA.subscribe(messageForA => console.log(messageForA));
+ * // At this moment WebSocket connection is established. Server gets '{"subscribe": "A"}' message and starts sending messages for 'A',
+ * // which we log here.
+ *
+ * const subB = observableB.subscribe(messageForB => console.log(messageForB));
+ * // Since we already have a connection, we just send '{"subscribe": "B"}' message to the server. It starts sending messages for 'B',
+ * // which we log here.
+ *
+ * subB.unsubscribe();
+ * // Message '{"unsubscribe": "B"}' is sent to the server, which stops sending 'B' messages.
+ *
+ * subA.unsubscribe();
+ * // Message '{"unsubscribe": "A"}' makes the server stop sending messages for 'A'. Since there is no more subscribers to root Subject,
+ * // socket connection closes.
+ * ```
+ *
+ * @param urlConfigOrSource The WebSocket endpoint as an url or an object with configuration and additional Observers.
+ * @return Subject which allows to both send and receive messages via WebSocket connection.
+ */
+export function webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T> {
+ return new WebSocketSubject<T>(urlConfigOrSource);
+}
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage