aboutsummaryrefslogtreecommitdiffstats
path: root/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/rxjs/src/internal/observable/ConnectableObservable.ts')
-rw-r--r--node_modules/rxjs/src/internal/observable/ConnectableObservable.ts104
1 files changed, 104 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts b/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts
new file mode 100644
index 0000000..bd1c76f
--- /dev/null
+++ b/node_modules/rxjs/src/internal/observable/ConnectableObservable.ts
@@ -0,0 +1,104 @@
+import { Subject } from '../Subject';
+import { Observable } from '../Observable';
+import { Subscriber } from '../Subscriber';
+import { Subscription } from '../Subscription';
+import { refCount as higherOrderRefCount } from '../operators/refCount';
+import { createOperatorSubscriber } from '../operators/OperatorSubscriber';
+import { hasLift } from '../util/lift';
+
+/**
+ * @class ConnectableObservable<T>
+ * @deprecated Will be removed in v8. Use {@link connectable} to create a connectable observable.
+ * If you are using the `refCount` method of `ConnectableObservable`, use the {@link share} operator
+ * instead.
+ * Details: https://rxjs.dev/deprecations/multicasting
+ */
+export class ConnectableObservable<T> extends Observable<T> {
+ protected _subject: Subject<T> | null = null;
+ protected _refCount: number = 0;
+ protected _connection: Subscription | null = null;
+
+ /**
+ * @param source The source observable
+ * @param subjectFactory The factory that creates the subject used internally.
+ * @deprecated Will be removed in v8. Use {@link connectable} to create a connectable observable.
+ * `new ConnectableObservable(source, factory)` is equivalent to
+ * `connectable(source, { connector: factory })`.
+ * When the `refCount()` method is needed, the {@link share} operator should be used instead:
+ * `new ConnectableObservable(source, factory).refCount()` is equivalent to
+ * `source.pipe(share({ connector: factory }))`.
+ * Details: https://rxjs.dev/deprecations/multicasting
+ */
+ constructor(public source: Observable<T>, protected subjectFactory: () => Subject<T>) {
+ super();
+ // If we have lift, monkey patch that here. This is done so custom observable
+ // types will compose through multicast. Otherwise the resulting observable would
+ // simply be an instance of `ConnectableObservable`.
+ if (hasLift(source)) {
+ this.lift = source.lift;
+ }
+ }
+
+ /** @internal */
+ protected _subscribe(subscriber: Subscriber<T>) {
+ return this.getSubject().subscribe(subscriber);
+ }
+
+ protected getSubject(): Subject<T> {
+ const subject = this._subject;
+ if (!subject || subject.isStopped) {
+ this._subject = this.subjectFactory();
+ }
+ return this._subject!;
+ }
+
+ protected _teardown() {
+ this._refCount = 0;
+ const { _connection } = this;
+ this._subject = this._connection = null;
+ _connection?.unsubscribe();
+ }
+
+ /**
+ * @deprecated {@link ConnectableObservable} will be removed in v8. Use {@link connectable} instead.
+ * Details: https://rxjs.dev/deprecations/multicasting
+ */
+ connect(): Subscription {
+ let connection = this._connection;
+ if (!connection) {
+ connection = this._connection = new Subscription();
+ const subject = this.getSubject();
+ connection.add(
+ this.source.subscribe(
+ createOperatorSubscriber(
+ subject as any,
+ undefined,
+ () => {
+ this._teardown();
+ subject.complete();
+ },
+ (err) => {
+ this._teardown();
+ subject.error(err);
+ },
+ () => this._teardown()
+ )
+ )
+ );
+
+ if (connection.closed) {
+ this._connection = null;
+ connection = Subscription.EMPTY;
+ }
+ }
+ return connection;
+ }
+
+ /**
+ * @deprecated {@link ConnectableObservable} will be removed in v8. Use the {@link share} operator instead.
+ * Details: https://rxjs.dev/deprecations/multicasting
+ */
+ refCount(): Observable<T> {
+ return higherOrderRefCount()(this) as Observable<T>;
+ }
+}
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage