/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.job.process.autodetect;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory;

/**
 * Mediates between a job and its native autodetect process.
 *
 * <p>Write, update and flush operations are submitted to the supplied worker
 * executor and report their outcome through a {@link BiConsumer} handler. Once
 * {@link #killProcess(boolean, boolean)} has been called, pending operations
 * complete with {@code (null, null)} instead of reporting a failure.
 */
public class AutodetectCommunicator
implements Closeable {
    private static final Logger LOGGER = Loggers.getLogger(AutodetectCommunicator.class);
    /** How long each wait for a flush acknowledgement lasts before liveness is re-checked. */
    private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1L);
    private final Job job;
    private final OpenJobAction.JobTask jobTask;
    private final AutodetectProcess autodetectProcess;
    private final StateStreamer stateStreamer;
    private final DataCountsReporter dataCountsReporter;
    private final AutoDetectResultProcessor autoDetectResultProcessor;
    private final Consumer<Exception> onFinishHandler;
    private final ExecutorService autodetectWorkerExecutor;
    private final NamedXContentRegistry xContentRegistry;
    /** Set by {@link #killProcess(boolean, boolean)}; volatile because it is read from worker tasks. */
    private volatile boolean processKilled;

    AutodetectCommunicator(Job job, OpenJobAction.JobTask jobTask, AutodetectProcess process, StateStreamer stateStreamer, DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> onFinishHandler, NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) {
        this.job = job;
        this.jobTask = jobTask;
        this.autodetectProcess = process;
        this.stateStreamer = stateStreamer;
        this.dataCountsReporter = dataCountsReporter;
        this.autoDetectResultProcessor = autoDetectResultProcessor;
        this.onFinishHandler = onFinishHandler;
        this.xContentRegistry = xContentRegistry;
        this.autodetectWorkerExecutor = autodetectWorkerExecutor;
    }

    /**
     * Restores the process state from the given snapshot and then writes the
     * header using the job's own data description.
     *
     * @param modelSnapshot the snapshot handed to the process for state restoration
     * @throws IOException if restoring the state or writing the header fails
     */
    public void init(ModelSnapshot modelSnapshot) throws IOException {
        autodetectProcess.restoreState(stateStreamer, modelSnapshot);
        createProcessWriter(Optional.empty()).writeHeader();
    }

    /**
     * Builds a writer for the autodetect process.
     *
     * @param dataDescription optional override; when empty the job's data description is used
     */
    private DataToProcessWriter createProcessWriter(Optional<DataDescription> dataDescription) {
        return DataToProcessWriterFactory.create(true, autodetectProcess, dataDescription.orElse(job.getDataDescription()),
                job.getAnalysisConfig(), dataCountsReporter, xContentRegistry);
    }

    /**
     * Submits an operation that writes the contents of {@code inputStream} to the process.
     *
     * <p>The operation blocks until the writer invokes its completion callback,
     * flushes the writer's stream, and then either rethrows the exception the
     * writer reported or returns the reported data counts.
     *
     * @param inputStream  the data to write
     * @param xContentType the content type passed through to the writer
     * @param params       load parameters; may request a bucket reset and override the data description
     * @param handler      receives the resulting data counts or the failure
     */
    public void writeToJob(InputStream inputStream, XContentType xContentType, DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
        submitOperation(() -> {
            if (params.isResettingBuckets()) {
                autodetectProcess.writeResetBucketsControlMessage(params);
            }
            CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter);
            DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription());

            // The writer reports its result through a callback; capture it and wait for it here.
            CountDownLatch latch = new CountDownLatch(1);
            AtomicReference<DataCounts> dataCountsAtomicReference = new AtomicReference<>();
            AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
            autoDetectWriter.write(countingStream, xContentType, (dataCounts, e) -> {
                dataCountsAtomicReference.set(dataCounts);
                exceptionAtomicReference.set(e);
                latch.countDown();
            });
            latch.await();
            autoDetectWriter.flushStream();

            Exception failure = exceptionAtomicReference.get();
            if (failure != null) {
                throw failure;
            }
            return dataCountsAtomicReference.get();
        }, handler);
    }

    /**
     * Closes the job without requesting a restart.
     *
     * @see #close(boolean, String)
     */
    @Override
    public void close() throws IOException {
        close(false, null);
    }

    /**
     * Submits a close task to the worker executor and blocks until it finishes.
     *
     * <p>The task first verifies that the process is alive (this check happens
     * before the finish handler is armed). It then closes the process if it is
     * ready, or otherwise kills it and cancels the state streamer, and waits for
     * the result processor to complete. Whether or not those steps succeed, the
     * finish handler is invoked with an {@link ElasticsearchException} carrying
     * {@code reason} when {@code restart} is true, or with {@code null} otherwise.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag
     * is restored and the method returns without shutting down the executor.
     *
     * @param restart whether the finish handler should be given an exception built from {@code reason}
     * @param reason  message for that exception; only used when {@code restart} is true
     * @throws ElasticsearchException if the close task fails
     */
    public void close(boolean restart, String reason) {
        Future<?> future = autodetectWorkerExecutor.submit(() -> {
            checkProcessIsAlive();
            try {
                if (autodetectProcess.isReady()) {
                    autodetectProcess.close();
                } else {
                    killProcess(false, false);
                    stateStreamer.cancel();
                }
                autoDetectResultProcessor.awaitCompletion();
            } finally {
                onFinishHandler.accept(restart ? new ElasticsearchException(reason) : null);
            }
            LOGGER.info("[{}] job closed", job.getId());
            return null;
        });
        try {
            future.get();
            autodetectWorkerExecutor.shutdown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw ExceptionsHelper.convertToElastic(e);
        }
    }

    /**
     * Kills the autodetect process.
     *
     * <p>Marks this communicator and the result processor as killed, kills the
     * process and shuts down the worker executor. A timeout while awaiting the
     * result processor is logged rather than propagated.
     *
     * @param awaitCompletion whether to wait for the result processor to complete
     * @param finish          whether to invoke the finish handler (with {@code null}) afterwards,
     *                        even if one of the preceding steps threw
     * @throws IOException if a step of the kill sequence fails with an I/O error
     */
    public void killProcess(boolean awaitCompletion, boolean finish) throws IOException {
        try {
            processKilled = true;
            autoDetectResultProcessor.setProcessKilled();
            autodetectProcess.kill();
            autodetectWorkerExecutor.shutdown();
            if (awaitCompletion) {
                try {
                    autoDetectResultProcessor.awaitCompletion();
                } catch (TimeoutException e) {
                    LOGGER.warn(new ParameterizedMessage("[{}] Timed out waiting for killed job", job.getId()), e);
                }
            }
        } finally {
            if (finish) {
                onFinishHandler.accept(null);
            }
        }
    }

    /**
     * Submits an operation that sends update messages to the process.
     *
     * @param config  new model plot configuration, or {@code null} to leave it unchanged
     * @param updates detector updates, or {@code null}; entries without rules are skipped
     * @param handler notified with {@code (null, null)} on success or {@code (null, e)} on failure
     */
    public void writeUpdateProcessMessage(ModelPlotConfig config, List<JobUpdate.DetectorUpdate> updates, BiConsumer<Void, Exception> handler) {
        submitOperation(() -> {
            if (config != null) {
                autodetectProcess.writeUpdateModelPlotMessage(config);
            }
            if (updates != null) {
                for (JobUpdate.DetectorUpdate update : updates) {
                    if (update.getRules() != null) {
                        autodetectProcess.writeUpdateDetectorRulesMessage(update.getDetectorIndex(), update.getRules());
                    }
                }
            }
            return null;
        }, handler);
    }

    /**
     * Submits an operation that requests a flush and waits for it to complete.
     *
     * @param params  flush parameters passed to the process
     * @param handler notified with {@code (null, null)} on success or {@code (null, e)} on failure
     */
    public void flushJob(InterimResultsParams params, BiConsumer<Void, Exception> handler) {
        submitOperation(() -> {
            String flushId = autodetectProcess.flushJob(params);
            waitFlushToCompletion(flushId);
            return null;
        }, handler);
    }

    /**
     * Blocks until the flush identified by {@code flushId} is acknowledged.
     *
     * <p>Between waits of {@link #FLUSH_PROCESS_CHECK_FREQUENCY} it verifies that
     * both the process and the result processor are still alive, throwing if
     * either is not. The awaiting-flush entry is always cleared. Unless the
     * process was killed, it then waits for the renormalizer to become idle.
     *
     * @param flushId identifier returned by the process for the flush request
     */
    void waitFlushToCompletion(String flushId) {
        LOGGER.debug("[{}] waiting for flush", job.getId());
        try {
            while (!autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY)) {
                // Not acknowledged yet: make sure nothing has died before waiting again.
                checkProcessIsAlive();
                checkResultsProcessorIsAlive();
            }
        } finally {
            autoDetectResultProcessor.clearAwaitingFlush(flushId);
        }
        if (!processKilled) {
            autoDetectResultProcessor.waitUntilRenormalizerIsIdle();
            LOGGER.debug("[{}] Flush completed", job.getId());
        }
    }

    /**
     * Logs and throws if the autodetect process is no longer alive.
     *
     * @throws ElasticsearchException including the error output read from the process
     */
    private void checkProcessIsAlive() {
        if (!autodetectProcess.isProcessAlive()) {
            ParameterizedMessage message =
                    new ParameterizedMessage("[{}] Unexpected death of autodetect: {}", job.getId(), autodetectProcess.readError());
            LOGGER.error(message);
            throw new ElasticsearchException(message.getFormattedMessage());
        }
    }

    /**
     * Logs and throws if the result processor reports that it has failed.
     *
     * @throws ElasticsearchException if the result processor has failed
     */
    private void checkResultsProcessorIsAlive() {
        if (autoDetectResultProcessor.isFailed()) {
            ParameterizedMessage message = new ParameterizedMessage("[{}] Unexpected death of the result processor", job.getId());
            LOGGER.error(message);
            throw new ElasticsearchException(message.getFormattedMessage());
        }
    }

    /** Returns the job task this communicator was created with. */
    public OpenJobAction.JobTask getJobTask() {
        return jobTask;
    }

    /** Returns the start time reported by the autodetect process. */
    public ZonedDateTime getProcessStartTime() {
        return autodetectProcess.getProcessStartTime();
    }

    /** Returns the model size stats held by the result processor. */
    public ModelSizeStats getModelSizeStats() {
        return autoDetectResultProcessor.modelSizeStats();
    }

    /** Returns the running total data counts from the data counts reporter. */
    public DataCounts getDataCounts() {
        return dataCountsReporter.runningTotalStats();
    }

    /**
     * Runs {@code operation} on the worker executor and reports the outcome to {@code handler}.
     *
     * <p>If the process has been killed, the handler receives {@code (null, null)}
     * both when the task starts and when it fails. Otherwise the process is
     * checked for liveness, the operation is run, and its result is handed over;
     * failures are logged and passed on as {@code (null, e)}.
     * NOTE(review): relies on {@code AbstractRunnable} routing exceptions thrown
     * from {@code doRun} to {@code onFailure} - confirm against that class.
     *
     * @param operation the work to perform
     * @param handler   receives the result or the failure
     * @param <T>       the operation's result type
     */
    private <T> void submitOperation(final CheckedSupplier<T, Exception> operation, final BiConsumer<T, Exception> handler) {
        autodetectWorkerExecutor.execute(new AbstractRunnable() {

            @Override
            public void onFailure(Exception e) {
                if (processKilled) {
                    // Failures after a kill are expected, so they are not surfaced as errors.
                    handler.accept(null, null);
                } else {
                    LOGGER.error(new ParameterizedMessage("[{}] Unexpected exception writing to process", job.getId()), e);
                    handler.accept(null, e);
                }
            }

            @Override
            protected void doRun() throws Exception {
                if (processKilled) {
                    handler.accept(null, null);
                } else {
                    checkProcessIsAlive();
                    handler.accept(operation.get(), null);
                }
            }
        });
    }
}

