import { filter, map } from 'rxjs/operators';
import { Observable, fromEvent, BehaviorSubject } from 'rxjs';
/**
 * Sentinel used to seed the BehaviorSubject that carries worker results.
 * A BehaviorSubject always needs an initial value; this one is a placeholder
 * that consumers are expected to ignore so that only real worker messages are
 * observed. It is cast to `any` so it can seed a subject of any element type.
 */
const INITIAL_VALUE = 'dummy_initial_value' as any;
/** Function that converts a value of type `T` into a value of type `R`. */
type formatterFunction<T, R> = (d: T) => R;

/**
 * Listens to a web worker and republishes the messages it emits through an
 * Observable (`result$`).
 *
 * - `result$` is a BehaviorSubject seeded with `INITIAL_VALUE`; subscribers
 *   receive that placeholder first and are expected to skip it.
 * - Worker `error` and `messageerror` events are forwarded as error
 *   notifications, which terminates `result$`.
 * - An exception thrown by the optional formatter is forwarded the same way
 *   instead of silently stalling the stream.
 */
class WorkerRx<T> {

  private readonly _result$ = new BehaviorSubject<T>(INITIAL_VALUE);

  /** Subject carrying the (optionally formatted) payload of each worker message. */
  public get result$(): BehaviorSubject<T> {
    return this._result$;
  }

  /**
   * @param _worker Worker whose events are observed.
   * @param _workerMessageDataFormat - OPTIONAL - Converts the raw `data` of each
   * worker message into the type `T` published on `result$`.
   */
  constructor(
    private _worker: Worker,
    private _workerMessageDataFormat?: formatterFunction<any, T>
  ) {
    // workerMessage$: the raw payload is untyped until it has been formatted.
    fromEvent<MessageEvent>(this._worker, 'message').pipe(
      map(({ data }): T =>
        typeof this._workerMessageDataFormat === 'function'
          ? this._workerMessageDataFormat(data)
          : data
      )
    ).subscribe({
      next: (data) => this._result$.next(data),
      // A throwing formatter ends this pipeline; surface it to consumers.
      error: (err) => this._result$.error(err),
    });

    // workerError$: uncaught errors raised inside the worker arrive as ErrorEvent.
    fromEvent<ErrorEvent>(this._worker, 'error').subscribe(
      (err) => this._result$.error(err)
    );

    // workerMessageError$: the worker sent a message that could not be deserialized.
    fromEvent<MessageEvent>(this._worker, 'messageerror').subscribe(
      (err) => this._result$.error(err)
    );
  }

}

/**
 * Custom RxJS operator that maps each source value asynchronously through a
 * web worker (multithreading).
 *
 * Every source value is posted to the worker and every worker message is
 * emitted downstream. The operator assumes the worker answers each message
 * exactly once: the output completes once the source has completed AND every
 * posted message has been answered, so replies still in flight when the source
 * completes are not lost.
 *
 * The source is subscribed lazily (when the result is subscribed) and released
 * when the consumer unsubscribes. The worker itself is owned by the caller and
 * is never terminated here. Concurrent subscriptions share the same worker and
 * therefore observe each other's replies; use `share()` downstream if several
 * consumers are needed.
 *
 * @param worker Web worker instance
 * @param dataFormatter - OPTIONAL - A transformation function to adapt the data coming from the previous operation (type T) to the type expected at the entry of the worker function (type U)
 * @param workerMessageDataFormat - OPTIONAL - A transformation function to adapt the data coming from the worker (type any) to the type expected after this operator (type R)
 * @returns An operator function turning an `Observable<T>` into an `Observable<R>`.
 */
export function workerMap<T, R, U>(worker: Worker, dataFormatter?: formatterFunction<T, U>, workerMessageDataFormat?: formatterFunction<any, R>) {

  const workerRx = new WorkerRx(worker, workerMessageDataFormat);

  return (source$: Observable<T>): Observable<R> =>
    new Observable<R>((subscriber) => {
      // Number of messages posted to the worker that have not been answered yet.
      let pending = 0;
      let sourceCompleted = false;
      // A BehaviorSubject synchronously replays its current value on subscribe
      // (the seed, or a stale reply from a previous subscription): skip it.
      let replayingLatest = true;

      const resultSubscription = workerRx.result$
        .pipe(filter(() => !replayingLatest))
        .subscribe({
          next(result) {
            if (pending > 0) {
              pending--;
            }
            subscriber.next(result);
            if (sourceCompleted && pending === 0) {
              subscriber.complete();
            }
          },
          error(error) {
            subscriber.error(error);
          },
          complete() {
            subscriber.complete();
          },
        });
      replayingLatest = false;

      const sourceSubscription = source$.subscribe({
        next(value) {
          if (subscriber.closed) {
            return;
          }
          try {
            const payload = typeof dataFormatter === 'function' ? dataFormatter(value) : value;
            worker.postMessage(payload);
            pending++;
          } catch (error) {
            // Formatter failure or non-cloneable payload: report it downstream.
            subscriber.error(error);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          sourceCompleted = true;
          // Otherwise completion is deferred until the last reply arrives.
          if (pending === 0) {
            subscriber.complete();
          }
        },
      });

      return () => {
        sourceSubscription.unsubscribe();
        resultSubscription.unsubscribe();
      };
    });

}

/**
 * Experimental - RxJS operator that runs `cb` inside a web worker created at
 * runtime.
 *
 * The source text of `cb` (`cb.toString()`) is embedded in a generated worker
 * script, so `cb` must be a pure function: it cannot reference `this` nor any
 * variable captured from its enclosing scope.
 *
 * A dedicated worker is created for each subscription. It is terminated, and
 * its script URL revoked, when the output completes, errors or is
 * unsubscribed. The output completes once the source has completed and the
 * worker has answered every posted value, so replies still in flight are not
 * lost.
 *
 * @param cb Only pure function
 * @returns An operator function turning an `Observable<T>` into an `Observable<R>`.
 * @throws Error if the source of `cb` contains the `this` keyword.
 */
export function workerMapFunction<T, R>(cb: (data: any) => void) {

  const cbSource = cb.toString();

  // Word boundaries match the `this` keyword only, not identifiers that merely
  // end with "this".
  if (/\bthis\b/.test(cbSource)) {
    throw new Error('workerMapFunction operator support only pure function (no this reference)');
  }

  // Method-shorthand sources (`name(args) {`) are rewritten into a function
  // expression so that they can be assigned to `cb` inside the worker.
  const workerFunctionBlob = new Blob([
    `const cb = ${cbSource.replace(/^(\w*)(\([\w,\s]*\)\s?{)/, 'function$2')};
    addEventListener('message', ({data}) => {
      postMessage(cb(data));
    });`
  ], {
    type: 'text/javascript',
  });

  return (source$: Observable<T>): Observable<R> =>
    new Observable<R>((subscriber) => {
      const workerUrl = URL.createObjectURL(workerFunctionBlob);
      const worker = new Worker(workerUrl);
      const workerRx = new WorkerRx<R>(worker);

      // Number of values posted to the worker that have not been answered yet.
      let pending = 0;
      let sourceCompleted = false;
      // The BehaviorSubject synchronously replays its placeholder seed on
      // subscribe; it must not reach the consumer.
      let replayingSeed = true;

      const resultSubscription = workerRx.result$.subscribe({
        next(result) {
          if (replayingSeed) {
            return;
          }
          if (pending > 0) {
            pending--;
          }
          subscriber.next(result);
          if (sourceCompleted && pending === 0) {
            subscriber.complete();
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        },
      });
      replayingSeed = false;

      const sourceSubscription = source$.subscribe({
        next(value) {
          if (subscriber.closed) {
            return;
          }
          try {
            worker.postMessage(value);
            pending++;
          } catch (error) {
            // e.g. the value cannot be structured-cloned.
            subscriber.error(error);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          sourceCompleted = true;
          // Otherwise completion is deferred until the last reply arrives.
          if (pending === 0) {
            subscriber.complete();
          }
        },
      });

      // Runs on completion, error and unsubscription alike.
      return () => {
        sourceSubscription.unsubscribe();
        resultSubscription.unsubscribe();
        worker.terminate();
        URL.revokeObjectURL(workerUrl);
      };
    });

}
