import { Injectable } from '@angular/core';
import { concat, forkJoin, Observable, of, Subject, Subscription } from 'rxjs';
import { switchMap, takeWhile, toArray } from 'rxjs/operators';

import {
  IJob,
  JobConfig,
  JobResponse,
  JobResult,
  JobResultType,
} from '../model/job-chain';

/**
 * Runs the jobs of a {@link JobConfig} over one or more items.
 *
 * Two modes are offered:
 * - Interactive (`processJobsForItems` / `processJobsForItem`): each job is
 *   started with `startJob` and reports its result through the subject that is
 *   assigned to its `response$` property.
 * - Automated (`processAutomatedJobsForItems` / `processAutomatedJobsForItem`):
 *   each job returns its results as an observable from `startAutomatedJob`.
 *
 * The interactive mode tracks its progress in instance fields and replaces its
 * internal subscriptions on every call, so a single instance can drive only
 * one interactive run at a time.
 */
@Injectable()
export class JobChainService<T> {
  /** Index of the job currently running for the current item (-1 = none started). */
  itemJobIndex = -1;
  /** Results collected so far for the current item. */
  itemJobResults: JobResult<T>[] = [];
  /** Subject handed to every job as `response$`; jobs report their result here. */
  itemJobResultSub: Subject<JobResult<T>> = new Subject();
  /** Emits one response each time the job chain of an item has finished. */
  itemJobsDoneSub: Subject<JobResponse<T>> = new Subject();
  itemJobResultSubscription: Subscription;
  itemJobsDoneSubscription: Subscription;

  /** Index of the item currently being processed (-1 = none started). */
  itemJobResponseIndex = -1;
  /** Responses collected so far for the current list of items. */
  itemJobResponses: JobResponse<T>[] = [];

  //#region WITH USER INTERACTION

  /**
   * Processes the items one after the other, running the whole job chain for
   * each of them, and emits all collected responses on `allDoneSub` once the
   * last item has finished.
   *
   * @param items Items to process, in order.
   * @param jobConfig Jobs to run and the rules for interrupting the chain.
   * @param allDoneSub Subject that receives the responses of all items.
   */
  public processJobsForItems(
    items: T[],
    jobConfig: JobConfig<T>,
    allDoneSub: Subject<JobResponse<T>[]>
  ): void {
    // Start from a clean slate: a previous run that never reached its end
    // would otherwise leave a stale cursor and stale responses behind.
    this.itemJobResponseIndex = -1;
    this.itemJobResponses = [];

    this.itemJobsDoneSubscription?.unsubscribe();
    this.itemJobsDoneSubscription = this.itemJobsDoneSub.subscribe(
      (jobResponse) => {
        this.itemJobResponses.push(jobResponse);
        this.processJobsForNextItem(items, jobConfig, allDoneSub);
      }
    );
    this.processJobsForNextItem(items, jobConfig, allDoneSub);
  }

  /**
   * Runs the job chain for a single item. Every job result is recorded and its
   * `data` becomes the item handed to the next job. The chain stops early when
   * a result is an interruption according to `jobConfig`.
   *
   * @param item Item handed to the first job.
   * @param jobConfig Jobs to run and the rules for interrupting the chain.
   * @returns The shared subject that emits the response when the chain is done.
   */
  public processJobsForItem(
    item: T,
    jobConfig: JobConfig<T>
  ): Observable<JobResponse<T>> {
    // Drop whatever an unfinished previous chain may have left behind.
    this.itemJobIndex = -1;
    this.itemJobResults = [];

    this.itemJobResultSubscription?.unsubscribe();
    this.itemJobResultSubscription = this.itemJobResultSub.subscribe(
      (jobResult) => {
        this.itemJobResults.push(jobResult);

        // The job's output is the input of whatever comes next.
        const updatedItem = jobResult.data;

        if (this.isPipelineInterruption(jobConfig, jobResult)) {
          jobConfig.jobInterruptionListener?.onJobInterruption(
            updatedItem,
            jobResult
          );
          this.finishItemJobs(updatedItem);
        } else {
          this.processNextJob(updatedItem, jobConfig.jobs);
        }
      }
    );

    // Every job reports through the same subject; the subscription above
    // decides what happens after each result.
    jobConfig.jobs.forEach((job) => {
      job.response$ = this.itemJobResultSub;
    });

    this.processNextJob(item, jobConfig.jobs);

    return this.itemJobsDoneSub;
  }

  //#region private

  /**
   * Tells whether a job result must stop the chain: a CANCEL result while
   * `stopPipelineOnCancel` is set, or an ERROR result while
   * `stopPipelineOnError` is set.
   */
  private isPipelineInterruption(
    jobConfig: JobConfig<T>,
    jobResult: JobResult<T>
  ): boolean {
    if (
      jobConfig.stopPipelineOnCancel &&
      jobResult.type === JobResultType.CANCEL
    ) {
      return true;
    }
    if (
      jobConfig.stopPipelineOnError &&
      jobResult.type === JobResultType.ERROR
    ) {
      return true;
    }
    return false;
  }

  /** Starts the next job for the item, or finishes the chain when none is left. */
  private processNextJob(item: T, jobs: IJob<T>[]): void {
    this.itemJobIndex++;
    if (this.itemJobIndex < jobs.length) {
      jobs[this.itemJobIndex].startJob(item);
    } else {
      this.finishItemJobs(item);
    }
  }

  /** Starts the chain for the next item, or finishes the run when none is left. */
  private processJobsForNextItem(
    items: T[],
    jobConfig: JobConfig<T>,
    allDoneSub: Subject<JobResponse<T>[]>
  ): void {
    this.itemJobResponseIndex++;
    if (this.itemJobResponseIndex < items.length) {
      this.processJobsForItem(items[this.itemJobResponseIndex], jobConfig);
    } else {
      this.finishItems(items, allDoneSub);
    }
  }

  /** Ends the chain of the current item and publishes its response. */
  private finishItemJobs(item: T): void {
    this.itemJobResultSubscription?.unsubscribe();

    // Reset the per-item state BEFORE emitting. Subjects deliver
    // synchronously, so a subscriber may start the next item inside `next()`;
    // that item must not write into the array already handed to this response.
    const finishedResults = this.itemJobResults;
    this.itemJobIndex = -1;
    this.itemJobResults = [];

    this.itemJobsDoneSub.next(new JobResponse<T>(item, finishedResults));
  }

  /** Ends the run over all items and publishes the collected responses. */
  private finishItems(items: T[], allDoneSub: Subject<JobResponse<T>[]>): void {
    this.itemJobsDoneSubscription?.unsubscribe();

    // Same ordering as in finishItemJobs: reset first, so a subscriber that
    // starts a new run from within `next()` finds clean state.
    const finishedResponses = this.itemJobResponses;
    this.itemJobResponseIndex = -1;
    this.itemJobResponses = [];

    allDoneSub.next(finishedResponses);
  }
  //#endregion private

  //#endregion WITH USER INTERACTION

  //#region AUTOMATED WITHOUT USER INTERACTION

  /**
   * Runs the automated job chain for every item and emits all responses as a
   * single array, in the order of `items`.
   *
   * With `jobConfig.processItemsSimultaneously` the item chains are combined
   * with `forkJoin`, otherwise with `concat`.
   *
   * NOTE: `startAutomatedJob` is called for every job of every item right
   * away, while the chains are being built; only the subscription to the
   * returned observables is governed by `forkJoin` / `concat`.
   *
   * @param items Items to process.
   * @param jobConfig Jobs to run and the rules for interrupting the chain.
   * @returns An observable emitting the responses of all items.
   */
  public processAutomatedJobsForItems(
    items: T[],
    jobConfig: JobConfig<T>
  ): Observable<JobResponse<T>[]> {
    const responsePerItem = items.map((item) =>
      this.processAutomatedJobsForItem(item, jobConfig)
    );

    if (jobConfig.processItemsSimultaneously) {
      // forkJoin completes without emitting for an empty input. Flattening its
      // array and collecting it again with toArray still yields `[]` then.
      return forkJoin(responsePerItem).pipe(
        switchMap((responses) => responses),
        toArray()
      );
    }

    return concat(...responsePerItem).pipe(toArray());
  }

  /**
   * Runs the automated job chain for a single item: the observables returned
   * by the jobs are subscribed one after the other and their results are
   * collected until the chain completes or a result interrupts it.
   *
   * Every job receives the original `item`. The result that interrupts the
   * chain is passed to the interruption listener but is not part of the
   * collected results, because `takeWhile` drops the value that fails it.
   *
   * @param item Item handed to every job.
   * @param jobConfig Jobs to run and the rules for interrupting the chain.
   * @returns An observable emitting the response for this item.
   */
  public processAutomatedJobsForItem(
    item: T,
    jobConfig: JobConfig<T>
  ): Observable<JobResponse<T>> {
    const jobResults$: Observable<JobResult<T>>[] = jobConfig.jobs.map((job) =>
      job.startAutomatedJob(item)
    );

    return concat(...jobResults$).pipe(
      takeWhile((jobResult) => {
        if (this.isPipelineInterruption(jobConfig, jobResult)) {
          jobConfig.jobInterruptionListener?.onJobInterruption(item, jobResult);
          return false;
        }
        return true;
      }),
      toArray(),
      // Wrap the collected results of this item into a single response.
      switchMap((jobResults) => of(new JobResponse<T>(item, jobResults)))
    );
  }

  //#endregion AUTOMATED WITHOUT USER INTERACTION
}
