/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.frame.processor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.AwaitAnyWidget;
import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.processor.RunAllFullyWidget;
import org.apache.druid.frame.processor.RunnableFrameProcessor;
import org.apache.druid.frame.processor.manager.ProcessorManager;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;

public class FrameProcessorExecutor {
    private static final Logger log = new Logger(FrameProcessorExecutor.class);
    private final ListeningExecutorService exec;
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private final Set<String> activeCancellationIds = new HashSet<String>();
    @GuardedBy(value="lock")
    private final SetMultimap<String, ListenableFuture<?>> cancelableFutures = Multimaps.newSetMultimap(new HashMap(), Sets::newIdentityHashSet);
    @GuardedBy(value="lock")
    private final SetMultimap<String, ListenableFuture<?>> cancelableReturnFutures = Multimaps.newSetMultimap(new HashMap(), Sets::newIdentityHashSet);
    @GuardedBy(value="lock")
    private final SetMultimap<String, FrameProcessor<?>> cancelableProcessors = Multimaps.newSetMultimap(new HashMap(), Sets::newIdentityHashSet);
    @GuardedBy(value="lock")
    private final Map<FrameProcessor<?>, Thread> runningProcessors = new IdentityHashMap();

    public FrameProcessorExecutor(ListeningExecutorService exec) {
        this.exec = exec;
    }

    public <T> ListenableFuture<T> runFully(final FrameProcessor<T> processor, final @Nullable String cancellationId) {
        final List<ReadableFrameChannel> inputChannels = processor.inputChannels();
        final List<WritableFrameChannel> outputChannels = processor.outputChannels();
        final SettableFuture finished = this.registerCancelableFuture(SettableFuture.create(), true, cancellationId);
        if (finished.isDone()) {
            return finished;
        }
        class ExecutorRunnable
        implements Runnable {
            private final AwaitAnyWidget awaitAnyWidget;

            ExecutorRunnable() {
                this.awaitAnyWidget = new AwaitAnyWidget(inputChannels);
            }

            @Override
            public void run() {
                try {
                    List<ListenableFuture<?>> allWritabilityFutures = this.gatherWritabilityFutures();
                    List writabilityFuturesToWaitFor = allWritabilityFutures.stream().filter(f -> !f.isDone()).collect(Collectors.toList());
                    FrameProcessorExecutor.logProcessorStatusString(processor, finished, allWritabilityFutures);
                    if (!writabilityFuturesToWaitFor.isEmpty()) {
                        this.runProcessorAfterFutureResolves(Futures.allAsList(writabilityFuturesToWaitFor), false);
                        return;
                    }
                    Optional maybeResult = this.runProcessorNow();
                    if (!maybeResult.isPresent()) {
                        return;
                    }
                    ReturnOrAwait result = maybeResult.get();
                    FrameProcessorExecutor.logProcessorStatusString(processor, finished, null);
                    if (result.isReturn()) {
                        this.succeed(result.value());
                    } else if (result.hasAwaitableFutures()) {
                        this.runProcessorAfterFutureResolves(Futures.allAsList(result.awaitableFutures()), true);
                    } else {
                        assert (result.hasAwaitableChannels());
                        IntSet await = result.awaitableChannels();
                        if (await.isEmpty()) {
                            FrameProcessorExecutor.this.exec.execute((Runnable)this);
                        } else if (result.isAwaitAllChannels() || await.size() == 1) {
                            ArrayList readabilityFutures = new ArrayList();
                            IntIterator intIterator = await.iterator();
                            while (intIterator.hasNext()) {
                                int channelNumber = (Integer)intIterator.next();
                                ReadableFrameChannel channel = (ReadableFrameChannel)inputChannels.get(channelNumber);
                                if (channel.isFinished() || channel.canRead()) continue;
                                readabilityFutures.add(channel.readabilityFuture());
                            }
                            if (readabilityFutures.isEmpty()) {
                                FrameProcessorExecutor.this.exec.execute((Runnable)this);
                            } else {
                                this.runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures), false);
                            }
                        } else {
                            this.runProcessorAfterFutureResolves(this.awaitAnyWidget.awaitAny(await), false);
                        }
                    }
                }
                catch (Throwable e) {
                    this.fail(e);
                }
            }

            private List<ListenableFuture<?>> gatherWritabilityFutures() {
                ArrayList futures = new ArrayList();
                for (WritableFrameChannel channel : outputChannels) {
                    futures.add(channel.writabilityFuture());
                }
                return futures;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private Optional<ReturnOrAwait<T>> runProcessorNow() {
                Either<Throwable, Object> retVal;
                IntOpenHashSet readableInputs = new IntOpenHashSet(inputChannels.size());
                for (int i = 0; i < inputChannels.size(); ++i) {
                    ReadableFrameChannel channel = (ReadableFrameChannel)inputChannels.get(i);
                    if (!channel.isFinished() && !channel.canRead()) continue;
                    readableInputs.add(i);
                }
                if (cancellationId != null) {
                    Object i = FrameProcessorExecutor.this.lock;
                    synchronized (i) {
                        if (!FrameProcessorExecutor.this.cancelableProcessors.containsEntry((Object)cancellationId, (Object)processor)) {
                            return Optional.empty();
                        }
                        FrameProcessorExecutor.this.runningProcessors.put(processor, Thread.currentThread());
                    }
                }
                String threadName = Thread.currentThread().getName();
                boolean canceled = false;
                try {
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                    if (cancellationId != null) {
                        Thread.currentThread().setName(threadName + "-" + cancellationId);
                    }
                    retVal = Either.value(processor.runIncrementally((IntSet)readableInputs));
                }
                catch (Throwable e) {
                    retVal = Either.error(e);
                }
                finally {
                    if (cancellationId != null) {
                        Object object = FrameProcessorExecutor.this.lock;
                        synchronized (object) {
                            if (Thread.interrupted()) {
                                // empty if block
                            }
                            FrameProcessorExecutor.this.runningProcessors.remove(processor);
                            FrameProcessorExecutor.this.lock.notifyAll();
                            if (!FrameProcessorExecutor.this.cancelableProcessors.containsEntry((Object)cancellationId, (Object)processor)) {
                                canceled = true;
                            }
                        }
                        Thread.currentThread().setName(threadName);
                    }
                }
                if (canceled) {
                    return Optional.empty();
                }
                return Optional.of((ReturnOrAwait)retVal.valueOrThrow());
            }

            private <V> void runProcessorAfterFutureResolves(ListenableFuture<V> future, final boolean failOnCancel) {
                final ListenableFuture<V> cancelableFuture = FrameProcessorExecutor.this.registerCancelableFuture(future, false, cancellationId);
                Futures.addCallback(cancelableFuture, (FutureCallback)new FutureCallback<V>(){

                    public void onSuccess(V ignored) {
                        try {
                            FrameProcessorExecutor.this.exec.execute((Runnable)this);
                        }
                        catch (Throwable e) {
                            this.fail(e);
                        }
                    }

                    public void onFailure(Throwable t) {
                        if (failOnCancel || !cancelableFuture.isCancelled()) {
                            this.fail(t);
                        }
                    }
                }, (Executor)MoreExecutors.directExecutor());
            }

            private void succeed(T value) {
                try {
                    this.doProcessorCleanup();
                }
                catch (Throwable e) {
                    finished.setException(e);
                    return;
                }
                finished.set(value);
            }

            private void fail(Throwable e) {
                for (WritableFrameChannel outputChannel : outputChannels) {
                    try {
                        outputChannel.fail(e);
                    }
                    catch (Throwable e1) {
                        e.addSuppressed(e1);
                    }
                }
                try {
                    this.doProcessorCleanup();
                }
                catch (Throwable e1) {
                    e.addSuppressed(e1);
                }
                finished.setException(e);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            void doProcessorCleanup() throws IOException {
                boolean doCleanup;
                if (cancellationId != null) {
                    Object object = FrameProcessorExecutor.this.lock;
                    synchronized (object) {
                        doCleanup = FrameProcessorExecutor.this.cancelableProcessors.remove((Object)cancellationId, (Object)processor);
                    }
                } else {
                    doCleanup = true;
                }
                if (doCleanup) {
                    processor.cleanup();
                }
            }
        }
        ExecutorRunnable runnable = new ExecutorRunnable();
        finished.addListener(() -> {
            FrameProcessorExecutor.logProcessorStatusString(processor, finished, null);
            if (finished.isCancelled() && cancellationId != null) {
                boolean didRemoveFromCancelableProcessors;
                Object object = this.lock;
                synchronized (object) {
                    didRemoveFromCancelableProcessors = this.cancelableProcessors.remove((Object)cancellationId, (Object)processor);
                }
                if (didRemoveFromCancelableProcessors) {
                    try {
                        this.cancel(Collections.singleton(processor));
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }, (Executor)Execs.directExecutor());
        FrameProcessorExecutor.logProcessorStatusString(processor, finished, null);
        this.registerCancelableProcessor(processor, cancellationId);
        this.exec.execute((Runnable)runnable);
        return finished;
    }

    public <T, R> ListenableFuture<R> runAllFully(ProcessorManager<T, R> processorManager, int maxOutstandingProcessors, Bouncer bouncer, @Nullable String cancellationId) {
        return new RunAllFullyWidget<T, R>(processorManager, this, maxOutstandingProcessors, bouncer, cancellationId).run();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerCancellationId(String cancellationId) {
        Object object = this.lock;
        synchronized (object) {
            this.activeCancellationIds.add(cancellationId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel(String cancellationId) throws InterruptedException {
        Set returnFuturesToCancel;
        Set processorsToCancel;
        Set futuresToCancel;
        Preconditions.checkNotNull((Object)cancellationId, (Object)"cancellationId");
        Iterator iterator = this.lock;
        synchronized (iterator) {
            this.activeCancellationIds.remove(cancellationId);
            futuresToCancel = this.cancelableFutures.removeAll((Object)cancellationId);
            processorsToCancel = this.cancelableProcessors.removeAll((Object)cancellationId);
            returnFuturesToCancel = this.cancelableReturnFutures.removeAll((Object)cancellationId);
        }
        for (ListenableFuture future : futuresToCancel) {
            future.cancel(true);
        }
        this.cancel(processorsToCancel);
        for (ListenableFuture future : returnFuturesToCancel) {
            future.cancel(true);
        }
    }

    public Executor asExecutor(@Nullable String cancellationId) {
        return command -> this.runFully(new RunnableFrameProcessor(command), cancellationId);
    }

    public void shutdownNow() {
        this.exec.shutdownNow();
    }

    ListeningExecutorService getExecutorService() {
        return this.exec;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    <T, FutureType extends ListenableFuture<T>> FutureType registerCancelableFuture(FutureType future, boolean isReturn, @Nullable String cancellationId) {
        if (cancellationId != null) {
            Object object = this.lock;
            synchronized (object) {
                if (!this.activeCancellationIds.contains(cancellationId)) {
                    future.cancel(true);
                    return future;
                }
                SetMultimap<String, ListenableFuture<?>> map = isReturn ? this.cancelableReturnFutures : this.cancelableFutures;
                map.put((Object)cancellationId, future);
                future.addListener(() -> {
                    Object object = this.lock;
                    synchronized (object) {
                        map.remove((Object)cancellationId, (Object)future);
                    }
                }, (Executor)Execs.directExecutor());
            }
        }
        return future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    int cancelableProcessorCount() {
        Object object = this.lock;
        synchronized (object) {
            return this.cancelableProcessors.size();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cancel(Set<FrameProcessor<?>> processorsToCancel) throws InterruptedException {
        Iterator<FrameProcessor<?>> iterator = this.lock;
        synchronized (iterator) {
            for (FrameProcessor<?> processor : processorsToCancel) {
                Thread processorThread = this.runningProcessors.get(processor);
                if (processorThread == null) continue;
                processorThread.interrupt();
            }
            while (this.anyIsRunning(processorsToCancel)) {
                this.lock.wait();
            }
        }
        for (FrameProcessor<?> processor : processorsToCancel) {
            for (WritableFrameChannel outputChannel : processor.outputChannels()) {
                try {
                    outputChannel.fail(new CancellationException("Canceled"));
                }
                catch (Throwable e) {
                    log.debug(e, "Exception encountered while marking output channel failed for processor [%s]", processor);
                }
            }
            try {
                processor.cleanup();
            }
            catch (Throwable e) {
                log.noStackTrace().warn(e, "Exception encountered while canceling processor [%s]", processor);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> void registerCancelableProcessor(FrameProcessor<T> processor, @Nullable String cancellationId) {
        if (cancellationId != null) {
            Object object = this.lock;
            synchronized (object) {
                this.cancelableProcessors.put((Object)cancellationId, processor);
            }
        }
    }

    private static <T> void logProcessorStatusString(FrameProcessor<T> processor, ListenableFuture<?> finishedFuture, @Nullable List<ListenableFuture<?>> writabilityFutures) {
        if (log.isDebugEnabled()) {
            StringBuilder sb = new StringBuilder().append("Processor [").append(processor).append("]; in=[");
            for (ReadableFrameChannel readableFrameChannel : processor.inputChannels()) {
                if (readableFrameChannel.canRead()) {
                    sb.append("R");
                    continue;
                }
                if (readableFrameChannel.isFinished()) {
                    sb.append("D");
                    continue;
                }
                sb.append("~");
            }
            sb.append("]");
            if (writabilityFutures != null) {
                sb.append("; out=[");
                for (ListenableFuture listenableFuture : writabilityFutures) {
                    if (listenableFuture.isDone()) {
                        sb.append("W");
                        continue;
                    }
                    sb.append("~");
                }
                sb.append("]");
            }
            sb.append("; cancel=").append(finishedFuture.isCancelled() ? "y" : "n");
            sb.append("; done=").append(finishedFuture.isDone() ? "y" : "n");
            log.debug(StringUtils.encodeForFormat(sb.toString()), new Object[0]);
        }
    }

    @GuardedBy(value="lock")
    private boolean anyIsRunning(Set<FrameProcessor<?>> processors) {
        for (FrameProcessor<?> processor : processors) {
            if (!this.runningProcessors.containsKey(processor)) continue;
            return true;
        }
        return false;
    }
}

