// @see https://github.com/lamisChebbi/ng-realtime-dashboard/blob/master/src/app/services/data.service.ts

import { Injectable } from '@angular/core';
import { Auth } from 'aws-amplify';
import { NGXLogger } from 'ngx-logger';
import { EMPTY, Observable, Subject, timer } from 'rxjs';
import { catchError, delayWhen, distinctUntilChanged, filter, retryWhen, switchAll, tap } from 'rxjs/operators';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { environment } from '../../environments/environment';

export const WS_ENDPOINT = environment.wsUrl;
export const RECONNECT_INTERVAL = 5000;

export enum PublishEventAction {
  TRACK_SHIPMENT_RESULT = 'TRACK_SHIPMENT_RESULT',
  BILL_LADING_PARSING_RESULT = 'BILL_LADING_PARSING_RESULT',
  NEW_MESSAGE = 'NEW_MESSAGE',
}

@Injectable({
  providedIn: 'root',
})
export class WebsocketService {
  private socket$: WebSocketSubject<any>;
  private messagesSubject$ = new Subject<any>();
  // eslint-disable-next-line @typescript-eslint/member-ordering
  public messages$ = this.messagesSubject$.pipe(
    switchAll<any>(),
    distinctUntilChanged((prev, curr) => JSON.stringify(prev) === JSON.stringify(curr)),
    catchError(e => {
      throw e;
    }),
  );

  constructor(private logger: NGXLogger) {}

  /**
   * Creates a new WebSocket subject and send it to the messages subject
   *
   * @param cfg if true the observable will be retried.
   */
  public connect(cfg: { reconnect: boolean } = { reconnect: false }): void {
    Auth.currentUserInfo()
      .then(user => {
        if (!user) return;
        if (!this.socket$ || this.socket$.closed) {
          this.socket$ = this.getNewWebSocket(user.attributes.email);
          const messages = this.socket$.pipe(
            cfg.reconnect ? this.reconnect : o => o,
            tap({
              error: error => this.logger.debug('connect', error),
            }),
            catchError(_ => EMPTY),
          );
          // toDO only next an observable if a new subscription was made double-check this
          this.messagesSubject$.next(messages);
        }
      })
      .catch(err => {
        this.logger.debug('Connect err:', err);
      });
  }

  close() {
    this.socket$.complete();
    this.socket$ = undefined;
  }

  sendMessage(msg: any) {
    this.socket$.next(msg);
  }

  docParsedEvent() {
    return this.messages$.pipe(filter(data => data.event === PublishEventAction.BILL_LADING_PARSING_RESULT));
  }

  newMessageEvent() {
    return this.messages$.pipe(filter(data => data.event === PublishEventAction.NEW_MESSAGE));
  }

  /**
   * Retry a given observable by a time span
   *
   * @param observable the observable to be retried
   */
  private reconnect(observable: Observable<any>): Observable<any> {
    return observable.pipe(
      retryWhen(errors =>
        errors.pipe(
          tap(val => this.logger.log('[Websocket] Try to reconnect', val)),
          delayWhen(_ => timer(RECONNECT_INTERVAL)),
        ),
      ),
    );
  }

  /**
   * Return a custom WebSocket subject which reconnects after failure
   */
  private getNewWebSocket(email: string) {
    const url = WS_ENDPOINT + '?id=' + btoa(email);
    return webSocket({
      url,
      openObserver: {
        next: () => {
          this.logger.info('[Websocket]: connection ok');
        },
      },
      closeObserver: {
        next: () => {
          this.logger.info('[Websocket]: connection closed');
          this.socket$ = undefined;
          this.connect({ reconnect: true });
        },
      },
    });
  }
}
