diff options
Diffstat (limited to 'node_modules/rxjs/src/internal/observable/connectable.ts')
| -rw-r--r-- | node_modules/rxjs/src/internal/observable/connectable.ts | 64 |
1 files changed, 64 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/observable/connectable.ts b/node_modules/rxjs/src/internal/observable/connectable.ts new file mode 100644 index 0000000..4609118 --- /dev/null +++ b/node_modules/rxjs/src/internal/observable/connectable.ts @@ -0,0 +1,64 @@ +import { Connectable, ObservableInput, SubjectLike } from '../types'; +import { Subject } from '../Subject'; +import { Subscription } from '../Subscription'; +import { Observable } from '../Observable'; +import { defer } from './defer'; + +export interface ConnectableConfig<T> { + /** + * A factory function used to create the Subject through which the source + * is multicast. By default this creates a {@link Subject}. + */ + connector: () => SubjectLike<T>; + /** + * If true, the resulting observable will reset internal state upon disconnection + * and return to a "cold" state. This allows the resulting observable to be + * reconnected. + * If false, upon disconnection, the connecting subject will remain the + * connecting subject, meaning the resulting observable will not go "cold" again, + * and subsequent repeats or resubscriptions will resubscribe to that same subject. + */ + resetOnDisconnect?: boolean; +} + +/** + * The default configuration for `connectable`. + */ +const DEFAULT_CONFIG: ConnectableConfig<unknown> = { + connector: () => new Subject<unknown>(), + resetOnDisconnect: true, +}; + +/** + * Creates an observable that multicasts once `connect()` is called on it. + * + * @param source The observable source to make connectable. + * @param config The configuration object for `connectable`. + * @returns A "connectable" observable, that has a `connect()` method, that you must call to + * connect the source to all consumers through the subject provided as the connector. + */ +export function connectable<T>(source: ObservableInput<T>, config: ConnectableConfig<T> = DEFAULT_CONFIG): Connectable<T> { + // The subscription representing the connection. + let connection: Subscription | null = null; + const { connector, resetOnDisconnect = true } = config; + let subject = connector(); + + const result: any = new Observable<T>((subscriber) => { + return subject.subscribe(subscriber); + }); + + // Define the `connect` function. This is what users must call + // in order to "connect" the source to the subject that is + // multicasting it. + result.connect = () => { + if (!connection || connection.closed) { + connection = defer(() => source).subscribe(subject); + if (resetOnDisconnect) { + connection.add(() => (subject = connector())); + } + } + return connection; + }; + + return result; +} |
