diff options
| author | Pinapelz <yukais@pinapelz.com> | 2025-06-29 11:49:28 -0700 |
|---|---|---|
| committer | Pinapelz <yukais@pinapelz.com> | 2025-06-29 11:49:28 -0700 |
| commit | d55b767039605256c736166a942a9138e3eacfd7 (patch) | |
| tree | 947063b634c50d438a794325f13275e134aa5993 /node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts | |
| parent | 864ce67d89c77d8ef9c3361f80d619853abcf91c (diff) | |
remove dev node_modules (oops)
Diffstat (limited to 'node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts')
| -rw-r--r-- | node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts | 397 |
1 files changed, 0 insertions, 397 deletions
diff --git a/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts deleted file mode 100644 index 9eecbf5..0000000 --- a/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts +++ /dev/null @@ -1,397 +0,0 @@ -import { Subject, AnonymousSubject } from '../../Subject'; -import { Subscriber } from '../../Subscriber'; -import { Observable } from '../../Observable'; -import { Subscription } from '../../Subscription'; -import { Operator } from '../../Operator'; -import { ReplaySubject } from '../../ReplaySubject'; -import { Observer, NextObserver } from '../../types'; - -/** - * WebSocketSubjectConfig is a plain Object that allows us to make our - * webSocket configurable. - * - * <span class="informal">Provides flexibility to {@link webSocket}</span> - * - * It defines a set of properties to provide custom behavior in specific - * moments of the socket's lifecycle. When the connection opens we can - * use `openObserver`, when the connection is closed `closeObserver`, if we - * are interested in listening for data coming from server: `deserializer`, - * which allows us to customize the deserialization strategy of data before passing it - * to the socket client. By default, `deserializer` is going to apply `JSON.parse` to each message coming - * from the Server. - * - * ## Examples - * - * **deserializer**, the default for this property is `JSON.parse` but since there are just two options - * for incoming data, either be text or binary data. We can apply a custom deserialization strategy - * or just simply skip the default behaviour. - * - * ```ts - * import { webSocket } from 'rxjs/webSocket'; - * - * const wsSubject = webSocket({ - * url: 'ws://localhost:8081', - * //Apply any transformation of your choice. - * deserializer: ({ data }) => data - * }); - * - * wsSubject.subscribe(console.log); - * - * // Let's suppose we have this on the Server: ws.send('This is a msg from the server') - * //output - * // - * // This is a msg from the server - * ``` - * - * **serializer** allows us to apply custom serialization strategy but for the outgoing messages. - * - * ```ts - * import { webSocket } from 'rxjs/webSocket'; - * - * const wsSubject = webSocket({ - * url: 'ws://localhost:8081', - * // Apply any transformation of your choice. - * serializer: msg => JSON.stringify({ channel: 'webDevelopment', msg: msg }) - * }); - * - * wsSubject.subscribe(() => subject.next('msg to the server')); - * - * // Let's suppose we have this on the Server: - * // ws.on('message', msg => console.log); - * // ws.send('This is a msg from the server'); - * // output at server side: - * // - * // {"channel":"webDevelopment","msg":"msg to the server"} - * ``` - * - * **closeObserver** allows us to set a custom error when an error raises up. - * - * ```ts - * import { webSocket } from 'rxjs/webSocket'; - * - * const wsSubject = webSocket({ - * url: 'ws://localhost:8081', - * closeObserver: { - * next() { - * const customError = { code: 6666, reason: 'Custom evil reason' } - * console.log(`code: ${ customError.code }, reason: ${ customError.reason }`); - * } - * } - * }); - * - * // output - * // code: 6666, reason: Custom evil reason - * ``` - * - * **openObserver**, Let's say we need to make some kind of init task before sending/receiving msgs to the - * webSocket or sending notification that the connection was successful, this is when - * openObserver is useful for. - * - * ```ts - * import { webSocket } from 'rxjs/webSocket'; - * - * const wsSubject = webSocket({ - * url: 'ws://localhost:8081', - * openObserver: { - * next: () => { - * console.log('Connection ok'); - * } - * } - * }); - * - * // output - * // Connection ok - * ``` - */ -export interface WebSocketSubjectConfig<T> { - /** The url of the socket server to connect to */ - url: string; - /** The protocol to use to connect */ - protocol?: string | Array<string>; - /** @deprecated Will be removed in v8. Use {@link deserializer} instead. */ - resultSelector?: (e: MessageEvent) => T; - /** - * A serializer used to create messages from passed values before the - * messages are sent to the server. Defaults to JSON.stringify. - */ - serializer?: (value: T) => WebSocketMessage; - /** - * A deserializer used for messages arriving on the socket from the - * server. Defaults to JSON.parse. - */ - deserializer?: (e: MessageEvent) => T; - /** - * An Observer that watches when open events occur on the underlying web socket. - */ - openObserver?: NextObserver<Event>; - /** - * An Observer that watches when close events occur on the underlying web socket - */ - closeObserver?: NextObserver<CloseEvent>; - /** - * An Observer that watches when a close is about to occur due to - * unsubscription. - */ - closingObserver?: NextObserver<void>; - /** - * A WebSocket constructor to use. This is useful for situations like using a - * WebSocket impl in Node (WebSocket is a DOM API), or for mocking a WebSocket - * for testing purposes - */ - WebSocketCtor?: { new (url: string, protocols?: string | string[]): WebSocket }; - /** Sets the `binaryType` property of the underlying WebSocket. */ - binaryType?: 'blob' | 'arraybuffer'; -} - -const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = { - url: '', - deserializer: (e: MessageEvent) => JSON.parse(e.data), - serializer: (value: any) => JSON.stringify(value), -}; - -const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT = - 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }'; - -export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView; - -export class WebSocketSubject<T> extends AnonymousSubject<T> { - // @ts-ignore: Property has no initializer and is not definitely assigned - private _config: WebSocketSubjectConfig<T>; - - /** @internal */ - // @ts-ignore: Property has no initializer and is not definitely assigned - _output: Subject<T>; - - private _socket: WebSocket | null = null; - - constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) { - super(); - if (urlConfigOrSource instanceof Observable) { - this.destination = destination; - this.source = urlConfigOrSource as Observable<T>; - } else { - const config = (this._config = { ...DEFAULT_WEBSOCKET_CONFIG }); - this._output = new Subject<T>(); - if (typeof urlConfigOrSource === 'string') { - config.url = urlConfigOrSource; - } else { - for (const key in urlConfigOrSource) { - if (urlConfigOrSource.hasOwnProperty(key)) { - (config as any)[key] = (urlConfigOrSource as any)[key]; - } - } - } - - if (!config.WebSocketCtor && WebSocket) { - config.WebSocketCtor = WebSocket; - } else if (!config.WebSocketCtor) { - throw new Error('no WebSocket constructor can be found'); - } - this.destination = new ReplaySubject(); - } - } - - /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */ - lift<R>(operator: Operator<T, R>): WebSocketSubject<R> { - const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, this.destination as any); - sock.operator = operator; - sock.source = this; - return sock; - } - - private _resetState() { - this._socket = null; - if (!this.source) { - this.destination = new ReplaySubject(); - } - this._output = new Subject<T>(); - } - - /** - * Creates an {@link Observable}, that when subscribed to, sends a message, - * defined by the `subMsg` function, to the server over the socket to begin a - * subscription to data over that socket. Once data arrives, the - * `messageFilter` argument will be used to select the appropriate data for - * the resulting Observable. When finalization occurs, either due to - * unsubscription, completion, or error, a message defined by the `unsubMsg` - * argument will be sent to the server over the WebSocketSubject. - * - * @param subMsg A function to generate the subscription message to be sent to - * the server. This will still be processed by the serializer in the - * WebSocketSubject's config. (Which defaults to JSON serialization) - * @param unsubMsg A function to generate the unsubscription message to be - * sent to the server at finalization. This will still be processed by the - * serializer in the WebSocketSubject's config. - * @param messageFilter A predicate for selecting the appropriate messages - * from the server for the output stream. - */ - multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) { - const self = this; - return new Observable((observer: Observer<T>) => { - try { - self.next(subMsg()); - } catch (err) { - observer.error(err); - } - - const subscription = self.subscribe({ - next: (x) => { - try { - if (messageFilter(x)) { - observer.next(x); - } - } catch (err) { - observer.error(err); - } - }, - error: (err) => observer.error(err), - complete: () => observer.complete(), - }); - - return () => { - try { - self.next(unsubMsg()); - } catch (err) { - observer.error(err); - } - subscription.unsubscribe(); - }; - }); - } - - private _connectSocket() { - const { WebSocketCtor, protocol, url, binaryType } = this._config; - const observer = this._output; - - let socket: WebSocket | null = null; - try { - socket = protocol ? new WebSocketCtor!(url, protocol) : new WebSocketCtor!(url); - this._socket = socket; - if (binaryType) { - this._socket.binaryType = binaryType; - } - } catch (e) { - observer.error(e); - return; - } - - const subscription = new Subscription(() => { - this._socket = null; - if (socket && socket.readyState === 1) { - socket.close(); - } - }); - - socket.onopen = (evt: Event) => { - const { _socket } = this; - if (!_socket) { - socket!.close(); - this._resetState(); - return; - } - const { openObserver } = this._config; - if (openObserver) { - openObserver.next(evt); - } - - const queue = this.destination; - - this.destination = Subscriber.create<T>( - (x) => { - if (socket!.readyState === 1) { - try { - const { serializer } = this._config; - socket!.send(serializer!(x!)); - } catch (e) { - this.destination!.error(e); - } - } - }, - (err) => { - const { closingObserver } = this._config; - if (closingObserver) { - closingObserver.next(undefined); - } - if (err && err.code) { - socket!.close(err.code, err.reason); - } else { - observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT)); - } - this._resetState(); - }, - () => { - const { closingObserver } = this._config; - if (closingObserver) { - closingObserver.next(undefined); - } - socket!.close(); - this._resetState(); - } - ) as Subscriber<any>; - - if (queue && queue instanceof ReplaySubject) { - subscription.add((queue as ReplaySubject<T>).subscribe(this.destination)); - } - }; - - socket.onerror = (e: Event) => { - this._resetState(); - observer.error(e); - }; - - socket.onclose = (e: CloseEvent) => { - if (socket === this._socket) { - this._resetState(); - } - const { closeObserver } = this._config; - if (closeObserver) { - closeObserver.next(e); - } - if (e.wasClean) { - observer.complete(); - } else { - observer.error(e); - } - }; - - socket.onmessage = (e: MessageEvent) => { - try { - const { deserializer } = this._config; - observer.next(deserializer!(e)); - } catch (err) { - observer.error(err); - } - }; - } - - /** @internal */ - protected _subscribe(subscriber: Subscriber<T>): Subscription { - const { source } = this; - if (source) { - return source.subscribe(subscriber); - } - if (!this._socket) { - this._connectSocket(); - } - this._output.subscribe(subscriber); - subscriber.add(() => { - const { _socket } = this; - if (this._output.observers.length === 0) { - if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) { - _socket.close(); - } - this._resetState(); - } - }); - return subscriber; - } - - unsubscribe() { - const { _socket } = this; - if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) { - _socket.close(); - } - this._resetState(); - super.unsubscribe(); - } -} |
