import { ulid } from "ulid";
import { createContext, useContext, useEffect, useRef } from "react";
import { Mutex } from "async-mutex";
import { createSharedRecordsAccessor } from "./concurrency";
import {
  WebSocketConnectionStatuses,
  WebSocketImplementation,
} from "./use_websocket";
import { adaptiveSubscribe, useWsSubscribe } from "./use_pubsub";
import { IWebSocketAdapter } from "./common";

const PUBLISHED_TOPIC = "published";

export type WebSocketEventProviderProps = {
  children: React.ReactNode;
  implementation: WebSocketImplementation;
  connectionStatus: WebSocketConnectionStatuses;
  userId?: string;
  connectionId?: string;
  wsAdapter: IWebSocketAdapter;
};

export type SubscriptionCallback = (payload: unknown) => void;

export type WebSocketEventProviderContextProps = {
  subscribe: (topic: string, key: string, cb: SubscriptionCallback) => void;
  unsubscribe: (topic: string, key: string) => void;
  implementation: WebSocketImplementation;
  connectionStatus: WebSocketConnectionStatuses;
};

const WebSocketEventProviderContext =
  createContext<WebSocketEventProviderContextProps>({
    subscribe: () => {},
    unsubscribe: () => {},
    implementation: "unknown",
    connectionStatus: "closed",
  });

export const WebSocketEventProvider = ({
  children,
  implementation,
  connectionStatus,
  userId,
  connectionId,
  wsAdapter,
}: WebSocketEventProviderProps) => {
  console.log("[tako][comp] events.tsx: WebSocketEventProvider invoked: ", {
    implementation,
    connectionStatus,
  });

  const ref = useRef<{
    [topic: string]: { [key: string]: SubscriptionCallback };
  }>({});

  const mutexRef = useRef<Mutex>(new Mutex());

  console.log(
    "[tako][comp] events.tsx: WebSocketEventProvider: Implementation: ",
    implementation
  );

  function subscribe(topic: string, key: string, cb: SubscriptionCallback) {
    const sharedRecords = createSharedRecordsAccessor(
      ref.current,
      mutexRef.current
    );

    const func = async () => {
      await sharedRecords.use(async (records) => {
        if (!records[topic]) {
          records[topic] = {};
        }

        records[topic][key] = cb;
      });
    };

    func();
  }

  function unsubscribe(topic: string, key: string) {
    const sharedRecords = createSharedRecordsAccessor(
      ref.current,
      mutexRef.current
    );

    const func = async () => {
      await sharedRecords.use(async (records) => {
        if (!records[topic]) return;

        delete records[topic][key];
      });
    };

    func();
  }

  const callback = async (payload: any) => {
    const json = payload as {
      event: string;
      payload: unknown;
    };

    const sharedRecords = createSharedRecordsAccessor(
      ref.current,
      mutexRef.current
    );

    await sharedRecords.use(async (records) => {
      const topic = records[json.event];

      if (!topic) return;

      const subscribers = Object.entries(topic).map((o) => o[1]);

      subscribers.forEach((subscribe) => {
        subscribe(json.payload);
      });
    });
  };

  if (implementation === "bento") {
    wsAdapter.onConnect = async (wso) => {
      const correlationId = ulid();
      await adaptiveSubscribe(
        wsAdapter,
        PUBLISHED_TOPIC,
        async (payload) => {
          console.log(
            `[tako][comp] events.tsx: WebSocketEventProvider: wsAdapter.onConnect(): [${correlationId}] subscribe payload: `,
            { payload }
          );

          await callback(payload);
        },
        async (err) => {
          console.error(
            "events.tsx: WebSocketEventProvider: wsAdapter.onConnect(): ",
            err
          );
        },
        correlationId,
        wso
      );
    };
  }

  useWsSubscribe(PUBLISHED_TOPIC, callback, [connectionStatus, connectionId]);

  return (
    <WebSocketEventProviderContext.Provider
      value={{ subscribe, unsubscribe, implementation, connectionStatus }}
    >
      {children}
    </WebSocketEventProviderContext.Provider>
  );
};

export const useWebSocketEventProvider = () => {
  return useContext(WebSocketEventProviderContext);
};

export function useWebSocketEventSubscribe(
  topic: string,
  key: string,
  cb: SubscriptionCallback,
  deps: unknown[] = []
) {
  const { subscribe, unsubscribe } = useContext(WebSocketEventProviderContext);

  const mountedRef = useRef(false);

  useEffect(() => {
    mountedRef.current = true;

    (async () => {
      console.log(
        "[tako][comp] events.tsx: useWebSocketEventSubscribe(): Subscribing to: ",
        {
          topic,
          key,
        }
      );

      subscribe && (await subscribe(topic, key, cb));
    })();

    return () => {
      mountedRef.current = false;

      (async () => {
        console.log(
          "[tako][comp] events.tsx: useWebSocketEventSubscribe(): Unsubscribing from: ",
          {
            topic,
            key,
          }
        );

        unsubscribe && (await unsubscribe(topic, key));
      })();
    };
  }, deps);
}
