// Assuming browser environment that supports fetch and ReadableStream
import { PYTH_BENCHMARKS_TV_API } from "./usePythDatafeed";
import useSWRSubscription from "swr/subscription";
import { getUnixTime } from "date-fns";

const channelToSubscription = new Map();
let controller;

type PriceStream = {
  price: number;
  time: number;
};
type PriceStreamData = {
  [key: string]: PriceStream;
};

function handleStreamingData(data) {
  const { id, p, t } = data;

  const tradePrice = p;
  const tradeTime = t * 1000; // Multiplying by 1000 to get milliseconds

  const channelString = id;
  const subscriptionItem = channelToSubscription.get(channelString);

  if (!subscriptionItem) {
    return;
  }

  const lastDailyBar = subscriptionItem.lastDailyBar;
  const nextDailyBarTime = getNextDailyBarTime(lastDailyBar.time);

  let bar;
  if (tradeTime >= nextDailyBarTime) {
    bar = {
      time: nextDailyBarTime,
      open: tradePrice,
      high: tradePrice,
      low: tradePrice,
      close: tradePrice,
    };
    // console.log("[stream] Generate new bar", bar);
  } else {
    bar = {
      ...lastDailyBar,
      high: Math.max(lastDailyBar.high, tradePrice),
      low: Math.min(lastDailyBar.low, tradePrice),
      close: tradePrice,
    };
    // console.log("[stream] Update the latest bar by price", tradePrice);
  }

  subscriptionItem.lastDailyBar = bar;
  // Send data to every subscriber of that symbol
  subscriptionItem.handlers.forEach((handler) => handler.callback(bar));
  channelToSubscription.set(channelString, subscriptionItem);
}

function startStreaming(delay: number = 3000) {
  if (controller) {
    return controller;
  }
  controller = new AbortController();
  const abortTimeout = setTimeout(() => controller.abort(), delay);
  fetch(`${PYTH_BENCHMARKS_TV_API}/streaming`, { signal: controller.signal })
    .then((response) => {
      clearTimeout(abortTimeout);
      const reader = response.body?.getReader();

      function streamData() {
        if (!reader) {
          return;
        }
        reader
          .read()
          .then(({ value, done }) => {
            if (done) {
              // console.error("[stream] Streaming ended.");
              return;
            }

            // Assuming the streaming data is separated by line breaks
            const dataStrings = new TextDecoder().decode(value).split("\n");

            dataStrings.forEach((dataString) => {
              const trimmedDataString = dataString.trim();
              if (trimmedDataString) {
                try {
                  let jsonData = JSON.parse(trimmedDataString);
                  handleStreamingData(jsonData);
                } catch (e) {
                  // console.error("Error parsing JSON:", e.message);
                }
              }
            });

            streamData(); // Continue processing the stream
          })
          .catch((error) => {
            // console.error("[stream] Error reading from stream:", error);
            attemptReconnect(delay);
          });
      }

      streamData();
    })
    .catch((error) => {
      // console.error("[stream] Error fetching from the streaming endpoint:", error);
      attemptReconnect(delay);
    });
  function attemptReconnect(delay) {
    // console.log(`[stream] Attempting to reconnect in ${delay}ms...`);
    setTimeout(() => {
      controller = null;
      startStreaming(delay);
    }, delay);
  }
  return controller;
}

function getNextDailyBarTime(barTime) {
  const date = new Date(barTime * 1000);
  date.setDate(date.getDate() + 1);
  return date.getTime() / 1000;
}

function mergeHandlers(id, handlers) {
  const { handlers: existingHandlers = [] } = channelToSubscription.get(id) || {};
  return [...existingHandlers.filter((handler) => !handlers.map(({ id }) => id).includes(handler.id)), ...handlers];
}

export function subscribeOnStream(
  symbolInfo,
  resolution,
  onRealtimeCallback,
  subscriberUID,
  onResetCacheNeededCallback,
  lastDailyBar
) {
  const channelString = symbolInfo.ticker;
  const handler = {
    id: subscriberUID,
    callback: onRealtimeCallback,
  };
  const subscriptionItem = {
    subscriberUID,
    resolution,
    lastDailyBar,
    handlers: mergeHandlers(channelString, [handler]),
  };
  channelToSubscription.set(channelString, subscriptionItem);
  // console.log("[subscribeBars]: Subscribe to streaming. Channel:", channelString);

  // Start streaming when the first subscription is made
  startStreaming();
}

export function unsubscribeFromStream(subscriberUID) {
  // Find a subscription with id === subscriberUID
  for (const channelString of channelToSubscription.keys()) {
    const subscriptionItem = channelToSubscription.get(channelString);
    const handlerIndex = subscriptionItem.handlers.findIndex((handler) => handler?.id === subscriberUID);

    if (handlerIndex !== -1) {
      // Unsubscribe from the channel if it is the last handler
      // console.log("[unsubscribeBars]: Unsubscribe from streaming. Channel:", channelString);
      if (subscriptionItem.handlers.length === 1) {
        channelToSubscription.delete(channelString);
      } else {
        delete subscriptionItem.handlers[handlerIndex];
        channelToSubscription.set(channelString, subscriptionItem);
      }
      break;
    }
  }
}

export const usePythSreaming = (ids: string[] = []): PriceStreamData | undefined => {
  const { data } = useSWRSubscription<PriceStreamData>("pyth-network-stream", (key, { next }) => {
    for (const id of ids) {
      const handler = {
        id: key,
        callback: (bar) => {
          next(null, (prev) => ({ ...prev, [id]: { price: bar.close, time: bar.time } }));
        },
      };
      const subscriptionItem = channelToSubscription.get(id) || {};
      channelToSubscription.set(id, {
        lastDailyBar: { high: 0, low: 0, time: getUnixTime(Date.now()) },
        ...subscriptionItem,
        handlers: mergeHandlers(id, [handler]),
      });
    }
    startStreaming();
    return () => {};
  });

  return data;
};
