aboutsummaryrefslogtreecommitdiffstats
path: root/node_modules/rxjs/src/internal/observable/bindCallbackInternals.ts
diff options
context:
space:
mode:
authorPinapelz <yukais@pinapelz.com>2025-06-28 17:26:46 -0700
committerPinapelz <yukais@pinapelz.com>2025-06-28 17:43:56 -0700
commite4fa1e69e7ebfb627c7198fd1a9881e9327ec4d4 (patch)
tree06284a538a6008eca75051399e47db4e5d50301c /node_modules/rxjs/src/internal/observable/bindCallbackInternals.ts
initial commit: scaffolding
Diffstat (limited to 'node_modules/rxjs/src/internal/observable/bindCallbackInternals.ts')
-rw-r--r--node_modules/rxjs/src/internal/observable/bindCallbackInternals.ts119
1 files changed, 119 insertions, 0 deletions
diff --git a/node_modules/rxjs/src/internal/observable/bindCallbackInternals.ts b/node_modules/rxjs/src/internal/observable/bindCallbackInternals.ts
new file mode 100644
index 0000000..e614044
--- /dev/null
+++ b/node_modules/rxjs/src/internal/observable/bindCallbackInternals.ts
@@ -0,0 +1,119 @@
+import { SchedulerLike } from '../types';
+import { isScheduler } from '../util/isScheduler';
+import { Observable } from '../Observable';
+import { subscribeOn } from '../operators/subscribeOn';
+import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';
+import { observeOn } from '../operators/observeOn';
+import { AsyncSubject } from '../AsyncSubject';
+
+export function bindCallbackInternals(
+ isNodeStyle: boolean,
+ callbackFunc: any,
+ resultSelector?: any,
+ scheduler?: SchedulerLike
+): (...args: any[]) => Observable<unknown> {
+ if (resultSelector) {
+ if (isScheduler(resultSelector)) {
+ scheduler = resultSelector;
+ } else {
+ // The user provided a result selector.
+ return function (this: any, ...args: any[]) {
+ return (bindCallbackInternals(isNodeStyle, callbackFunc, scheduler) as any)
+ .apply(this, args)
+ .pipe(mapOneOrManyArgs(resultSelector as any));
+ };
+ }
+ }
+
+ // If a scheduler was passed, use our `subscribeOn` and `observeOn` operators
+ // to compose that behavior for the user.
+ if (scheduler) {
+ return function (this: any, ...args: any[]) {
+ return (bindCallbackInternals(isNodeStyle, callbackFunc) as any)
+ .apply(this, args)
+ .pipe(subscribeOn(scheduler!), observeOn(scheduler!));
+ };
+ }
+
+ return function (this: any, ...args: any[]): Observable<any> {
+ // We're using AsyncSubject, because it emits when it completes,
+ // and it will play the value to all late-arriving subscribers.
+ const subject = new AsyncSubject<any>();
+
+ // If this is true, then we haven't called our function yet.
+ let uninitialized = true;
+ return new Observable((subscriber) => {
+ // Add our subscriber to the subject.
+ const subs = subject.subscribe(subscriber);
+
+ if (uninitialized) {
+ uninitialized = false;
+ // We're going to execute the bound function
+ // This bit is to signal that we are hitting the callback asynchronously.
+ // Because we don't have any anti-"Zalgo" guarantees with whatever
+ // function we are handed, we use this bit to figure out whether or not
+ // we are getting hit in a callback synchronously during our call.
+ let isAsync = false;
+
+ // This is used to signal that the callback completed synchronously.
+ let isComplete = false;
+
+ // Call our function that has a callback. If at any time during this
+ // call, an error is thrown, it will be caught by the Observable
+ // subscription process and sent to the consumer.
+ callbackFunc.apply(
+ // Pass the appropriate `this` context.
+ this,
+ [
+ // Pass the arguments.
+ ...args,
+ // And our callback handler.
+ (...results: any[]) => {
+ if (isNodeStyle) {
+ // If this is a node callback, shift the first value off of the
+ // results and check it, as it is the error argument. By shifting,
+ // we leave only the argument(s) we want to pass to the consumer.
+ const err = results.shift();
+ if (err != null) {
+ subject.error(err);
+ // If we've errored, we can stop processing this function
+ // as there's nothing else to do. Just return to escape.
+ return;
+ }
+ }
+ // If we have one argument, notify the consumer
+ // of it as a single value, otherwise, if there's more than one, pass
+ // them as an array. Note that if there are no arguments, `undefined`
+ // will be emitted.
+ subject.next(1 < results.length ? results : results[0]);
+ // Flip this flag, so we know we can complete it in the synchronous
+ // case below.
+ isComplete = true;
+ // If we're not asynchronous, we need to defer the `complete` call
+ // until after the call to the function is over. This is because an
+ // error could be thrown in the function after it calls our callback,
+ // and if that is the case, if we complete here, we are unable to notify
+ // the consumer than an error occurred.
+ if (isAsync) {
+ subject.complete();
+ }
+ },
+ ]
+ );
+ // If we flipped `isComplete` during the call, we resolved synchronously,
+ // notify complete, because we skipped it in the callback to wait
+ // to make sure there were no errors during the call.
+ if (isComplete) {
+ subject.complete();
+ }
+
+ // We're no longer synchronous. If the callback is called at this point
+ // we can notify complete on the spot.
+ isAsync = true;
+ }
+
+ // Return the subscription from adding our subscriber to the subject.
+ return subs;
+ });
+ };
+}
send patches to the email below
yukais@pinapelz.com
include the subject [PATCH repo_name]
pinapelz.com
homepage