/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.impl.api.index;

import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import org.neo4j.function.Predicates;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.impl.api.index.IndexStoreView;
import org.neo4j.kernel.impl.api.index.MultipleIndexPopulator;
import org.neo4j.kernel.impl.api.index.StoreScan;
import org.neo4j.logging.LogProvider;
import org.neo4j.util.FeatureToggles;

/**
 * A {@link MultipleIndexPopulator} that applies batches of index updates through an
 * {@link ExecutorService} instead of applying them directly in {@link #flush}.
 *
 * <p>Every submitted batch is tracked in {@code activeTasks}; {@link #populateFromQueue(long)}
 * flushes all batches and waits for that counter to reach zero before delegating to the
 * superclass. The store scan returned by {@link #indexAllNodes()} flushes the remaining batches
 * once the scan has finished and then shuts the executor down.
 */
public class BatchingMultipleIndexPopulator
extends MultipleIndexPopulator {
    static final String TASK_QUEUE_SIZE_NAME = "task_queue_size";
    static final String AWAIT_TIMEOUT_MINUTES_NAME = "await_timeout_minutes";
    static final String MAXIMUM_NUMBER_OF_WORKERS_NAME = "population_workers_maximum";
    private static final String EOL = System.lineSeparator();
    private static final String FLUSH_THREAD_NAME_PREFIX = "Index Population Flush Thread";

    // The three settings below are read through FeatureToggles, keyed on the runtime class.
    // Declaration order matters: TASK_QUEUE_SIZE derives its default from MAXIMUM_NUMBER_OF_WORKERS.

    /** Configured worker count; defaults to the number of available processors minus one. */
    private final int MAXIMUM_NUMBER_OF_WORKERS = FeatureToggles.getInteger(this.getClass(), MAXIMUM_NUMBER_OF_WORKERS_NAME, Runtime.getRuntime().availableProcessors() - 1);
    /** Capacity of the work queue of the internally created pool; defaults to twice the worker count. */
    private final int TASK_QUEUE_SIZE = FeatureToggles.getInteger(this.getClass(), TASK_QUEUE_SIZE_NAME, this.getNumberOfPopulationWorkers() * 2);
    /** Upper bound, in minutes, for waiting on flush tasks and on executor termination; defaults to 30. */
    private final int AWAIT_TIMEOUT_MINUTES = FeatureToggles.getInteger(this.getClass(), AWAIT_TIMEOUT_MINUTES_NAME, 30);
    /** Number of flush tasks that have been handed over for execution and have not finished yet. */
    private final AtomicLong activeTasks = new AtomicLong();
    private final ExecutorService executor;

    /**
     * Creates a populator that runs flush tasks on an internally created thread pool.
     *
     * @param storeView passed to the superclass constructor
     * @param logProvider passed to the superclass constructor
     */
    BatchingMultipleIndexPopulator(IndexStoreView storeView, LogProvider logProvider) {
        super(storeView, logProvider);
        this.executor = this.createThreadPool();
    }

    /**
     * Creates a populator that runs flush tasks on the given executor. Note that the executor is
     * shut down by this populator when the store scan returned by {@link #indexAllNodes()} ends.
     *
     * @param storeView passed to the superclass constructor
     * @param executor executor used to run flush tasks
     * @param logProvider passed to the superclass constructor
     */
    BatchingMultipleIndexPopulator(IndexStoreView storeView, ExecutorService executor, LogProvider logProvider) {
        super(storeView, logProvider);
        this.executor = executor;
    }

    /**
     * Wraps the store scan created by the superclass so that pending batches are flushed and the
     * executor is shut down when the scan ends.
     *
     * @return the wrapping store scan
     */
    @Override
    public StoreScan<IndexPopulationFailedKernelException> indexAllNodes() {
        StoreScan<IndexPopulationFailedKernelException> storeScan = super.indexAllNodes();
        return new BatchingStoreScan<IndexPopulationFailedKernelException>(storeScan);
    }

    /**
     * Flushes all batches, waits for the submitted flush tasks to finish and only then lets the
     * superclass populate from the queue.
     *
     * @param currentlyIndexedNodeId passed through to the superclass
     * @throws IllegalStateException if the flush tasks do not finish within the await timeout
     */
    @Override
    protected void populateFromQueue(long currentlyIndexedNodeId) {
        this.log.debug("Populating from queue." + EOL + this);
        this.flushAll();
        this.awaitCompletion();
        super.populateFromQueue(currentlyIndexedNodeId);
        this.log.debug("Drained queue and all batched updates." + EOL + this);
    }

    /**
     * Describes the current state: active task count, executor, per-population batch sizes and
     * the size of the update queue.
     */
    @Override
    public String toString() {
        String updatesString = this.populations.stream().map(population -> population.batchedUpdates.size() + " updates").collect(Collectors.joining(", ", "[", "]"));
        return "BatchingMultipleIndexPopulator{activeTasks=" + this.activeTasks + ", executor=" + this.executor + ", batchedUpdates = " + updatesString + ", queuedUpdates = " + this.queue.size() + "}";
    }

    /**
     * Waits until no flush task is active any more.
     *
     * @throws IllegalStateException if tasks are still active after the await timeout
     */
    private void awaitCompletion() {
        try {
            this.log.debug("Waiting " + this.AWAIT_TIMEOUT_MINUTES + " minutes for all submitted and active flush tasks to complete." + EOL + this);
            BooleanSupplier allSubmittedTasksCompleted = () -> this.activeTasks.get() == 0L;
            Predicates.await(allSubmittedTasksCompleted, this.AWAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
        }
        catch (TimeoutException e) {
            this.handleTimeout();
        }
    }

    /**
     * Takes the current batch of the given population and hands it to the executor, which adds it
     * to the population's populator. A failure while adding the batch fails the population.
     *
     * <p>The active-task count is raised only once the batch has been taken, and it is guaranteed
     * to be lowered exactly once per call, even when the task never gets to run. Otherwise a
     * failed hand-over would leave the count permanently above zero and the next
     * {@code awaitCompletion()} would block until it times out.
     *
     * @param population population whose current batch should be applied
     */
    @Override
    protected void flush(MultipleIndexPopulator.IndexPopulation population) {
        // Take the batch first: if this throws, no task has been counted yet.
        Collection<IndexEntryUpdate<?>> batch = population.takeCurrentBatch();
        // Guards the count-down so it happens once, whichever code path performs it.
        AtomicBoolean released = new AtomicBoolean();
        Runnable task = () -> {
            try {
                population.populator.add(batch);
            }
            catch (Throwable failure) {
                this.fail(population, failure);
            }
            finally {
                this.releaseTask(released);
            }
        };
        this.activeTasks.incrementAndGet();
        try {
            if (this.executor.isShutdown()) {
                // A shut-down ThreadPoolExecutor with CallerRunsPolicy discards tasks silently,
                // which would lose the batch and leak the count; apply the batch on this thread.
                task.run();
            } else {
                this.executor.execute(task);
            }
        }
        catch (Throwable submissionFailure) {
            // Either the executor refused the task, or the task ran on this thread and failed
            // after already releasing its count; the guard makes this call a no-op in that case.
            this.releaseTask(released);
            throw submissionFailure;
        }
    }

    /** Lowers the active-task count unless it has already been lowered for the same task. */
    private void releaseTask(AtomicBoolean released) {
        if (released.compareAndSet(false, true)) {
            this.activeTasks.decrementAndGet();
        }
    }

    /**
     * Shuts the executor down and waits for its termination for at most the await timeout.
     * An interrupt while waiting is logged and the interrupt flag is restored.
     *
     * @param now {@code true} to use {@code shutdownNow()}, {@code false} to use {@code shutdown()}
     * @throws IllegalStateException if the executor does not terminate within the await timeout
     */
    private void shutdownExecutor(boolean now) {
        this.log.info((now ? "Forcefully shutting" : "Shutting") + " down executor." + EOL + this);
        if (now) {
            this.executor.shutdownNow();
        } else {
            this.executor.shutdown();
        }
        try {
            boolean tasksCompleted = this.executor.awaitTermination(this.AWAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
            if (!tasksCompleted) {
                this.handleTimeout();
            }
        }
        catch (InterruptedException e) {
            this.handleInterrupt();
        }
    }

    /** Throws an {@link IllegalStateException} carrying this populator's state and all thread stack traces. */
    private void handleTimeout() {
        throw new IllegalStateException("Index population tasks were not able to complete in " + this.AWAIT_TIMEOUT_MINUTES + " minutes." + EOL + this + EOL + BatchingMultipleIndexPopulator.allStackTraces());
    }

    /** Restores the interrupt flag of the current thread and logs a warning. */
    private void handleInterrupt() {
        Thread.currentThread().interrupt();
        this.log.warn("Interrupted while waiting for index population tasks to complete." + EOL + this);
    }

    /**
     * Builds the default executor: a fixed-size pool with a bounded work queue. With
     * {@link ThreadPoolExecutor.CallerRunsPolicy}, a task submitted while the queue is full is
     * run on the submitting thread.
     */
    private ExecutorService createThreadPool() {
        int threads = this.getNumberOfPopulationWorkers();
        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(this.TASK_QUEUE_SIZE);
        NamedThreadFactory threadFactory = NamedThreadFactory.daemon(FLUSH_THREAD_NAME_PREFIX);
        ThreadPoolExecutor.CallerRunsPolicy rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory, rejectedExecutionHandler);
    }

    /** Renders the stack traces of all live threads into one string. */
    private static String allStackTraces() {
        return Thread.getAllStackTraces().entrySet().stream().map(entry -> Exceptions.stringify(entry.getKey(), entry.getValue())).collect(Collectors.joining());
    }

    /** Returns the number of worker threads to use: the configured maximum, but never fewer than two. */
    private int getNumberOfPopulationWorkers() {
        return Math.max(2, this.MAXIMUM_NUMBER_OF_WORKERS);
    }

    /**
     * Store scan wrapper that flushes all pending batches after the delegate scan has run and
     * shuts the executor down afterwards, forcefully if the scan or the final flush failed.
     */
    private class BatchingStoreScan<E extends Exception>
    extends MultipleIndexPopulator.DelegatingStoreScan<E> {
        BatchingStoreScan(StoreScan<E> delegate) {
            super(delegate);
        }

        @Override
        public void run() throws E {
            try {
                super.run();
                BatchingMultipleIndexPopulator.this.log.info("Completed node store scan. Flushing all pending updates." + EOL + BatchingMultipleIndexPopulator.this);
                BatchingMultipleIndexPopulator.this.flushAll();
            }
            catch (Throwable scanError) {
                try {
                    BatchingMultipleIndexPopulator.this.shutdownExecutor(true);
                }
                catch (Throwable error) {
                    // Keep the scan error as the primary failure; attach the shutdown problem to it.
                    scanError.addSuppressed(error);
                }
                throw scanError;
            }
            // Orderly shutdown: lets already submitted flush tasks finish.
            BatchingMultipleIndexPopulator.this.shutdownExecutor(false);
        }
    }
}

