aboutsummaryrefslogtreecommitdiffstats
path: root/node_modules/rxjs/src/internal/observable/dom
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/rxjs/src/internal/observable/dom')
-rw-r--r--node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts397
-rw-r--r--node_modules/rxjs/src/internal/observable/dom/animationFrames.ts132
-rw-r--r--node_modules/rxjs/src/internal/observable/dom/fetch.ts180
-rw-r--r--node_modules/rxjs/src/internal/observable/dom/webSocket.ts161
4 files changed, 0 insertions, 870 deletions
diff --git a/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts b/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts
deleted file mode 100644
index 9eecbf5..0000000
--- a/node_modules/rxjs/src/internal/observable/dom/WebSocketSubject.ts
+++ /dev/null
@@ -1,397 +0,0 @@
-import { Subject, AnonymousSubject } from '../../Subject';
-import { Subscriber } from '../../Subscriber';
-import { Observable } from '../../Observable';
-import { Subscription } from '../../Subscription';
-import { Operator } from '../../Operator';
-import { ReplaySubject } from '../../ReplaySubject';
-import { Observer, NextObserver } from '../../types';
-
-/**
- * WebSocketSubjectConfig is a plain Object that allows us to make our
- * webSocket configurable.
- *
- * <span class="informal">Provides flexibility to {@link webSocket}</span>
- *
- * It defines a set of properties to provide custom behavior in specific
- * moments of the socket's lifecycle. When the connection opens we can
- * use `openObserver`, when the connection is closed `closeObserver`, if we
- * are interested in listening for data coming from server: `deserializer`,
- * which allows us to customize the deserialization strategy of data before passing it
- * to the socket client. By default, `deserializer` is going to apply `JSON.parse` to each message coming
- * from the Server.
- *
- * ## Examples
- *
- * **deserializer**, the default for this property is `JSON.parse` but since there are just two options
- * for incoming data, either be text or binary data. We can apply a custom deserialization strategy
- * or just simply skip the default behaviour.
- *
- * ```ts
- * import { webSocket } from 'rxjs/webSocket';
- *
- * const wsSubject = webSocket({
- * url: 'ws://localhost:8081',
- * //Apply any transformation of your choice.
- * deserializer: ({ data }) => data
- * });
- *
- * wsSubject.subscribe(console.log);
- *
- * // Let's suppose we have this on the Server: ws.send('This is a msg from the server')
- * //output
- * //
- * // This is a msg from the server
- * ```
- *
- * **serializer** allows us to apply custom serialization strategy but for the outgoing messages.
- *
- * ```ts
- * import { webSocket } from 'rxjs/webSocket';
- *
- * const wsSubject = webSocket({
- * url: 'ws://localhost:8081',
- * // Apply any transformation of your choice.
- * serializer: msg => JSON.stringify({ channel: 'webDevelopment', msg: msg })
- * });
- *
- * wsSubject.subscribe(() => subject.next('msg to the server'));
- *
- * // Let's suppose we have this on the Server:
- * // ws.on('message', msg => console.log);
- * // ws.send('This is a msg from the server');
- * // output at server side:
- * //
- * // {"channel":"webDevelopment","msg":"msg to the server"}
- * ```
- *
- * **closeObserver** allows us to set a custom error when an error raises up.
- *
- * ```ts
- * import { webSocket } from 'rxjs/webSocket';
- *
- * const wsSubject = webSocket({
- * url: 'ws://localhost:8081',
- * closeObserver: {
- * next() {
- * const customError = { code: 6666, reason: 'Custom evil reason' }
- * console.log(`code: ${ customError.code }, reason: ${ customError.reason }`);
- * }
- * }
- * });
- *
- * // output
- * // code: 6666, reason: Custom evil reason
- * ```
- *
- * **openObserver**, Let's say we need to make some kind of init task before sending/receiving msgs to the
- * webSocket or sending notification that the connection was successful, this is when
- * openObserver is useful for.
- *
- * ```ts
- * import { webSocket } from 'rxjs/webSocket';
- *
- * const wsSubject = webSocket({
- * url: 'ws://localhost:8081',
- * openObserver: {
- * next: () => {
- * console.log('Connection ok');
- * }
- * }
- * });
- *
- * // output
- * // Connection ok
- * ```
- */
-export interface WebSocketSubjectConfig<T> {
- /** The url of the socket server to connect to */
- url: string;
- /** The protocol to use to connect */
- protocol?: string | Array<string>;
- /** @deprecated Will be removed in v8. Use {@link deserializer} instead. */
- resultSelector?: (e: MessageEvent) => T;
- /**
- * A serializer used to create messages from passed values before the
- * messages are sent to the server. Defaults to JSON.stringify.
- */
- serializer?: (value: T) => WebSocketMessage;
- /**
- * A deserializer used for messages arriving on the socket from the
- * server. Defaults to JSON.parse.
- */
- deserializer?: (e: MessageEvent) => T;
- /**
- * An Observer that watches when open events occur on the underlying web socket.
- */
- openObserver?: NextObserver<Event>;
- /**
- * An Observer that watches when close events occur on the underlying web socket
- */
- closeObserver?: NextObserver<CloseEvent>;
- /**
- * An Observer that watches when a close is about to occur due to
- * unsubscription.
- */
- closingObserver?: NextObserver<void>;
- /**
- * A WebSocket constructor to use. This is useful for situations like using a
- * WebSocket impl in Node (WebSocket is a DOM API), or for mocking a WebSocket
- * for testing purposes
- */
- WebSocketCtor?: { new (url: string, protocols?: string | string[]): WebSocket };
- /** Sets the `binaryType` property of the underlying WebSocket. */
- binaryType?: 'blob' | 'arraybuffer';
-}
-
-const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = {
- url: '',
- deserializer: (e: MessageEvent) => JSON.parse(e.data),
- serializer: (value: any) => JSON.stringify(value),
-};
-
-const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT =
- 'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
-
-export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView;
-
-export class WebSocketSubject<T> extends AnonymousSubject<T> {
- // @ts-ignore: Property has no initializer and is not definitely assigned
- private _config: WebSocketSubjectConfig<T>;
-
- /** @internal */
- // @ts-ignore: Property has no initializer and is not definitely assigned
- _output: Subject<T>;
-
- private _socket: WebSocket | null = null;
-
- constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) {
- super();
- if (urlConfigOrSource instanceof Observable) {
- this.destination = destination;
- this.source = urlConfigOrSource as Observable<T>;
- } else {
- const config = (this._config = { ...DEFAULT_WEBSOCKET_CONFIG });
- this._output = new Subject<T>();
- if (typeof urlConfigOrSource === 'string') {
- config.url = urlConfigOrSource;
- } else {
- for (const key in urlConfigOrSource) {
- if (urlConfigOrSource.hasOwnProperty(key)) {
- (config as any)[key] = (urlConfigOrSource as any)[key];
- }
- }
- }
-
- if (!config.WebSocketCtor && WebSocket) {
- config.WebSocketCtor = WebSocket;
- } else if (!config.WebSocketCtor) {
- throw new Error('no WebSocket constructor can be found');
- }
- this.destination = new ReplaySubject();
- }
- }
-
- /** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
- lift<R>(operator: Operator<T, R>): WebSocketSubject<R> {
- const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, this.destination as any);
- sock.operator = operator;
- sock.source = this;
- return sock;
- }
-
- private _resetState() {
- this._socket = null;
- if (!this.source) {
- this.destination = new ReplaySubject();
- }
- this._output = new Subject<T>();
- }
-
- /**
- * Creates an {@link Observable}, that when subscribed to, sends a message,
- * defined by the `subMsg` function, to the server over the socket to begin a
- * subscription to data over that socket. Once data arrives, the
- * `messageFilter` argument will be used to select the appropriate data for
- * the resulting Observable. When finalization occurs, either due to
- * unsubscription, completion, or error, a message defined by the `unsubMsg`
- * argument will be sent to the server over the WebSocketSubject.
- *
- * @param subMsg A function to generate the subscription message to be sent to
- * the server. This will still be processed by the serializer in the
- * WebSocketSubject's config. (Which defaults to JSON serialization)
- * @param unsubMsg A function to generate the unsubscription message to be
- * sent to the server at finalization. This will still be processed by the
- * serializer in the WebSocketSubject's config.
- * @param messageFilter A predicate for selecting the appropriate messages
- * from the server for the output stream.
- */
- multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
- const self = this;
- return new Observable((observer: Observer<T>) => {
- try {
- self.next(subMsg());
- } catch (err) {
- observer.error(err);
- }
-
- const subscription = self.subscribe({
- next: (x) => {
- try {
- if (messageFilter(x)) {
- observer.next(x);
- }
- } catch (err) {
- observer.error(err);
- }
- },
- error: (err) => observer.error(err),
- complete: () => observer.complete(),
- });
-
- return () => {
- try {
- self.next(unsubMsg());
- } catch (err) {
- observer.error(err);
- }
- subscription.unsubscribe();
- };
- });
- }
-
- private _connectSocket() {
- const { WebSocketCtor, protocol, url, binaryType } = this._config;
- const observer = this._output;
-
- let socket: WebSocket | null = null;
- try {
- socket = protocol ? new WebSocketCtor!(url, protocol) : new WebSocketCtor!(url);
- this._socket = socket;
- if (binaryType) {
- this._socket.binaryType = binaryType;
- }
- } catch (e) {
- observer.error(e);
- return;
- }
-
- const subscription = new Subscription(() => {
- this._socket = null;
- if (socket && socket.readyState === 1) {
- socket.close();
- }
- });
-
- socket.onopen = (evt: Event) => {
- const { _socket } = this;
- if (!_socket) {
- socket!.close();
- this._resetState();
- return;
- }
- const { openObserver } = this._config;
- if (openObserver) {
- openObserver.next(evt);
- }
-
- const queue = this.destination;
-
- this.destination = Subscriber.create<T>(
- (x) => {
- if (socket!.readyState === 1) {
- try {
- const { serializer } = this._config;
- socket!.send(serializer!(x!));
- } catch (e) {
- this.destination!.error(e);
- }
- }
- },
- (err) => {
- const { closingObserver } = this._config;
- if (closingObserver) {
- closingObserver.next(undefined);
- }
- if (err && err.code) {
- socket!.close(err.code, err.reason);
- } else {
- observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
- }
- this._resetState();
- },
- () => {
- const { closingObserver } = this._config;
- if (closingObserver) {
- closingObserver.next(undefined);
- }
- socket!.close();
- this._resetState();
- }
- ) as Subscriber<any>;
-
- if (queue && queue instanceof ReplaySubject) {
- subscription.add((queue as ReplaySubject<T>).subscribe(this.destination));
- }
- };
-
- socket.onerror = (e: Event) => {
- this._resetState();
- observer.error(e);
- };
-
- socket.onclose = (e: CloseEvent) => {
- if (socket === this._socket) {
- this._resetState();
- }
- const { closeObserver } = this._config;
- if (closeObserver) {
- closeObserver.next(e);
- }
- if (e.wasClean) {
- observer.complete();
- } else {
- observer.error(e);
- }
- };
-
- socket.onmessage = (e: MessageEvent) => {
- try {
- const { deserializer } = this._config;
- observer.next(deserializer!(e));
- } catch (err) {
- observer.error(err);
- }
- };
- }
-
- /** @internal */
- protected _subscribe(subscriber: Subscriber<T>): Subscription {
- const { source } = this;
- if (source) {
- return source.subscribe(subscriber);
- }
- if (!this._socket) {
- this._connectSocket();
- }
- this._output.subscribe(subscriber);
- subscriber.add(() => {
- const { _socket } = this;
- if (this._output.observers.length === 0) {
- if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
- _socket.close();
- }
- this._resetState();
- }
- });
- return subscriber;
- }
-
- unsubscribe() {
- const { _socket } = this;
- if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
- _socket.close();
- }
- this._resetState();
- super.unsubscribe();
- }
-}
diff --git a/node_modules/rxjs/src/internal/observable/dom/animationFrames.ts b/node_modules/rxjs/src/internal/observable/dom/animationFrames.ts
deleted file mode 100644
index 38b338b..0000000
--- a/node_modules/rxjs/src/internal/observable/dom/animationFrames.ts
+++ /dev/null
@@ -1,132 +0,0 @@
-import { Observable } from '../../Observable';
-import { TimestampProvider } from '../../types';
-import { performanceTimestampProvider } from '../../scheduler/performanceTimestampProvider';
-import { animationFrameProvider } from '../../scheduler/animationFrameProvider';
-
-/**
- * An observable of animation frames
- *
- * Emits the amount of time elapsed since subscription and the timestamp on each animation frame.
- * Defaults to milliseconds provided to the requestAnimationFrame's callback. Does not end on its own.
- *
- * Every subscription will start a separate animation loop. Since animation frames are always scheduled
- * by the browser to occur directly before a repaint, scheduling more than one animation frame synchronously
- * should not be much different or have more overhead than looping over an array of events during
- * a single animation frame. However, if for some reason the developer would like to ensure the
- * execution of animation-related handlers are all executed during the same task by the engine,
- * the `share` operator can be used.
- *
- * This is useful for setting up animations with RxJS.
- *
- * ## Examples
- *
- * Tweening a div to move it on the screen
- *
- * ```ts
- * import { animationFrames, map, takeWhile, endWith } from 'rxjs';
- *
- * function tween(start: number, end: number, duration: number) {
- * const diff = end - start;
- * return animationFrames().pipe(
- * // Figure out what percentage of time has passed
- * map(({ elapsed }) => elapsed / duration),
- * // Take the vector while less than 100%
- * takeWhile(v => v < 1),
- * // Finish with 100%
- * endWith(1),
- * // Calculate the distance traveled between start and end
- * map(v => v * diff + start)
- * );
- * }
- *
- * // Setup a div for us to move around
- * const div = document.createElement('div');
- * document.body.appendChild(div);
- * div.style.position = 'absolute';
- * div.style.width = '40px';
- * div.style.height = '40px';
- * div.style.backgroundColor = 'lime';
- * div.style.transform = 'translate3d(10px, 0, 0)';
- *
- * tween(10, 200, 4000).subscribe(x => {
- * div.style.transform = `translate3d(${ x }px, 0, 0)`;
- * });
- * ```
- *
- * Providing a custom timestamp provider
- *
- * ```ts
- * import { animationFrames, TimestampProvider } from 'rxjs';
- *
- * // A custom timestamp provider
- * let now = 0;
- * const customTSProvider: TimestampProvider = {
- * now() { return now++; }
- * };
- *
- * const source$ = animationFrames(customTSProvider);
- *
- * // Log increasing numbers 0...1...2... on every animation frame.
- * source$.subscribe(({ elapsed }) => console.log(elapsed));
- * ```
- *
- * @param timestampProvider An object with a `now` method that provides a numeric timestamp
- */
-export function animationFrames(timestampProvider?: TimestampProvider) {
- return timestampProvider ? animationFramesFactory(timestampProvider) : DEFAULT_ANIMATION_FRAMES;
-}
-
-/**
- * Does the work of creating the observable for `animationFrames`.
- * @param timestampProvider The timestamp provider to use to create the observable
- */
-function animationFramesFactory(timestampProvider?: TimestampProvider) {
- return new Observable<{ timestamp: number; elapsed: number }>((subscriber) => {
- // If no timestamp provider is specified, use performance.now() - as it
- // will return timestamps 'compatible' with those passed to the run
- // callback and won't be affected by NTP adjustments, etc.
- const provider = timestampProvider || performanceTimestampProvider;
-
- // Capture the start time upon subscription, as the run callback can remain
- // queued for a considerable period of time and the elapsed time should
- // represent the time elapsed since subscription - not the time since the
- // first rendered animation frame.
- const start = provider.now();
-
- let id = 0;
- const run = () => {
- if (!subscriber.closed) {
- id = animationFrameProvider.requestAnimationFrame((timestamp: DOMHighResTimeStamp | number) => {
- id = 0;
- // Use the provider's timestamp to calculate the elapsed time. Note that
- // this means - if the caller hasn't passed a provider - that
- // performance.now() will be used instead of the timestamp that was
- // passed to the run callback. The reason for this is that the timestamp
- // passed to the callback can be earlier than the start time, as it
- // represents the time at which the browser decided it would render any
- // queued frames - and that time can be earlier the captured start time.
- const now = provider.now();
- subscriber.next({
- timestamp: timestampProvider ? now : timestamp,
- elapsed: now - start,
- });
- run();
- });
- }
- };
-
- run();
-
- return () => {
- if (id) {
- animationFrameProvider.cancelAnimationFrame(id);
- }
- };
- });
-}
-
-/**
- * In the common case, where the timestamp provided by the rAF API is used,
- * we use this shared observable to reduce overhead.
- */
-const DEFAULT_ANIMATION_FRAMES = animationFramesFactory();
diff --git a/node_modules/rxjs/src/internal/observable/dom/fetch.ts b/node_modules/rxjs/src/internal/observable/dom/fetch.ts
deleted file mode 100644
index 1894d24..0000000
--- a/node_modules/rxjs/src/internal/observable/dom/fetch.ts
+++ /dev/null
@@ -1,180 +0,0 @@
-import { createOperatorSubscriber } from '../../operators/OperatorSubscriber';
-import { Observable } from '../../Observable';
-import { innerFrom } from '../../observable/innerFrom';
-import { ObservableInput } from '../../types';
-
-export function fromFetch<T>(
- input: string | Request,
- init: RequestInit & {
- selector: (response: Response) => ObservableInput<T>;
- }
-): Observable<T>;
-
-export function fromFetch(input: string | Request, init?: RequestInit): Observable<Response>;
-
-/**
- * Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
- * make an HTTP request.
- *
- * **WARNING** Parts of the fetch API are still experimental. `AbortController` is
- * required for this implementation to work and use cancellation appropriately.
- *
- * Will automatically set up an internal [AbortController](https://developer.mozilla.org/en-US/docs/Web/API/AbortController)
- * in order to finalize the internal `fetch` when the subscription tears down.
- *
- * If a `signal` is provided via the `init` argument, it will behave like it usually does with
- * `fetch`. If the provided `signal` aborts, the error that `fetch` normally rejects with
- * in that scenario will be emitted as an error from the observable.
- *
- * ## Examples
- *
- * Basic use
- *
- * ```ts
- * import { fromFetch } from 'rxjs/fetch';
- * import { switchMap, of, catchError } from 'rxjs';
- *
- * const data$ = fromFetch('https://api.github.com/users?per_page=5').pipe(
- * switchMap(response => {
- * if (response.ok) {
- * // OK return data
- * return response.json();
- * } else {
- * // Server is returning a status requiring the client to try something else.
- * return of({ error: true, message: `Error ${ response.status }` });
- * }
- * }),
- * catchError(err => {
- * // Network or other error, handle appropriately
- * console.error(err);
- * return of({ error: true, message: err.message })
- * })
- * );
- *
- * data$.subscribe({
- * next: result => console.log(result),
- * complete: () => console.log('done')
- * });
- * ```
- *
- * ### Use with Chunked Transfer Encoding
- *
- * With HTTP responses that use [chunked transfer encoding](https://tools.ietf.org/html/rfc7230#section-3.3.1),
- * the promise returned by `fetch` will resolve as soon as the response's headers are
- * received.
- *
- * That means the `fromFetch` observable will emit a `Response` - and will
- * then complete - before the body is received. When one of the methods on the
- * `Response` - like `text()` or `json()` - is called, the returned promise will not
- * resolve until the entire body has been received. Unsubscribing from any observable
- * that uses the promise as an observable input will not abort the request.
- *
- * To facilitate aborting the retrieval of responses that use chunked transfer encoding,
- * a `selector` can be specified via the `init` parameter:
- *
- * ```ts
- * import { of } from 'rxjs';
- * import { fromFetch } from 'rxjs/fetch';
- *
- * const data$ = fromFetch('https://api.github.com/users?per_page=5', {
- * selector: response => response.json()
- * });
- *
- * data$.subscribe({
- * next: result => console.log(result),
- * complete: () => console.log('done')
- * });
- * ```
- *
- * @param input The resource you would like to fetch. Can be a url or a request object.
- * @param initWithSelector A configuration object for the fetch.
- * [See MDN for more details](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch#Parameters)
- * @returns An Observable, that when subscribed to, performs an HTTP request using the native `fetch`
- * function. The {@link Subscription} is tied to an `AbortController` for the fetch.
- */
-export function fromFetch<T>(
- input: string | Request,
- initWithSelector: RequestInit & {
- selector?: (response: Response) => ObservableInput<T>;
- } = {}
-): Observable<Response | T> {
- const { selector, ...init } = initWithSelector;
- return new Observable<Response | T>((subscriber) => {
- // Our controller for aborting this fetch.
- // Any externally provided AbortSignal will have to call
- // abort on this controller when signaled, because the
- // signal from this controller is what is being passed to `fetch`.
- const controller = new AbortController();
- const { signal } = controller;
- // This flag exists to make sure we don't `abort()` the fetch upon tearing down
- // this observable after emitting a Response. Aborting in such circumstances
- // would also abort subsequent methods - like `json()` - that could be called
- // on the Response. Consider: `fromFetch().pipe(take(1), mergeMap(res => res.json()))`
- let abortable = true;
-
- // If the user provided an init configuration object,
- // let's process it and chain our abort signals, if necessary.
- // If a signal is provided, just have it finalized. It's a cancellation token, basically.
- const { signal: outerSignal } = init;
- if (outerSignal) {
- if (outerSignal.aborted) {
- controller.abort();
- } else {
- // We got an AbortSignal from the arguments passed into `fromFetch`.
- // We need to wire up our AbortController to abort when this signal aborts.
- const outerSignalHandler = () => {
- if (!signal.aborted) {
- controller.abort();
- }
- };
- outerSignal.addEventListener('abort', outerSignalHandler);
- subscriber.add(() => outerSignal.removeEventListener('abort', outerSignalHandler));
- }
- }
-
- // The initialization object passed to `fetch` as the second
- // argument. This ferries in important information, including our
- // AbortSignal. Create a new init, so we don't accidentally mutate the
- // passed init, or reassign it. This is because the init passed in
- // is shared between each subscription to the result.
- const perSubscriberInit: RequestInit = { ...init, signal };
-
- const handleError = (err: any) => {
- abortable = false;
- subscriber.error(err);
- };
-
- fetch(input, perSubscriberInit)
- .then((response) => {
- if (selector) {
- // If we have a selector function, use it to project our response.
- // Note that any error that comes from our selector will be
- // sent to the promise `catch` below and handled.
- innerFrom(selector(response)).subscribe(
- createOperatorSubscriber(
- subscriber,
- // Values are passed through to the subscriber
- undefined,
- // The projected response is complete.
- () => {
- abortable = false;
- subscriber.complete();
- },
- handleError
- )
- );
- } else {
- abortable = false;
- subscriber.next(response);
- subscriber.complete();
- }
- })
- .catch(handleError);
-
- return () => {
- if (abortable) {
- controller.abort();
- }
- };
- });
-}
diff --git a/node_modules/rxjs/src/internal/observable/dom/webSocket.ts b/node_modules/rxjs/src/internal/observable/dom/webSocket.ts
deleted file mode 100644
index b10c5d8..0000000
--- a/node_modules/rxjs/src/internal/observable/dom/webSocket.ts
+++ /dev/null
@@ -1,161 +0,0 @@
-import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject';
-
-/**
- * Wrapper around the w3c-compatible WebSocket object provided by the browser.
- *
- * <span class="informal">{@link Subject} that communicates with a server via WebSocket</span>
- *
- * `webSocket` is a factory function that produces a `WebSocketSubject`,
- * which can be used to make WebSocket connection with an arbitrary endpoint.
- * `webSocket` accepts as an argument either a string with url of WebSocket endpoint, or an
- * {@link WebSocketSubjectConfig} object for providing additional configuration, as
- * well as Observers for tracking lifecycle of WebSocket connection.
- *
- * When `WebSocketSubject` is subscribed, it attempts to make a socket connection,
- * unless there is one made already. This means that many subscribers will always listen
- * on the same socket, thus saving resources. If however, two instances are made of `WebSocketSubject`,
- * even if these two were provided with the same url, they will attempt to make separate
- * connections. When consumer of a `WebSocketSubject` unsubscribes, socket connection is closed,
- * only if there are no more subscribers still listening. If after some time a consumer starts
- * subscribing again, connection is reestablished.
- *
- * Once connection is made, whenever a new message comes from the server, `WebSocketSubject` will emit that
- * message as a value in the stream. By default, a message from the socket is parsed via `JSON.parse`. If you
- * want to customize how deserialization is handled (if at all), you can provide custom `resultSelector`
- * function in {@link WebSocketSubject}. When connection closes, stream will complete, provided it happened without
- * any errors. If at any point (starting, maintaining or closing a connection) there is an error,
- * stream will also error with whatever WebSocket API has thrown.
- *
- * By virtue of being a {@link Subject}, `WebSocketSubject` allows for receiving and sending messages from the server. In order
- * to communicate with a connected endpoint, use `next`, `error` and `complete` methods. `next` sends a value to the server, so bear in mind
- * that this value will not be serialized beforehand. Because of This, `JSON.stringify` will have to be called on a value by hand,
- * before calling `next` with a result. Note also that if at the moment of nexting value
- * there is no socket connection (for example no one is subscribing), those values will be buffered, and sent when connection
- * is finally established. `complete` method closes socket connection. `error` does the same,
- * as well as notifying the server that something went wrong via status code and string with details of what happened.
- * Since status code is required in WebSocket API, `WebSocketSubject` does not allow, like regular `Subject`,
- * arbitrary values being passed to the `error` method. It needs to be called with an object that has `code`
- * property with status code number and optional `reason` property with string describing details
- * of an error.
- *
- * Calling `next` does not affect subscribers of `WebSocketSubject` - they have no
- * information that something was sent to the server (unless of course the server
- * responds somehow to a message). On the other hand, since calling `complete` triggers
- * an attempt to close socket connection. If that connection is closed without any errors, stream will
- * complete, thus notifying all subscribers. And since calling `error` closes
- * socket connection as well, just with a different status code for the server, if closing itself proceeds
- * without errors, subscribed Observable will not error, as one might expect, but complete as usual. In both cases
- * (calling `complete` or `error`), if process of closing socket connection results in some errors, *then* stream
- * will error.
- *
- * **Multiplexing**
- *
- * `WebSocketSubject` has an additional operator, not found in other Subjects. It is called `multiplex` and it is
- * used to simulate opening several socket connections, while in reality maintaining only one.
- * For example, an application has both chat panel and real-time notifications about sport news. Since these are two distinct functions,
- * it would make sense to have two separate connections for each. Perhaps there could even be two separate services with WebSocket
- * endpoints, running on separate machines with only GUI combining them together. Having a socket connection
- * for each functionality could become too resource expensive. It is a common pattern to have single
- * WebSocket endpoint that acts as a gateway for the other services (in this case chat and sport news services).
- * Even though there is a single connection in a client app, having the ability to manipulate streams as if it
- * were two separate sockets is desirable. This eliminates manually registering and unregistering in a gateway for
- * given service and filter out messages of interest. This is exactly what `multiplex` method is for.
- *
- * Method accepts three parameters. First two are functions returning subscription and unsubscription messages
- * respectively. These are messages that will be sent to the server, whenever consumer of resulting Observable
- * subscribes and unsubscribes. Server can use them to verify that some kind of messages should start or stop
- * being forwarded to the client. In case of the above example application, after getting subscription message with proper identifier,
- * gateway server can decide that it should connect to real sport news service and start forwarding messages from it.
- * Note that both messages will be sent as returned by the functions, they are by default serialized using JSON.stringify, just
- * as messages pushed via `next`. Also bear in mind that these messages will be sent on *every* subscription and
- * unsubscription. This is potentially dangerous, because one consumer of an Observable may unsubscribe and the server
- * might stop sending messages, since it got unsubscription message. This needs to be handled
- * on the server or using {@link publish} on a Observable returned from 'multiplex'.
- *
- * Last argument to `multiplex` is a `messageFilter` function which should return a boolean. It is used to filter out messages
- * sent by the server to only those that belong to simulated WebSocket stream. For example, server might mark these
- * messages with some kind of string identifier on a message object and `messageFilter` would return `true`
- * if there is such identifier on an object emitted by the socket. Messages which returns `false` in `messageFilter` are simply skipped,
- * and are not passed down the stream.
- *
- * Return value of `multiplex` is an Observable with messages incoming from emulated socket connection. Note that this
- * is not a `WebSocketSubject`, so calling `next` or `multiplex` again will fail. For pushing values to the
- * server, use root `WebSocketSubject`.
- *
- * ## Examples
- *
- * Listening for messages from the server
- *
- * ```ts
- * import { webSocket } from 'rxjs/webSocket';
- *
- * const subject = webSocket('ws://localhost:8081');
- *
- * subject.subscribe({
- * next: msg => console.log('message received: ' + msg), // Called whenever there is a message from the server.
- * error: err => console.log(err), // Called if at any point WebSocket API signals some kind of error.
- * complete: () => console.log('complete') // Called when connection is closed (for whatever reason).
- * });
- * ```
- *
- * Pushing messages to the server
- *
- * ```ts
- * import { webSocket } from 'rxjs/webSocket';
- *
- * const subject = webSocket('ws://localhost:8081');
- *
- * subject.subscribe();
- * // Note that at least one consumer has to subscribe to the created subject - otherwise "nexted" values will be just buffered and not sent,
- * // since no connection was established!
- *
- * subject.next({ message: 'some message' });
- * // This will send a message to the server once a connection is made. Remember value is serialized with JSON.stringify by default!
- *
- * subject.complete(); // Closes the connection.
- *
- * subject.error({ code: 4000, reason: 'I think our app just broke!' });
- * // Also closes the connection, but let's the server know that this closing is caused by some error.
- * ```
- *
- * Multiplexing WebSocket
- *
- * ```ts
- * import { webSocket } from 'rxjs/webSocket';
- *
- * const subject = webSocket('ws://localhost:8081');
- *
- * const observableA = subject.multiplex(
- * () => ({ subscribe: 'A' }), // When server gets this message, it will start sending messages for 'A'...
- * () => ({ unsubscribe: 'A' }), // ...and when gets this one, it will stop.
- * message => message.type === 'A' // If the function returns `true` message is passed down the stream. Skipped if the function returns false.
- * );
- *
- * const observableB = subject.multiplex( // And the same goes for 'B'.
- * () => ({ subscribe: 'B' }),
- * () => ({ unsubscribe: 'B' }),
- * message => message.type === 'B'
- * );
- *
- * const subA = observableA.subscribe(messageForA => console.log(messageForA));
- * // At this moment WebSocket connection is established. Server gets '{"subscribe": "A"}' message and starts sending messages for 'A',
- * // which we log here.
- *
- * const subB = observableB.subscribe(messageForB => console.log(messageForB));
- * // Since we already have a connection, we just send '{"subscribe": "B"}' message to the server. It starts sending messages for 'B',
- * // which we log here.
- *
- * subB.unsubscribe();
- * // Message '{"unsubscribe": "B"}' is sent to the server, which stops sending 'B' messages.
- *
- * subA.unsubscribe();
- * // Message '{"unsubscribe": "A"}' makes the server stop sending messages for 'A'. Since there is no more subscribers to root Subject,
- * // socket connection closes.
- * ```
- *
- * @param urlConfigOrSource The WebSocket endpoint as an url or an object with configuration and additional Observers.
- * @return Subject which allows to both send and receive messages via WebSocket connection.
- */
-export function webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T> {
- return new WebSocketSubject<T>(urlConfigOrSource);
-}
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage