import { asyncScheduler, merge, Observable, ReplaySubject, Subject, Subscription, timer } from 'rxjs';
import { first, map, skip, switchMap, take, throttleTime } from 'rxjs/operators';

const REPLAY_SIZE = 1;

export class CachedRequestResponse<T> {
  private request: Subject<void>;
  private responseSubject: Subject<T>;
  private response$: Observable<T>;
  private requestSubscription: Subscription = new Subscription;
  private forceRefresh$: Subject<void>;

  constructor(producerFactory: () => Observable<T>, cacheTime: number, autoRefresh: boolean = false) {
    this.request = new Subject();
    this.responseSubject = new ReplaySubject<T>(REPLAY_SIZE);
    this.response$ = this.responseSubject.asObservable();
    this.forceRefresh$ = new Subject();

    this.setupRequestResponse(producerFactory, cacheTime, autoRefresh);
  }

  private setupRequestResponse(producerFactory: () => Observable<T>, cacheTime: number, autoRefresh: boolean) {
    const throttledRequest$ = this.request.pipe(
      // Throttle to invalidate cache after cacheTime. Trailing events should not trigger request.
      throttleTime(cacheTime, asyncScheduler, { leading: true, trailing: false })
    );

    const pipedRequest$ = autoRefresh
      ? throttledRequest$.pipe(
          // timer forces requests with cacheTime cadence
          switchMap(() => timer(0, cacheTime))
        )
      : throttledRequest$;

    // force refresh when requested
    this.requestSubscription = merge(pipedRequest$, this.forceRefresh$).subscribe(() => {
      // first() forces auto-closure after first emission and spares us handling the unsubscription
      producerFactory()
        .pipe(first())
        .subscribe(
          (response) => {
            this.responseSubject.next(response);
          },
          (error) => {
            this.responseSubject.error(error);
          }
        );
    });
  }

  public get(): Observable<T> {
    this.request.next();
    return this.response$;
  }

  public refresh(): Observable<void> {
    this.forceRefresh$.next();
    return this.responseSubject.pipe(
      // Avoid immediate firing of the cached value
      skip(REPLAY_SIZE),
      // Notify on the completion of the actual cache-refresh request
      take(1),
      // Don't return the new response data as to avoid misuse of this method
      map(() => {})
    );
  }

  public unsubscribe(): void {
    this.requestSubscription.unsubscribe();
  }
}
