/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.internal.batchimport.executor;

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.neo4j.function.Suppliers;
import org.neo4j.internal.batchimport.executor.ProcessorScheduler;
import org.neo4j.internal.batchimport.executor.Task;
import org.neo4j.internal.batchimport.executor.TaskExecutionPanicException;
import org.neo4j.internal.batchimport.executor.TaskExecutor;

/**
 * A {@link TaskExecutor} that runs submitted {@link Task tasks} on a set of processors whose
 * number can be changed while the executor is running.
 *
 * <p>Tasks are handed over through a bounded queue. Each processor obtains its own local state
 * from the supplied {@link Supplier} when it starts running and passes that state to every task
 * it executes. The first failure thrown by a task is recorded as a panic, after which
 * {@link #assertHealthy()} and {@link #submit(Task)} throw {@link TaskExecutionPanicException}.
 *
 * @param <LOCAL> type of the per-processor state passed to each task.
 */
public class DynamicTaskExecutor<LOCAL> implements TaskExecutor<LOCAL> {
    private final BlockingQueue<Task<LOCAL>> queue;
    private final String processorThreadNamePrefix;

    // Created reflectively because a generic array of an inner class of a generic type cannot be
    // created with "new". The cast is safe since the component type is Processor itself.
    // The array is never modified after being assigned to this field; changes publish a new array.
    @SuppressWarnings("unchecked")
    private volatile Processor[] processors = (Processor[]) Array.newInstance(Processor.class, 0);
    private volatile boolean shutDown;
    private final AtomicReference<Throwable> panic = new AtomicReference<>();
    private final Supplier<LOCAL> initialLocalState;
    private final int maxProcessorCount;
    private final ProcessorScheduler scheduler;

    /**
     * Creates the executor and immediately schedules the initial processors.
     *
     * @param initialProcessorCount number of processors to start with.
     * @param maxProcessorCount upper bound for the number of processors, {@code 0} meaning no bound.
     * @param maxQueueSize capacity of the queue holding tasks waiting to be executed.
     * @param processorThreadNamePrefix prefix of the name given to the scheduler for each processor,
     * the processor index is appended after a {@code "-"}.
     * @param initialLocalState supplies the local state for each processor, called once per processor
     * from within the processor itself.
     * @param scheduler schedules the processors for execution.
     */
    public DynamicTaskExecutor(int initialProcessorCount, int maxProcessorCount, int maxQueueSize, String processorThreadNamePrefix, Supplier<LOCAL> initialLocalState, ProcessorScheduler scheduler) {
        this.maxProcessorCount = maxProcessorCount == 0 ? Integer.MAX_VALUE : maxProcessorCount;
        this.scheduler = scheduler;
        assert (this.maxProcessorCount >= initialProcessorCount) : "Unexpected initial processor count " + initialProcessorCount + " for max " + maxProcessorCount;
        this.processorThreadNamePrefix = processorThreadNamePrefix;
        this.initialLocalState = initialLocalState;
        this.queue = new ArrayBlockingQueue<>(maxQueueSize);
        this.processors(initialProcessorCount);
    }

    /**
     * Changes the number of processors by {@code delta} and returns the resulting number.
     *
     * <p>A positive delta schedules new processors, but never more than the configured maximum.
     * A negative delta flags the surplus processors to stop, but never goes below one processor.
     * Nothing is changed if {@code delta} is {@code 0} or the executor has been closed.
     *
     * @param delta number of processors to add (positive) or remove (negative).
     * @return the number of processors after the change.
     */
    @Override
    public int processors(int delta) {
        // Cheap check without taking the lock, re-checked below once the lock is held.
        if (this.shutDown || delta == 0) {
            return this.processors.length;
        }
        synchronized (this) {
            if (this.shutDown) {
                return this.processors.length;
            }
            // The field is only assigned while holding this lock, so one read is a stable snapshot.
            Processor[] current = this.processors;
            int requestedNumber = current.length + delta;
            if (delta > 0) {
                requestedNumber = Integer.min(requestedNumber, this.maxProcessorCount);
                if (requestedNumber > current.length) {
                    Processor[] newProcessors = Arrays.copyOf(current, requestedNumber);
                    for (int i = current.length; i < requestedNumber; ++i) {
                        newProcessors[i] = new Processor();
                        this.scheduler.schedule(newProcessors[i], this.processorThreadNamePrefix + "-" + i);
                    }
                    this.processors = newProcessors;
                }
            } else {
                requestedNumber = Integer.max(1, requestedNumber);
                if (requestedNumber < current.length) {
                    Processor[] newProcessors = Arrays.copyOf(current, requestedNumber);
                    // Processors past the new length are told to stop; they are not waited for here.
                    for (int i = newProcessors.length; i < current.length; ++i) {
                        current[i].processorShutDown = true;
                    }
                    this.processors = newProcessors;
                }
            }
            return this.processors.length;
        }
    }

    /**
     * Places the task on the queue, waiting for space if the queue is full.
     *
     * <p>While waiting, the health of the executor is re-checked every 10 milliseconds.
     * If the calling thread is interrupted while waiting, its interrupt flag is set again and
     * this method returns without the task having been placed on the queue.
     *
     * @param task the task to execute.
     * @throws TaskExecutionPanicException if a panic has been received.
     */
    @Override
    public void submit(Task<LOCAL> task) {
        this.assertHealthy();
        try {
            while (!this.queue.offer(task, 10L, TimeUnit.MILLISECONDS)) {
                this.assertHealthy();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Verifies that no panic has been received.
     *
     * @throws TaskExecutionPanicException wrapping the received panic, if there is one.
     */
    @Override
    public void assertHealthy() {
        Throwable cause = this.panic.get();
        if (cause != null) {
            throw new TaskExecutionPanicException("Executor has been shut down in panic", cause);
        }
    }

    /**
     * Records the given cause as the panic of this executor. Only the first received cause is kept.
     *
     * @param cause the failure to record.
     */
    @Override
    public void receivePanic(Throwable cause) {
        this.panic.compareAndSet(null, cause);
    }

    /**
     * Marks the executor as shut down and waits for the current processors to end.
     *
     * <p>The wait is abandoned as soon as a panic is observed, or if the calling thread is
     * interrupted, in which case its interrupt flag is set again. Calling this method on an
     * already closed executor does nothing.
     */
    @Override
    public synchronized void close() {
        if (this.shutDown) {
            return;
        }
        this.shutDown = true;
        try {
            for (Processor processor : this.processors) {
                // Wake up once per second to check whether a panic makes further waiting pointless.
                while (!processor.endSignal.await(1L, TimeUnit.SECONDS)) {
                    if (this.panic.get() != null) {
                        return;
                    }
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @param <T> type of the (absent) local state.
     * @return the result of {@code Suppliers.singleton(null)}, for use when tasks need no local state.
     */
    public static <T> Supplier<T> noLocalState() {
        return Suppliers.singleton(null);
    }

    /**
     * Takes tasks off the queue and runs them, one at a time, until told to stop.
     */
    private class Processor
    implements Runnable {
        private volatile boolean processorShutDown;
        private final CountDownLatch endSignal = new CountDownLatch(1);

        private Processor() {
        }

        /**
         * Runs queued tasks until {@link #shouldContinue()} returns {@code false}, the thread is
         * interrupted or a task fails. A task failure is recorded as a panic on the executor and
         * rethrown wrapped in a {@link RuntimeException}. The end signal is counted down on every
         * way out of this method.
         */
        @Override
        public void run() {
            try {
                // Obtained here so that the supplier is called by the processor itself.
                LOCAL threadLocalState = DynamicTaskExecutor.this.initialLocalState.get();
                while (this.shouldContinue()) {
                    Task<LOCAL> task;
                    try {
                        // Short timeout so that the continue condition is re-evaluated regularly.
                        task = DynamicTaskExecutor.this.queue.poll(1L, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (task == null) {
                        continue;
                    }
                    try {
                        task.run(threadLocalState);
                    }
                    catch (Throwable e) {
                        DynamicTaskExecutor.this.receivePanic(e);
                        throw new RuntimeException(e);
                    }
                }
            }
            finally {
                this.endSignal.countDown();
            }
        }

        /**
         * @return {@code false} if this processor has been told to stop or a panic has been received,
         * otherwise {@code true} for as long as the executor is open or tasks remain in the queue.
         */
        private boolean shouldContinue() {
            if (this.processorShutDown || DynamicTaskExecutor.this.panic.get() != null) {
                return false;
            }
            return !DynamicTaskExecutor.this.shutDown || !DynamicTaskExecutor.this.queue.isEmpty();
        }
    }
}

