import { Pipe, PipeTransform } from '@angular/core';
import { WebSocketSubject, webSocket } from 'rxjs/webSocket';
import { map, retryWhen, tap, delay } from 'rxjs/operators';
import { Observable } from 'rxjs';

/**
 * Pipe `roadRef`: sends the piped value over a WebSocket and returns the
 * stream of messages coming back on that socket.
 *
 * `transform` returns an Observable, so the result has to be subscribed to
 * by the consumer (e.g. by chaining Angular's `async` pipe in the template).
 * One socket is opened per pipe instance and shared by all `transform` calls
 * on that instance.
 */
@Pipe({
  name: 'roadRef'
})
export class RoadRefPipe implements PipeTransform {
  /** Socket used both to send lookup values and to receive the answers. */
  private readonly ws: WebSocketSubject<any>;

  /**
   * Messages received on `ws`, with error handling applied: every socket
   * error is logged and the stream is resubscribed one second later.
   */
  private readonly messages$: Observable<any>;

  constructor() {
    this.ws = webSocket('ws://localhost:8080/roadref/roadref-ws');

    // Keep the piped observable: operators do not modify `ws` in place, so
    // the retry behaviour only exists on the observable returned by `pipe`.
    this.messages$ = this.ws.asObservable().pipe(
      retryWhen(errors =>
        errors.pipe(
          tap(err => {
            console.error('Got error', err);
          }),
          delay(1000)
        )
      )
    );
  }

  /**
   * Sends `value` over the socket and returns the stream of incoming messages.
   *
   * @param value Payload to send. Nothing is sent when it is `null`/`undefined`
   *   or when its `lat` property is exactly `0`.
   * @param args Unused; kept for signature compatibility.
   * @returns The same observable instance on every call, emitting whatever
   *   arrives on the socket and retrying after errors.
   */
  transform = (value: any, args?: any): Observable<any> => {
    // Template bindings are frequently null/undefined before data arrives;
    // guard so the property access below cannot throw.
    if (value != null && value.lat !== 0) {
      this.ws.next(value);
    }
    return this.messages$;
  }

  /**
   * Closes the socket when Angular destroys the pipe instance so the
   * connection does not outlive the view that created it.
   */
  ngOnDestroy(): void {
    this.ws.complete();
  }
}
