import { useEffect, useRef, useState } from "react";
import { useAtom } from "jotai";
// import { Mutex } from "async-mutex";
import awsIot from "aws-iot-device-sdk";
import { ulid } from "ulid";

import {
  accessKeyIdAtom,
  connectionIdAtom,
  correlationIdAtom,
  identityIdAtom,
  secretKeyAtom,
  sessionIdAtom,
  stsTokenAtom,
  takoExpiresAtAtom,
  userIdAtom,
} from "../atoms";
// import { createLocker } from "./concurrency";
import { CredsObject, GetCredentialsApiResponse } from "./common";
import { REGION } from "../constants";
import { appPrefix } from "./constants";
// import { makeItemKey } from "./helper";
import { useWebSocket } from "./use_websocket";

export type IoTConnectionStatuses =
  | "connected"
  | "reconnecting"
  | "closed"
  | "offline"
  | "pending";

interface Prop {
  apiEndpoint: string;
  wssUrl: string;
  userId?: string;
}

export default function useIot({ apiEndpoint, wssUrl, userId }: Prop) {
  const { setConnectionStatus } = useWebSocket();

  const mqttClient = useRef<awsIot.device | null>(null);

  // const connMutexRef = useRef(new Mutex());

  // const connLocker = createLocker({
  //   isEnabled: true,
  //   isLocal: true,
  //   mutexObject: connMutexRef.current,
  // });

  // const CONN_LOCKER = "connLocker";

  let topicMap = useRef<
    Record<string, Record<string, (payload: unknown) => void>>
  >({});
  const [iotStatus, setIotStatus] = useState<IoTConnectionStatuses>("pending");
  const [currentUserId, setCurrentUserId] = useAtom(userIdAtom);
  const [sessionId] = useAtom(sessionIdAtom);
  const [correlationId] = useAtom(correlationIdAtom);
  const [connectionId, setConnectionId] = useAtom(connectionIdAtom);
  const [accessKeyId, setAccessKeyId] = useAtom(accessKeyIdAtom);
  const [secretKey, setSecretKey] = useAtom(secretKeyAtom);
  const [stsToken, setStsToken] = useAtom(stsTokenAtom);
  const [identityId, setIdentityId] = useAtom(identityIdAtom);
  const [takoExpiresAt, setTakoExpiresAt] = useAtom(takoExpiresAtAtom);

  const CREDS_EXPIRY_ALLOWANCE_MINS = 5;
  const MAX_RETRIES_QUERY_GET_CREDENTIALS = 3;
  const WAIT_MS_QUERY_GET_CREDENTIALS = 200;
  const prefix = "tako";

  const sleep = async (millis: number): Promise<void> =>
    new Promise<void>((resolve) => {
      setTimeout(resolve, millis);
    });

  const saveSession = (userId: string, data: GetCredentialsApiResponse) => {
    console.info(
      `[${prefix}][session] use_iot.ts: saveSession: [${appPrefix}] saving new session...`
    );

    setTakoExpiresAt(
      JSON.stringify(
        data.credentials.expiration
          ? new Date(data.credentials.expiration).getTime()
          : 0
      )
    );

    setCurrentUserId(userId);
    setConnectionId(data.connectionId);
    setAccessKeyId(data.credentials.accessKeyId);
    setSecretKey(data.credentials.secretAccessKey);
    setStsToken(data.credentials.sessionToken ?? "");
    setIdentityId(data.credentials.identityId);
  };

  const unsetSession = () => {
    setCurrentUserId("");
    setConnectionId("");
    setAccessKeyId("");
    setSecretKey("");
    setStsToken("");
    setIdentityId("");
    setTakoExpiresAt("");
  };

  const getSession = () => {
    if (
      !takoExpiresAt ||
      !accessKeyId ||
      !secretKey ||
      !stsToken ||
      !identityId ||
      !connectionId
    )
      return null;

    const exp = JSON.parse(takoExpiresAt);

    const ret: GetCredentialsApiResponse = {
      connectionId,
      credentials: {
        accessKeyId,
        secretAccessKey: secretKey,
        sessionToken: stsToken,
        expiration: !isNaN(exp) && exp > 0 ? new Date(exp).toISOString() : "",
        identityId,
      },
    };

    return ret;
  };

  const setRefreshTimer = async (expire: string) => {
    const expirationMillis = new Date(expire).getTime();
    const now = Date.now();

    let timeLeft =
      expirationMillis - now - CREDS_EXPIRY_ALLOWANCE_MINS * 60 * 1000;
    if (timeLeft < 0) timeLeft = 0;

    console.info(
      `[${prefix}][session] use_iot.ts: setRefreshTimer: Setting timer, time left: ${timeLeft} ms`
    );

    // set a job timeout to run a refresh credentials here
    const timer = setTimeout(async () => {
      // refresh creds
      console.info(
        `[${prefix}][session] use_iot.tsx: setRefreshTimer: refreshing credentials`
      );

      const gcr = await getCredentials({
        prevConnectionId: connectionId,
        forced: true,
      });

      if (!gcr?.creds)
        throw new Error(
          `[${prefix}][session] Unable to refresh connection credentials`
        );

      if (mqttClient.current) {
        console.info(
          `[${prefix}][session] use_iot.tsx: setRefreshTimer: mqttClient update creds`
        );
        mqttClient.current.updateWebSocketCredentials(
          gcr.creds.aws_access_id,
          gcr.creds.aws_secret_key,
          gcr.creds.aws_sts_token,
          new Date(gcr.creds.expiration)
        );
      }

      clearTimeout(timer);
    }, timeLeft);

    return timer;
  };

  const isCredentialsExpired = (expire?: string) => {
    if (!expire) return true;

    const now = Date.now();

    const expirationMillis = new Date(expire).getTime();

    return expirationMillis - now < CREDS_EXPIRY_ALLOWANCE_MINS * 60 * 1000;
  };

  const getCredentials = async ({
    prevConnectionId,
    forced,
  }: {
    prevConnectionId?: string;
    forced?: boolean;
  }) => {
    console.log(`[${prefix}][session] use_iot.tsx: setting credentials`);

    const storedCreds = getSession();

    if (
      currentUserId === userId &&
      !!storedCreds &&
      !forced &&
      !isCredentialsExpired(storedCreds.credentials.expiration)
    ) {
      console.info(
        `[${prefix}][session] use_iot.ts: getCredentials: Using existing session, not invoking API`
      );

      const creds: CredsObject = {
        aws_region: REGION,
        aws_access_id: storedCreds.credentials.accessKeyId,
        aws_secret_key: storedCreds.credentials.secretAccessKey,
        aws_sts_token: storedCreds.credentials.sessionToken ?? "",
        aws_identity_id: storedCreds.credentials.identityId,
        expiration: storedCreds.credentials.expiration ?? "",
        connectionId: storedCreds.connectionId,
      };

      const timer = await setRefreshTimer(creds.expiration);

      return {
        timer,
        creds,
      };
    }

    console.info(
      `[${prefix}][session] use_iot.ts: getCredentials: No saved session, credentials expired, or forced, invoking API...`
    );

    let CREDS_URL = `${apiEndpoint}/v2/credentials`;
    const q = new URLSearchParams();

    if (userId) q.append("user_id", userId);
    if (sessionId) q.append("session_id", sessionId);
    if (correlationId) q.append("correlation_id", correlationId);
    if (prevConnectionId) q.append("prev_conn_id", prevConnectionId);

    if (q.entries.length > 0) CREDS_URL = `${CREDS_URL}?${q.toString()}`;

    const retryCount = MAX_RETRIES_QUERY_GET_CREDENTIALS;
    const waitBaseDurationMs = WAIT_MS_QUERY_GET_CREDENTIALS;

    let i = 0;
    let waitDuration = waitBaseDurationMs;
    let resp: Response | null = null;

    while (i <= retryCount) {
      resp = await fetch(CREDS_URL, {
        headers: {
          "Content-Type": "application/json",
        },
      });

      if (resp.ok) break;

      if (resp.status === 503 || resp.status === 429) {
        // Service Unavailable/Too Many Request
        ++i;

        if (i > retryCount) {
          console.info(
            `[${prefix}][session] use_iot.ts: getCredentials: Max retry count ${retryCount} already exceeded, giving up`
          );

          break;
        }

        console.info(
          `[${prefix}][session] use_iot.ts: getCredentials: Retrying operation, retry count = ${i}, max = ${retryCount}, status was ${resp.status} - ${resp.statusText}`
        );

        await sleep(waitDuration);
        waitDuration = waitDuration * 2; // Exponential wait
      } else {
        // Do not retry on other errors
        console.info(
          `[${prefix}][session] use_iot.ts: getCredentials: Got error, not retrying: ${resp.status} - ${resp.statusText}`
        );

        break;
      }
    }

    if (!resp)
      throw new Error(
        `[${prefix}][session] use_iot.ts: getCredentials: Unable to retrieve credentials`
      );

    if (resp.ok) {
      const result = (await resp.json()) as GetCredentialsApiResponse;

      const creds: CredsObject = {
        aws_region: REGION,
        aws_access_id: result.credentials.accessKeyId,
        aws_secret_key: result.credentials.secretAccessKey,
        aws_sts_token: result.credentials.sessionToken ?? "",
        aws_identity_id: result.credentials.identityId,
        expiration: result.credentials.expiration ?? "",
        connectionId: result.connectionId,
      };

      saveSession(userId ?? "", result);
      const timer = await setRefreshTimer(creds.expiration);

      return {
        timer,
        creds,
      };
    }

    return null;
  };

  const connectToMqtt = async (creds: {
    connectionId: string;
    accessKeyId: string;
    secretKey: string;
    stsToken: string;
  }) => {
    console.log("[tako][session] :: connecting to tako mqtt server ...");

    const endpointUrl = new URL(wssUrl);

    const deviceOpts: awsIot.DeviceOptions = {
      protocol: "wss",
      clientId: creds.connectionId,
      host: endpointUrl.hostname,
      path: endpointUrl.pathname,
      accessKeyId: creds.accessKeyId,
      secretKey: creds.secretKey,
      sessionToken: creds.stsToken,
      region: REGION,
    };

    if (endpointUrl.port) deviceOpts.port = +endpointUrl.port;

    try {
      const client = await new Promise<awsIot.device>((resolve, reject) => {
        try {
          const device = new awsIot.device(deviceOpts);
          device.on("close", () => {});
          device.on("connect", () => {
            console.info(
              `[${prefix}][session] use_iot.ts: connectToMqtt: connected to tako succuessfuly!`
            );
            setIotStatus("connected");
            setConnectionStatus("connected");
            mqttClient.current = device;
            resolve(device);
          });
        } catch (err: any) {
          reject(err);
        }
      });

      window.addEventListener("offline", () => {
        console.info(
          `[${prefix}][session] use_iot.ts: connectToMqtt: browser offline`
        );
        setIotStatus("offline");
        setConnectionStatus("offline");
      });

      window.addEventListener("online", () => {
        console.info(
          `[${prefix}][session] use_iot.ts: connectToMqtt: browser online`
        );
        setIotStatus("connected");
        setConnectionStatus("connected");
      });

      client.on("close", async () => {
        console.info(
          `[${prefix}][session] use_iot.ts: connectToMqtt: connection closed`
        );
        setIotStatus("closed");
        setConnectionStatus("closed");
        mqttClient.current = null;
      });

      client.on("error", () => {
        setIotStatus("closed");
        setConnectionStatus("closed");
      });

      client.on("offline", () => {
        console.info(
          `[${prefix}][session] use_iot.ts: connectToMqtt: iot offline`
        );
        setIotStatus("offline");
        setConnectionStatus("offline");
      });

      client.on("reconnect", () => {
        console.info(
          `[${prefix}][session] use_iot.ts: connectToMqtt: reconnecting to tako ...`
        );
        setIotStatus("reconnecting");
        setConnectionStatus("reconnecting");
      });

      client.on("message", (topic, rawPayload) => {
        const payload = JSON.parse(rawPayload.toString());
        console.info(`[${prefix}][session] message: `, payload);

        if (Array.isArray(payload)) {
          payload.forEach((p) => {
            if (topicMap.current[topic]) {
              const funcs = Object.values(topicMap.current[topic]);
              funcs.forEach((func) => {
                func(p);
              });
            }
          });
        } else if (topicMap.current[topic]) {
          const funcs = Object.values(topicMap.current[topic]);
          funcs.forEach((func) => {
            func(payload);
          });
        }
      });
    } catch (connErr: any) {
      console.error(
        `[${prefix}][session] use_iot.ts: connectToMqtt: `,
        connErr
      );
      throw connErr;
    }
  };

  const disconnect = async () => {
    if (mqttClient.current) {
      console.info(
        `[${prefix}][session] use_iot.ts: connectToMqtt: closing connection ${connectionId}`
      );
      mqttClient.current.end(true);
      mqttClient.current = null;
      setIotStatus("closed");
      setConnectionStatus("closed");
    }
  };

  const subscribeSync = async (
    topic: string,
    cb: (payload: unknown) => void,
    errCb?: (error: Error) => void
  ) => {
    if (!mqttClient.current) return;

    const subscriptionId = ulid();

    if (!!topicMap.current[topic])
      topicMap.current = {
        ...topicMap.current,
        [topic]: {
          ...topicMap.current[topic],
          [subscriptionId]: cb,
        },
      };
    else {
      mqttClient.current.subscribe(topic, undefined, (err) => {
        if (errCb && err) {
          errCb(err);
        }
      });

      topicMap.current = {
        ...topicMap.current,
        [topic]: { [subscriptionId]: cb },
      };
    }

    return subscriptionId;
  };

  const unsubscribeSync = async (topic: string, subscriptionId?: string) => {
    if (!mqttClient.current) return;

    if (subscriptionId) delete topicMap.current[topic][subscriptionId];
    else delete topicMap.current[topic];

    if (!subscriptionId) mqttClient.current.unsubscribe(topic);
  };

  const publish = async (topic: string, message: unknown): Promise<void> => {
    const msg = JSON.stringify(message);

    return new Promise<void>((resolve, reject) => {
      if (!mqttClient.current) {
        throw new Error("[tako][session] Device not connected");
      } else {
        mqttClient.current.publish(topic, msg, undefined, (err) => {
          if (err) {
            console.error("[tako][session] iotdevice.ts: publish", err);
            reject(err);
          } else {
            resolve();
          }
        });
      }
    });
  };

  useEffect(() => {
    let timeOut: null | NodeJS.Timeout = null;
    let creds = false;

    (async () => {
      const result = await getCredentials({});

      if (!!result) {
        timeOut = result.timer;
        creds = !!result.creds;
        await connectToMqtt({
          accessKeyId: result.creds.aws_access_id,
          secretKey: result.creds.aws_secret_key,
          stsToken: result.creds.aws_sts_token,
          connectionId: result.creds.connectionId,
        });
      }
    })();

    return () => {
      if (timeOut) clearTimeout(timeOut);
      if (!!creds) unsetSession();
      (async () => {
        await disconnect();
      })();
    };
  }, [userId]);

  return {
    iotStatus,
    subscribeSync,
    unsubscribeSync,
    publish,
  };
}
