package com.krupalshah.composer;

import com.krupalshah.composer.exception.ComposerException;
import com.krupalshah.composer.exception.ErrorStream;
import com.krupalshah.composer.function.collector.BiCollector;
import com.krupalshah.composer.function.collector.Collector;
import com.krupalshah.composer.function.collector.Distributor;
import com.krupalshah.composer.function.collector.TriCollector;
import com.krupalshah.composer.function.other.Supplier;
import com.krupalshah.composer.function.other.Validator;
import com.krupalshah.composer.function.tasks.ConsumingTask;
import com.krupalshah.composer.function.tasks.ProducingTask;
import com.krupalshah.composer.function.tasks.SimpleTask;
import com.krupalshah.composer.function.tasks.TransformingTask;
import com.krupalshah.composer.util.KnownFuture;
import com.krupalshah.composer.util.Pair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/* loaded from: input_file:com/krupalshah/composer/Composer.class */
public class Composer<T> implements Composable<T> {
    private static final ExecutorService PARENT_EXECUTOR = Executors.newSingleThreadExecutor();
    private final ExecutorService taskExecutor;
    private final Future<T> future;
    private final ErrorStream errStream;

    private Composer(Future<T> future, ErrorStream errorStream, ExecutorService executorService) {
        this.future = future;
        this.errStream = errorStream;
        this.taskExecutor = executorService;
    }

    public static Composable<?> startWith(ErrorStream errorStream) {
        return startWith(errorStream, Executors.newCachedThreadPool());
    }

    public static <R> Composable<R> startWith(R r, ErrorStream errorStream) {
        return startWith(r, errorStream, Executors.newCachedThreadPool());
    }

    public static Composable<?> startWith(SimpleTask simpleTask, ErrorStream errorStream) {
        return startWith(simpleTask, errorStream, Executors.newCachedThreadPool());
    }

    public static <R> Composable<R> startWith(ProducingTask<R> producingTask, ErrorStream errorStream) {
        return startWith((ProducingTask) producingTask, errorStream, Executors.newCachedThreadPool());
    }

    public static Composable<?> startWith(ErrorStream errorStream, ExecutorService executorService) {
        try {
            return newComposer(new KnownFuture(new Object()), errorStream, executorService);
        } catch (Throwable th) {
            errorStream.onError(th);
            return newComposer(null, errorStream, executorService);
        }
    }

    public static <R> Composable<R> startWith(R r, ErrorStream errorStream, ExecutorService executorService) {
        try {
            return newComposer(new KnownFuture(r), errorStream, executorService);
        } catch (Throwable th) {
            errorStream.onError(th);
            return newComposer(null, errorStream, executorService);
        }
    }

    public static Composable<?> startWith(SimpleTask simpleTask, ErrorStream errorStream, ExecutorService executorService) {
        try {
            return newComposer(executorService.submit(() -> {
                try {
                    simpleTask.execute();
                } catch (Exception e) {
                    throw new ComposerException(e);
                }
            }), errorStream, executorService);
        } catch (Throwable th) {
            errorStream.onError(th);
            return newComposer(null, errorStream, executorService);
        }
    }

    public static <R> Composable<R> startWith(ProducingTask<R> producingTask, ErrorStream errorStream, ExecutorService executorService) {
        try {
            producingTask.getClass();
            return newComposer(executorService.submit(producingTask::produce), errorStream, executorService);
        } catch (Throwable th) {
            errorStream.onError(th);
            return newComposer(null, errorStream, executorService);
        }
    }

    private static <R> Composer<R> newComposer(Future<R> future, ErrorStream errorStream, ExecutorService executorService) {
        return new Composer<>(future, errorStream, executorService);
    }

    @Override // com.krupalshah.composer.Composable
    public Composable<T> thenRun(SimpleTask simpleTask) {
        return chainWith(() -> {
            T await = await();
            return await == null ? switchTo(null) : switchTo(async(() -> {
                uncheckedTask(simpleTask);
            }, await));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public Composable<T> thenRunTogether(Supplier<Collection<SimpleTask>> supplier) {
        return chainWith(() -> {
            T await = await();
            if (await == null) {
                return switchTo(null);
            }
            Collection collection = (Collection) supplier.supply();
            return switchTo(deferred(() -> {
                uncheckedTask(() -> {
                    CountDownLatch newLatch = newLatch(collection.size());
                    Iterator it = collection.iterator();
                    while (it.hasNext()) {
                        SimpleTask simpleTask = (SimpleTask) it.next();
                        async(() -> {
                            latchedTask(() -> {
                                uncheckedTask(simpleTask);
                            }, newLatch);
                        });
                    }
                    newLatch.await();
                });
            }, await));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public Composable<T> thenRunSynchronously(SimpleTask simpleTask) {
        return chainWith(() -> {
            await();
            simpleTask.execute();
            return this;
        });
    }

    @Override // com.krupalshah.composer.Composable
    public Composable<T> thenConsume(ConsumingTask<T> consumingTask) {
        return chainWith(() -> {
            T await = await();
            return await == null ? switchTo(null) : switchTo(async(() -> {
                uncheckedTask(consumingTask, await);
            }, await));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public Composable<T> thenConsumeTogether(Supplier<Collection<ConsumingTask<T>>> supplier) {
        return chainWith(() -> {
            T await = await();
            if (await == null) {
                return switchTo(null);
            }
            Collection collection = (Collection) supplier.supply();
            return switchTo(deferred(() -> {
                uncheckedTask(() -> {
                    CountDownLatch newLatch = newLatch(collection.size());
                    Iterator it = collection.iterator();
                    while (it.hasNext()) {
                        ConsumingTask consumingTask = (ConsumingTask) it.next();
                        async(() -> {
                            latchedTask(() -> {
                                uncheckedTask(consumingTask, await);
                            }, newLatch);
                        });
                    }
                    newLatch.await();
                });
            }, await));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public <S> Composable<T> thenConsumeForEachTogether(Distributor<T, Collection<S>> distributor, ConsumingTask<S> consumingTask) {
        return chainWith(() -> {
            Collection distribute;
            T await = await();
            if (await != null && (distribute = distributor.distribute(await)) != null) {
                return switchTo(deferred(() -> {
                    uncheckedTask(() -> {
                        CountDownLatch newLatch = newLatch(distribute.size());
                        for (Object obj : distribute) {
                            async(() -> {
                                latchedTask(() -> {
                                    uncheckedTask(consumingTask, obj);
                                }, newLatch);
                            });
                        }
                        newLatch.await();
                    });
                }, await));
            }
            return switchTo(null);
        });
    }

    @Override // com.krupalshah.composer.Composable
    public Composable<T> thenConsumeSynchronously(ConsumingTask<T> consumingTask) {
        return chainWith(() -> {
            T await = await();
            if (await == null) {
                return switchTo(null);
            }
            consumingTask.consume(await);
            return this;
        });
    }

    @Override // com.krupalshah.composer.Composable
    public <R> Composable<R> thenProduce(ProducingTask<R> producingTask) {
        return chainWith(() -> {
            await();
            producingTask.getClass();
            return switchTo(async(producingTask::produce));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public <S, R> Composable<R> thenProduceTogether(Supplier<Collection<ProducingTask<S>>> supplier, Collector<T, Set<S>, R> collector) {
        return chainWith(() -> {
            T await = await();
            if (await == null) {
                return switchTo(null);
            }
            Collection collection = (Collection) supplier.supply();
            return switchTo(deferred(() -> {
                CountDownLatch newLatch = newLatch(collection.size());
                LinkedHashSet linkedHashSet = new LinkedHashSet();
                Iterator it = collection.iterator();
                while (it.hasNext()) {
                    ProducingTask producingTask = (ProducingTask) it.next();
                    linkedHashSet.add(async(() -> {
                        producingTask.getClass();
                        return latchedTask(producingTask::produce, newLatch);
                    }));
                }
                newLatch.await();
                LinkedHashSet linkedHashSet2 = new LinkedHashSet();
                Iterator it2 = linkedHashSet.iterator();
                while (it2.hasNext()) {
                    linkedHashSet2.add(((Future) it2.next()).get());
                }
                return collector.collect(await, linkedHashSet2);
            }));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public <S, U, R> Composable<R> thenProduceTogether(ProducingTask<S> producingTask, ProducingTask<U> producingTask2, BiCollector<T, S, U, R> biCollector) {
        return chainWith(() -> {
            T await = await();
            return await == null ? switchTo(null) : switchTo(deferred(() -> {
                CountDownLatch newLatch = newLatch(2);
                Future<R> async = async(() -> {
                    producingTask.getClass();
                    return latchedTask(producingTask::produce, newLatch);
                });
                Future<R> async2 = async(() -> {
                    producingTask2.getClass();
                    return latchedTask(producingTask2::produce, newLatch);
                });
                newLatch.await();
                return biCollector.collect(await, async.get(), async2.get());
            }));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public <S, U, V, R> Composable<R> thenProduceTogether(ProducingTask<S> producingTask, ProducingTask<U> producingTask2, ProducingTask<V> producingTask3, TriCollector<T, S, U, V, R> triCollector) {
        return chainWith(() -> {
            T await = await();
            return await == null ? switchTo(null) : switchTo(deferred(() -> {
                CountDownLatch newLatch = newLatch(3);
                Future<R> async = async(() -> {
                    producingTask.getClass();
                    return latchedTask(producingTask::produce, newLatch);
                });
                Future<R> async2 = async(() -> {
                    producingTask2.getClass();
                    return latchedTask(producingTask2::produce, newLatch);
                });
                Future<R> async3 = async(() -> {
                    producingTask3.getClass();
                    return latchedTask(producingTask3::produce, newLatch);
                });
                newLatch.await();
                return triCollector.collect(await, async.get(), async2.get(), async3.get());
            }));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public <R> Composable<R> thenProduceSynchronously(ProducingTask<R> producingTask) {
        return chainWith(() -> {
            await();
            return switchTo(new KnownFuture(producingTask.produce()));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public <R> Composable<R> thenTransform(TransformingTask<T, R> transformingTask) {
        return chainWith(() -> {
            T await = await();
            return await == null ? switchTo(null) : switchTo(async(() -> {
                return transformingTask.transform(await);
            }));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public <S, R> Composable<R> thenTransformTogether(Supplier<Collection<TransformingTask<T, S>>> supplier, Collector<T, Set<S>, R> collector) {
        return chainWith(() -> {
            T await = await();
            if (await == null) {
                return switchTo(null);
            }
            Collection collection = (Collection) supplier.supply();
            return switchTo(deferred(() -> {
                CountDownLatch newLatch = newLatch(collection.size());
                ArrayList arrayList = new ArrayList();
                Iterator it = collection.iterator();
                while (it.hasNext()) {
                    TransformingTask transformingTask = (TransformingTask) it.next();
                    arrayList.add(async(() -> {
                        return latchedTask(() -> {
                            return transformingTask.transform(await);
                        }, newLatch);
                    }));
                }
                newLatch.await();
                LinkedHashSet linkedHashSet = new LinkedHashSet();
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    linkedHashSet.add(((Future) it2.next()).get());
                }
                return collector.collect(await, linkedHashSet);
            }));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public <S, U, R> Composable<R> thenTransformTogether(TransformingTask<T, S> transformingTask, TransformingTask<T, U> transformingTask2, BiCollector<T, S, U, R> biCollector) {
        return chainWith(() -> {
            T await = await();
            return await == null ? switchTo(null) : switchTo(deferred(() -> {
                CountDownLatch newLatch = newLatch(2);
                Future<R> async = async(() -> {
                    return latchedTask(() -> {
                        return transformingTask.transform(await);
                    }, newLatch);
                });
                Future<R> async2 = async(() -> {
                    return latchedTask(() -> {
                        return transformingTask2.transform(await);
                    }, newLatch);
                });
                newLatch.await();
                return biCollector.collect(await, async.get(), async2.get());
            }));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public <S, U, V, R> Composable<R> thenTransformTogether(TransformingTask<T, S> transformingTask, TransformingTask<T, U> transformingTask2, TransformingTask<T, V> transformingTask3, TriCollector<T, S, U, V, R> triCollector) {
        return chainWith(() -> {
            T await = await();
            return await == null ? switchTo(null) : switchTo(deferred(() -> {
                CountDownLatch newLatch = newLatch(3);
                Future<R> async = async(() -> {
                    return latchedTask(() -> {
                        return transformingTask.transform(await);
                    }, newLatch);
                });
                Future<R> async2 = async(() -> {
                    return latchedTask(() -> {
                        return transformingTask2.transform(await);
                    }, newLatch);
                });
                Future<R> async3 = async(() -> {
                    return latchedTask(() -> {
                        return transformingTask3.transform(await);
                    }, newLatch);
                });
                newLatch.await();
                return triCollector.collect(await, async.get(), async2.get(), async3.get());
            }));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public <S, U, R> Composable<R> thenTransformForEachTogether(Distributor<T, Collection<S>> distributor, TransformingTask<S, U> transformingTask, Collector<T, Set<Pair<S, U>>, R> collector) {
        return chainWith(() -> {
            Collection distribute;
            T await = await();
            if (await != null && (distribute = distributor.distribute(await)) != null) {
                return switchTo(deferred(() -> {
                    CountDownLatch newLatch = newLatch(distribute.size());
                    LinkedHashSet<Pair> linkedHashSet = new LinkedHashSet();
                    for (Object obj : distribute) {
                        linkedHashSet.add(Pair.create(obj, async(() -> {
                            return latchedTask(() -> {
                                return transformingTask.transform(obj);
                            }, newLatch);
                        })));
                    }
                    newLatch.await();
                    LinkedHashSet linkedHashSet2 = new LinkedHashSet();
                    for (Pair pair : linkedHashSet) {
                        linkedHashSet2.add(Pair.create(pair.getKey(), ((Future) pair.getValue()).get()));
                    }
                    return collector.collect(await, linkedHashSet2);
                }));
            }
            return switchTo(null);
        });
    }

    @Override // com.krupalshah.composer.Composable
    public <R> Composable<R> thenTransformSynchronously(TransformingTask<T, R> transformingTask) {
        return chainWith(() -> {
            T await = await();
            return await == null ? switchTo(null) : switchTo(new KnownFuture(transformingTask.transform(await)));
        });
    }

    @Override // com.krupalshah.composer.Composable
    public Composable<T> thenContinueIf(Validator<T> validator) {
        return chainWith(() -> {
            T await = await();
            if (await == null) {
                return switchTo(null);
            }
            if (validator.validate(await)) {
                return this;
            }
            this.errStream.onError(new ComposerException(String.format("The upstream value: %s is not valid as per the condition specified in given validator: %s \nDownstream execution will stop now.", await, validator)));
            return switchTo(null);
        });
    }

    @Override // com.krupalshah.composer.Composable
    public void thenFinish() {
        try {
            await();
        } catch (Throwable th) {
            this.errStream.onError(th);
        }
    }

    @Override // com.krupalshah.composer.Composable
    public void thenFinish(ConsumingTask<T> consumingTask) {
        try {
            consumingTask.consume(await());
        } catch (Throwable th) {
            this.errStream.onError(th);
        }
    }

    private <R> Composer<R> switchTo(Future<R> future) {
        return newComposer(future, this.errStream, this.taskExecutor);
    }

    private <R> Composer<R> chainWith(Callable<Composer<R>> callable) {
        if (this.future == null) {
            return switchTo(null);
        }
        try {
            return callable.call();
        } catch (Throwable th) {
            this.errStream.onError(th);
            return switchTo(null);
        }
    }

    private <R> Future<R> deferred(Callable<R> callable) {
        return PARENT_EXECUTOR.submit(callable);
    }

    private <R> Future<R> deferred(Runnable runnable, R r) {
        return PARENT_EXECUTOR.submit(runnable, r);
    }

    private <R> Future<R> async(Callable<R> callable) {
        return this.taskExecutor.submit(callable);
    }

    private <R> Future<R> async(Runnable runnable, R r) {
        return this.taskExecutor.submit(runnable, r);
    }

    private void async(Runnable runnable) {
        this.taskExecutor.submit(runnable);
    }

    private T await() throws InterruptedException, ExecutionException {
        if (this.future == null) {
            return null;
        }
        return this.future.get();
    }

    private CountDownLatch newLatch(int i) {
        return new CountDownLatch(i);
    }

    private <R> R latchedTask(Callable<R> callable, CountDownLatch countDownLatch) throws Exception {
        try {
            R call = callable.call();
            countDownLatch.countDown();
            return call;
        } catch (Throwable th) {
            countDownLatch.countDown();
            throw th;
        }
    }

    private void latchedTask(Runnable runnable, CountDownLatch countDownLatch) {
        try {
            runnable.run();
        } finally {
            countDownLatch.countDown();
        }
    }

    private void uncheckedTask(SimpleTask simpleTask) {
        try {
            simpleTask.execute();
        } catch (Exception e) {
            throw new ComposerException(e);
        }
    }

    private <S> void uncheckedTask(ConsumingTask<S> consumingTask, S s) {
        try {
            consumingTask.consume(s);
        } catch (Exception e) {
            throw new ComposerException(e);
        }
    }
}
