import {
  Observable,
  ObservableInput,
  Subject,
  SubjectLike,
  Subscribable,
  Subscriber,
  Subscription,
  TeardownLogic,
  from,
  noop,
  of,
  switchMap,
} from 'rxjs';

import { createInterest$, indicateInterest } from './distributed-interest.js';

/**
 * Builds an observable whose producer (`subscribe`) runs only in the context
 * that currently holds the Web Lock named `resourceId` (the "leader"). Every
 * notification the leader produces is delivered to its own subscriber and
 * posted on `BroadcastChannel`s derived from `resourceId`; every subscriber
 * of the returned observable also listens on those channels and re-emits
 * what it receives.
 *
 * @param resourceId Name of the lock and prefix of the broadcast channels.
 * @param subscribe Producer executed once the lock is granted. When it
 *   declares more than one parameter it additionally receives the stream
 *   returned by `createInterest$` and the last value seen by this
 *   subscription before the lock was granted.
 * @param options.onLockReleased Mapped (via `switchMap`) over the value the
 *   lock request resolves with; errors of that pipeline are ignored.
 * @param options.skipLeaderBid When truthy the lock is never requested, so
 *   this subscription only relays what arrives on the channels.
 * @returns A cold observable; each subscription opens its own channels and,
 *   unless `skipLeaderBid` is set, its own lock request.
 */
export function createDistributedObservable$<T>(
  resourceId: string,
  subscribe: (
    subscriber: Subscriber<T>,
    subscriptionInterest?: Observable<string>,
    currentValue?: T
  ) => TeardownLogic,
  options?: {
    onLockReleased?: (r?: unknown) => ObservableInput<unknown>;
    skipLeaderBid?: boolean;
  }
): Observable<T> {
  const { onLockReleased, skipLeaderBid } = options || {};

  // The producer opts into the interest stream simply by declaring the
  // second parameter.
  const requiresSubInterest = subscribe.length > 1;

  const instance$ = new Observable<T>((o) => {
    // Check for the lock manager before allocating anything: this early exit
    // returns no teardown, so resources created above it could never be
    // released.
    if (!navigator.locks) {
      // eslint-disable-next-line no-console
      console.warn(
        'Insecure context,',
        `unable to run distributed observable ${resourceId}.`,
        'Halting.'
      );
      return;
    }

    let currentValue: T | undefined;

    // changing these would break compatibility between lib versions running on
    // the same environment
    const interestChannelName = `${resourceId}`;
    const nextChannelName = `${resourceId}:x`;
    const completeChannelName = `${resourceId}:c`;
    const errorChannelName = `${resourceId}:e`;

    let lockAcquired = false;
    // Both settle the promise that keeps the lock held; they stay no-ops
    // until the lock callback has actually run.
    let releaseLock: (value: unknown) => void = noop;
    let rejectLock: (reason?: unknown) => void = noop;
    const activeSubscription = new Subscription();

    const nextChannel = new BroadcastChannel(nextChannelName);
    const completeChannel = new BroadcastChannel(completeChannelName);
    const errorChannel = new BroadcastChannel(errorChannelName);

    const controller = new AbortController();

    // Relay whatever is broadcast on the channels to the local subscriber.
    nextChannel.onmessage = (event: MessageEvent<T>) => {
      const { data } = event;
      currentValue = data;
      o.next(data);
    };

    completeChannel.onmessage = () => {
      o.complete();
    };

    errorChannel.onmessage = (event: MessageEvent<unknown>) => {
      o.error(event.data);
    };

    if (!skipLeaderBid) {
      const lockManagement: Promise<unknown> = navigator.locks.request(
        resourceId,
        { signal: controller.signal },
        () => {
          // Mark first: a producer that terminates synchronously runs the
          // teardown below, which must not try to abort a granted request.
          lockAcquired = true;

          // The lock stays held until this promise settles.
          const release = new Promise((r, x) => {
            releaseLock = r;
            rejectLock = x;
          });

          let active$: Observable<T>;
          if (requiresSubInterest) {
            const subscriptionInterest = createInterest$(interestChannelName);
            const withInterest: (s: Subscriber<T>) => TeardownLogic = (
              leaderSubscriber
            ) => subscribe(leaderSubscriber, subscriptionInterest, currentValue);
            active$ = new Observable(withInterest);
          } else {
            active$ = new Observable(subscribe);
          }

          activeSubscription.add(
            active$.subscribe({
              next: (n) => {
                nextChannel.postMessage(n);
                currentValue = n;
                o.next(n);
              },

              error: (e) => {
                try {
                  errorChannel.postMessage(e);
                } finally {
                  // Even when the error value cannot be broadcast (e.g. it
                  // is not structured-cloneable) the lock must be given up
                  // and the local subscriber notified; otherwise this
                  // context would hold the lock with a dead producer.
                  rejectLock(e);
                  o.error(e);
                }
              },

              complete: () => {
                try {
                  completeChannel.postMessage(true);
                } finally {
                  releaseLock(`Internal release lock for ${resourceId}`);
                  o.complete();
                }
              },
            })
          );

          return release;
        }
      );

      // Rejections (aborted request, producer error) are deliberately
      // ignored here; they are surfaced through `o.error` where relevant.
      from(lockManagement)
        .pipe(switchMap(onLockReleased || of))
        .subscribe({ error: noop });
    }

    if (requiresSubInterest) {
      // NOTE(review): presumably announces this subscriber to whoever is
      // consuming `createInterest$` for the same channel; confirm in
      // distributed-interest.
      indicateInterest(interestChannelName, 'internal');
    }

    return () => {
      // Still queued for the lock: withdraw the request.
      if (!lockAcquired) controller.abort('Abandon lock request');
      activeSubscription.unsubscribe();
      nextChannel.close();
      errorChannel.close();
      completeChannel.close();
      // No-op unless this subscription is the one holding the lock.
      releaseLock(`Lock released ${resourceId}`);
    };
  });

  return instance$;
}

/**
 * Creates a subject-like object backed by a `BroadcastChannel` named
 * `resourceId`. Values pushed with `next` are posted on the channel and
 * emitted to local subscribers; messages received on the channel are emitted
 * to local subscribers as well. `error` and `complete` only affect local
 * subscribers; they are not broadcast.
 *
 * @param resourceId Name of the broadcast channel to publish to and listen on.
 * @returns An observable extended with `next`, `error` and `complete`.
 */
export function createDistributedSubject$<T>(
  resourceId: string
): SubjectLike<T> & Subscribable<T> & { pipe: Observable<T>['pipe'] } {
  const localSubject$ = new Subject<T>();
  const broadcastChannel = new BroadcastChannel(resourceId);

  // Forward every locally pushed value to the channel. Once the subject has
  // terminated nothing can be sent or delivered any more, so the channel is
  // closed instead of being left open for the lifetime of the page.
  localSubject$.subscribe({
    next: (n) => {
      try {
        broadcastChannel.postMessage(n);
      } catch (error) {
        // A value that cannot be posted terminates the subject with that
        // failure so subscribers learn about it.
        localSubject$.error(error);
      }
    },
    error: () => broadcastChannel.close(),
    complete: () => broadcastChannel.close(),
  });

  const base$ = new Observable<T>((observer) => {
    // One listener per subscriber: assigning the single `onmessage` slot
    // would let the newest subscriber displace all earlier ones, and any
    // unsubscribe would silence the remaining subscribers.
    const onRemoteMessage = (event: MessageEvent<T>) => {
      observer.next(event.data);
    };
    broadcastChannel.addEventListener('message', onRemoteMessage);

    // Arrow functions keep `observer` as the receiver; passing
    // `observer.error` / `observer.complete` unbound would invoke them with
    // the wrong `this` and the notification would be lost.
    const subscription = localSubject$.subscribe({
      next: (value) => observer.next(value),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    });

    return () => {
      subscription.unsubscribe();
      broadcastChannel.removeEventListener('message', onRemoteMessage);
    };
  });

  const subjectMethods = {
    next: localSubject$.next.bind(localSubject$),
    error: localSubject$.error.bind(localSubject$),
    complete: localSubject$.complete.bind(localSubject$),
  };

  return Object.assign(base$, subjectMethods);
}
