import { Injectable } from "@angular/core";
import { Observable, Subscriber, Subscription } from "rxjs";
import Timeout = NodeJS.Timeout;
import { distinctUntilChanged } from "rxjs/operators";

/**
 * WebSocket client that connects lazily and keeps the connection alive while
 * at least one consumer is subscribed to {@link WebSocketWithRetry.messages}.
 *
 * - The target URL is taken from an observable; a `null`/empty URL means
 *   "do not connect".
 * - While there are subscribers and a URL, the connection is re-checked every
 *   5 seconds and whenever the socket closes, and is re-opened if it is not
 *   open/connecting or if the URL has changed.
 * - When the last subscriber leaves, the socket, the check timer and the URL
 *   subscription are all released.
 *
 * NOTE(review): the class is `providedIn: "root"` but takes a bare
 * `Observable` constructor argument, which Angular DI cannot resolve without
 * an explicit provider — presumably instances are created with `new`; confirm.
 */
@Injectable({
  providedIn: "root",
})
export class WebSocketWithRetry {
  /** Shared message stream handed out by the `messages` getter. */
  private readonly _messages: Observable<string>;
  /** Currently attached consumers; replaced (never mutated) on change. */
  private _subscribers: Subscriber<string>[] = [];
  /** The live socket, or `null` when disconnected. */
  private _ws: WebSocket | null = null;
  /**
   * The exact URL string `_ws` was opened with. `WebSocket.url` cannot be used
   * for change detection because the browser returns a normalised URL (e.g.
   * `ws://host` becomes `ws://host/`), which would never equal the raw input.
   */
  private _wsUrl: string | null = null;
  /** Handle of the periodic connection check, or `null` when not running. */
  private _checkInterval: Timeout | null = null;
  /** De-duplicated source of target URLs. */
  private _urlObservable: Observable<string | null>;
  /** Most recent URL received from `_urlObservable`. */
  private url: string | null = null;
  /** Active subscription to `_urlObservable`, or `null` when not observing. */
  private urlObservableSubscriber: Subscription | null = null;

  /**
   * @param urlObservable Emits the URL to connect to; `null` disconnects.
   *   Consecutive duplicate values are ignored.
   */
  constructor(urlObservable: Observable<string | null>) {
    this._urlObservable = urlObservable.pipe(distinctUntilChanged());
    this._messages = new Observable((subscriber) => {
      this._subscribers.push(subscriber);
      // Observe the URL first: a source that replays synchronously then
      // updates `url` before we decide what to connect to, avoiding a wasted
      // connection to a URL left over from a previous subscription period.
      this._checkUrlObserving();
      this._checkWsConnection();

      return () => {
        this._subscribers = this._subscribers.filter((s) => s !== subscriber);
        this._checkWsConnection();
        this._checkUrlObserving();
      };
    });
  }

  /**
   * Reconciles the socket with the desired state: disconnected when there are
   * no subscribers or no URL, otherwise connected to the current URL.
   */
  private _checkWsConnection(): void {
    if (this._subscribers.length === 0 || !this.url) {
      this._stopWsConnection();
      return;
    }

    const ws = this._ws;
    const isAlive =
      ws !== null &&
      (ws.readyState === WebSocket.OPEN ||
        ws.readyState === WebSocket.CONNECTING);

    if (isAlive && this._wsUrl === this.url) {
      return;
    }

    // Dead socket or URL changed: drop whatever we have and reconnect.
    this._closeSocket();
    this._startWsConnection();
  }

  /** Subscribes to the URL source while there are consumers, else unsubscribes. */
  private _checkUrlObserving(): void {
    if (this._subscribers.length === 0) {
      this._stopObserveUrl();
      return;
    }

    this._startObserveUrl();
  }

  /**
   * Ensures the periodic check is running and opens a socket to the current
   * URL. A failure to construct the socket is logged and left to the periodic
   * check to retry.
   */
  private _startWsConnection(): void {
    if (this._checkInterval === null) {
      this._checkInterval = setInterval(() => {
        this._checkWsConnection();
      }, 5000);
    }

    const url = this.url;
    if (url === null) {
      return;
    }

    let ws: WebSocket;
    try {
      // Throws (SyntaxError) on a malformed URL; must not escape from the
      // interval callback or the URL subscription.
      ws = new WebSocket(url);
    } catch (e) {
      console.debug("WebSocketWithRetry %s error", url, e);
      return;
    }

    ws.onmessage = (e) => this._onWsMessage(e);
    ws.onerror = (e) => this._onWsError(e);
    ws.onclose = () => {
      // Only react to the socket we still consider current.
      // NOTE(review): this reconnects immediately on every unexpected close,
      // which can retry in a tight loop while the server is down — consider
      // leaving retries to the 5 s timer.
      if (this._ws === ws) {
        this._checkWsConnection();
      }
    };
    this._ws = ws;
    this._wsUrl = url;
    console.debug("WebSocketWithRetry %s started", url);
  }

  /** Forwards the payload of an incoming message to every subscriber. */
  private _onWsMessage(e: MessageEvent): void {
    console.debug("WebSocketWithRetry %s message", this.url, e);
    this._subscribers.forEach((subscriber) => subscriber.next(e.data));
  }

  /**
   * Logs a socket error. Errors are deliberately not propagated to
   * subscribers, so their streams stay open across reconnects.
   */
  private _onWsError(e: Event): void {
    console.debug("WebSocketWithRetry %s error", this.url, e);
    // this._subscribers.forEach(subscriber => subscriber.error(e));
  }

  /** Stops the periodic check and closes the socket, if any. */
  private _stopWsConnection(): void {
    if (this._checkInterval !== null) {
      clearInterval(this._checkInterval);
      this._checkInterval = null;
    }
    if (this._ws) {
      this._closeSocket();
      console.debug("WebSocketWithRetry %s stopped", this.url);
    }
  }

  /**
   * Detaches all handlers from the current socket, closes it and forgets it.
   * Detaching first guarantees a deliberately closed socket can no longer
   * deliver messages or trigger reconnect checks.
   */
  private _closeSocket(): void {
    const ws = this._ws;
    if (ws === null) {
      return;
    }
    this._ws = null;
    this._wsUrl = null;
    ws.onmessage = null;
    ws.onerror = null;
    ws.onclose = null;
    try {
      ws.close();
    } catch (ignore) {
      // Best effort: the socket is being discarded either way.
    }
  }

  /**
   * Stream of message payloads. Subscribing opens the connection if needed;
   * the connection is closed once the last subscriber unsubscribes.
   */
  public get messages(): Observable<string> {
    return this._messages;
  }

  /** Starts tracking URL changes (no-op if already tracking). */
  private _startObserveUrl(): void {
    if (this.urlObservableSubscriber !== null) {
      return;
    }

    this.urlObservableSubscriber = this._urlObservable.subscribe((url) => {
      this.url = url;
      this._checkWsConnection();
    });
  }

  /**
   * Stops tracking URL changes (no-op if not tracking). The last known URL is
   * kept so a later subscriber can reconnect even if the source does not
   * replay its value.
   */
  private _stopObserveUrl(): void {
    if (this.urlObservableSubscriber === null) {
      return;
    }
    this.urlObservableSubscriber.unsubscribe();
    this.urlObservableSubscriber = null;
  }
}
