/*
 * Decompiled with CFR 0.152.
 */
package com.krupalshah.composer;

import com.krupalshah.composer.Composable;
import com.krupalshah.composer.exception.ComposerException;
import com.krupalshah.composer.exception.ErrorStream;
import com.krupalshah.composer.function.collector.BiCollector;
import com.krupalshah.composer.function.collector.Collector;
import com.krupalshah.composer.function.collector.TriCollector;
import com.krupalshah.composer.function.other.Supplier;
import com.krupalshah.composer.function.other.Validator;
import com.krupalshah.composer.function.tasks.ConsumingTask;
import com.krupalshah.composer.function.tasks.ProducingTask;
import com.krupalshah.composer.function.tasks.SimpleTask;
import com.krupalshah.composer.function.tasks.TransformingTask;
import com.krupalshah.composer.util.KnownFuture;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Composer<T>
implements Composable<T> {
    private static final ExecutorService PARENT_EXECUTOR = Executors.newSingleThreadExecutor();
    private final ExecutorService taskExecutor;
    private final Future<T> future;
    private final ErrorStream errStream;

    private Composer(Future<T> future, ErrorStream errStream, ExecutorService taskExecutor) {
        this.future = future;
        this.errStream = errStream;
        this.taskExecutor = taskExecutor;
    }

    public static <R> Composable<R> startWith(ProducingTask<R> task, ErrorStream errStream) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        return Composer.startWith(task, errStream, executorService);
    }

    public static <R> Composable<R> startWith(R value, ErrorStream errStream) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        return Composer.startWith(value, errStream, executorService);
    }

    public static <R> Composable<R> startWith(ProducingTask<R> task, ErrorStream errStream, ExecutorService taskExecutor) {
        try {
            Future<Object> future = taskExecutor.submit(task::produce);
            return Composer.newComposer(future, errStream, taskExecutor);
        }
        catch (Throwable t) {
            errStream.onError(t);
            return Composer.newComposer(null, errStream, taskExecutor);
        }
    }

    public static <R> Composable<R> startWith(R value, ErrorStream errStream, ExecutorService taskExecutor) {
        try {
            KnownFuture<R> future = new KnownFuture<R>(value);
            return Composer.newComposer(future, errStream, taskExecutor);
        }
        catch (Throwable t) {
            errStream.onError(t);
            return Composer.newComposer(null, errStream, taskExecutor);
        }
    }

    @Override
    public Composable<T> thenRun(SimpleTask task) {
        return this.chainWith(() -> {
            T upstream = this.await();
            if (upstream == null) {
                return this.switchTo(null);
            }
            Future<T> resultFuture = this.async(() -> this.uncheckedTask(task), upstream);
            return this.switchTo(resultFuture);
        });
    }

    @Override
    public Composable<T> thenConsume(ConsumingTask<T> task) {
        return this.chainWith(() -> {
            T upstream = this.await();
            if (upstream == null) {
                return this.switchTo(null);
            }
            Future<T> resultFuture = this.async(() -> this.uncheckedTask(task, upstream), upstream);
            return this.switchTo(resultFuture);
        });
    }

    @Override
    public <R> Composable<R> thenProduce(ProducingTask<R> task) {
        return this.chainWith(() -> {
            this.await();
            Future<Object> future = this.async(task::produce);
            return this.switchTo(future);
        });
    }

    @Override
    public <R> Composable<R> thenTransform(TransformingTask<T, R> task) {
        return this.chainWith(() -> {
            Object upstream = this.await();
            if (upstream == null) {
                return this.switchTo(null);
            }
            Future<Object> future = this.async(() -> task.transform(upstream));
            return this.switchTo(future);
        });
    }

    @Override
    public Composable<T> thenRunTogether(Supplier<Collection<SimpleTask>> tasksSupplier) {
        return this.chainWith(() -> {
            T upstream = this.await();
            if (upstream == null) {
                return this.switchTo(null);
            }
            Collection tasks = (Collection)tasksSupplier.supply();
            Future<T> resultFuture = this.deferred(() -> this.uncheckedTask(() -> {
                CountDownLatch latch = this.newLatch(tasks.size());
                for (SimpleTask task : tasks) {
                    this.async(() -> this.latchedTask(() -> this.uncheckedTask(task), latch));
                }
                latch.await();
            }), upstream);
            return this.switchTo(resultFuture);
        });
    }

    @Override
    public Composable<T> thenConsumeTogether(Supplier<Collection<ConsumingTask<T>>> tasksSupplier) {
        return this.chainWith(() -> {
            T upstream = this.await();
            if (upstream == null) {
                return this.switchTo(null);
            }
            Collection tasks = (Collection)tasksSupplier.supply();
            Future<T> resultFuture = this.deferred(() -> this.uncheckedTask(() -> {
                CountDownLatch latch = this.newLatch(tasks.size());
                for (ConsumingTask task : tasks) {
                    this.async(() -> this.latchedTask(() -> this.uncheckedTask(task, upstream), latch));
                }
                latch.await();
            }), upstream);
            return this.switchTo(resultFuture);
        });
    }

    @Override
    public <S, R> Composable<R> thenProduceTogether(Supplier<Collection<ProducingTask<S>>> tasksSupplier, Collector<List<S>, R> resultsCollector) {
        return this.chainWith(() -> {
            this.await();
            Collection tasks = (Collection)tasksSupplier.supply();
            Future<Object> resultFuture = this.deferred(() -> {
                CountDownLatch latch = this.newLatch(tasks.size());
                LinkedHashSet<Future> futures = new LinkedHashSet<Future>();
                for (ProducingTask task : tasks) {
                    Future future = this.async(() -> this.latchedTask(task::produce, latch));
                    futures.add(future);
                }
                latch.await();
                ArrayList results = new ArrayList();
                for (Future future : futures) {
                    Object result = future.get();
                    results.add(result);
                }
                return resultsCollector.collect(results);
            });
            return this.switchTo(resultFuture);
        });
    }

    @Override
    public <S, U, R> Composable<R> thenProduceTogether(ProducingTask<S> task1, ProducingTask<U> task2, BiCollector<S, U, R> resultsCollector) {
        return this.chainWith(() -> {
            this.await();
            Future<Object> resultFuture = this.deferred(() -> {
                CountDownLatch latch = this.newLatch(2);
                Future<Object> future1 = this.async(() -> this.latchedTask(task1::produce, latch));
                Future<Object> future2 = this.async(() -> this.latchedTask(task2::produce, latch));
                latch.await();
                Object result1 = future1.get();
                Object result2 = future2.get();
                return resultsCollector.collect(result1, result2);
            });
            return this.switchTo(resultFuture);
        });
    }

    @Override
    public <S, U, V, R> Composable<R> thenProduceTogether(ProducingTask<S> task1, ProducingTask<U> task2, ProducingTask<V> task3, TriCollector<S, U, V, R> resultsCollector) {
        return this.chainWith(() -> {
            this.await();
            Future<Object> resultFuture = this.deferred(() -> {
                CountDownLatch latch = this.newLatch(3);
                Future<Object> future1 = this.async(() -> this.latchedTask(task1::produce, latch));
                Future<Object> future2 = this.async(() -> this.latchedTask(task2::produce, latch));
                Future<Object> future3 = this.async(() -> this.latchedTask(task3::produce, latch));
                latch.await();
                Object result1 = future1.get();
                Object result2 = future2.get();
                Object result3 = future3.get();
                return resultsCollector.collect(result1, result2, result3);
            });
            return this.switchTo(resultFuture);
        });
    }

    @Override
    public <S, R> Composable<R> thenTransformTogether(Supplier<Collection<TransformingTask<T, S>>> tasksSupplier, Collector<List<S>, R> resultsCollector) {
        return this.chainWith(() -> {
            Object upstream = this.await();
            if (upstream == null) {
                return this.switchTo(null);
            }
            Collection tasks = (Collection)tasksSupplier.supply();
            Future<Object> resultFuture = this.deferred(() -> {
                CountDownLatch latch = this.newLatch(tasks.size());
                LinkedHashSet<Future> futures = new LinkedHashSet<Future>();
                for (TransformingTask task : tasks) {
                    Future future = this.async(() -> this.latchedTask(() -> task.transform(upstream), latch));
                    futures.add(future);
                }
                latch.await();
                ArrayList results = new ArrayList();
                for (Future future : futures) {
                    Object result = future.get();
                    results.add(result);
                }
                return resultsCollector.collect(results);
            });
            return this.switchTo(resultFuture);
        });
    }

    @Override
    public <S, U, R> Composable<R> thenTransformTogether(TransformingTask<T, S> task1, TransformingTask<T, U> task2, BiCollector<S, U, R> resultsCollector) {
        return this.chainWith(() -> {
            Object upstream = this.await();
            if (upstream == null) {
                return this.switchTo(null);
            }
            Future<Object> resultFuture = this.deferred(() -> {
                CountDownLatch latch = this.newLatch(2);
                Future<Object> future1 = this.async(() -> this.latchedTask(() -> task1.transform(upstream), latch));
                Future<Object> future2 = this.async(() -> this.latchedTask(() -> task2.transform(upstream), latch));
                latch.await();
                Object result1 = future1.get();
                Object result2 = future2.get();
                return resultsCollector.collect(result1, result2);
            });
            return this.switchTo(resultFuture);
        });
    }

    @Override
    public <S, U, V, R> Composable<R> thenTransformTogether(TransformingTask<T, S> task1, TransformingTask<T, U> task2, TransformingTask<T, V> task3, TriCollector<S, U, V, R> resultsCollector) {
        return this.chainWith(() -> {
            Object upstream = this.await();
            if (upstream == null) {
                return this.switchTo(null);
            }
            Future<Object> resultFuture = this.deferred(() -> {
                CountDownLatch latch = this.newLatch(3);
                Future<Object> future1 = this.async(() -> this.latchedTask(() -> task1.transform(upstream), latch));
                Future<Object> future2 = this.async(() -> this.latchedTask(() -> task2.transform(upstream), latch));
                Future<Object> future3 = this.async(() -> this.latchedTask(() -> task3.transform(upstream), latch));
                latch.await();
                Object result1 = future1.get();
                Object result2 = future2.get();
                Object result3 = future3.get();
                return resultsCollector.collect(result1, result2, result3);
            });
            return this.switchTo(resultFuture);
        });
    }

    @Override
    public Composable<T> thenRunSynchronously(SimpleTask task) {
        return this.chainWith(() -> {
            this.await();
            task.execute();
            return this;
        });
    }

    @Override
    public Composable<T> thenConsumeSynchronously(ConsumingTask<T> task) {
        return this.chainWith(() -> {
            T upstream = this.await();
            if (upstream == null) {
                return this.switchTo(null);
            }
            task.consume(upstream);
            return this;
        });
    }

    @Override
    public <R> Composable<R> thenProduceSynchronously(ProducingTask<R> task) {
        return this.chainWith(() -> {
            this.await();
            Object result = task.produce();
            return this.switchTo(new KnownFuture(result));
        });
    }

    @Override
    public <R> Composable<R> thenTransformSynchronously(TransformingTask<T, R> task) {
        return this.chainWith(() -> {
            T upstream = this.await();
            if (upstream == null) {
                return this.switchTo(null);
            }
            Object result = task.transform(upstream);
            return this.switchTo(new KnownFuture(result));
        });
    }

    @Override
    public Composable<T> thenContinueIf(Validator<T> validator) {
        return this.chainWith(() -> {
            T upstream = this.await();
            if (upstream == null) {
                return this.switchTo(null);
            }
            if (validator.validate(upstream)) {
                return this;
            }
            this.errStream.onError(new ComposerException(String.format("The upstream value: %s is not valid as per the condition specified in given validator: %s \nDownstream execution will stop now.", upstream, validator)));
            return this.switchTo(null);
        });
    }

    @Override
    public T thenFinish() {
        try {
            return this.await();
        }
        catch (Throwable t) {
            this.errStream.onError(t);
            return null;
        }
    }

    private static <R> Composer<R> newComposer(Future<R> future, ErrorStream errStream, ExecutorService executorService) {
        return new Composer<R>(future, errStream, executorService);
    }

    private <R> Composer<R> switchTo(Future<R> resultFuture) {
        return Composer.newComposer(resultFuture, this.errStream, this.taskExecutor);
    }

    private <R> Composer<R> chainWith(Callable<Composer<R>> composerSupplier) {
        if (this.future == null) {
            return this.switchTo(null);
        }
        try {
            return composerSupplier.call();
        }
        catch (Throwable t) {
            this.errStream.onError(t);
            return this.switchTo(null);
        }
    }

    private <R> Future<R> deferred(Callable<R> step) {
        return PARENT_EXECUTOR.submit(step);
    }

    private <R> Future<R> deferred(Runnable step, R result) {
        return PARENT_EXECUTOR.submit(step, result);
    }

    private <R> Future<R> async(Callable<R> task) {
        return this.taskExecutor.submit(task);
    }

    private <R> Future<R> async(Runnable task, R result) {
        return this.taskExecutor.submit(task, result);
    }

    private void async(Runnable task) {
        this.taskExecutor.submit(task);
    }

    private T await() throws InterruptedException, ExecutionException {
        if (this.future == null) {
            return null;
        }
        return this.future.get();
    }

    private CountDownLatch newLatch(int nTasks) {
        return new CountDownLatch(nTasks);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <R> R latchedTask(Callable<R> task, CountDownLatch latch) throws Exception {
        try {
            R r = task.call();
            return r;
        }
        finally {
            latch.countDown();
        }
    }

    private void latchedTask(Runnable task, CountDownLatch latch) {
        try {
            task.run();
        }
        finally {
            latch.countDown();
        }
    }

    private void uncheckedTask(SimpleTask task) {
        try {
            task.execute();
        }
        catch (Exception e) {
            throw new ComposerException(e);
        }
    }

    private <S> void uncheckedTask(ConsumingTask<S> task, S input) {
        try {
            task.consume(input);
        }
        catch (Exception e) {
            throw new ComposerException(e);
        }
    }
}

