/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.job.process.autodetect;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectCommunicator;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessFactory;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater;
import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;

public class AutodetectProcessManager
extends AbstractComponent {
    public static final Setting<Integer> MAX_RUNNING_JOBS_PER_NODE = Setting.intSetting((String)"max_running_jobs", (int)10, (int)1, (int)512, (Setting.Property[])new Setting.Property[]{Setting.Property.NodeScope});
    private final Client client;
    private final ThreadPool threadPool;
    private final JobManager jobManager;
    private final JobProvider jobProvider;
    private final AutodetectProcessFactory autodetectProcessFactory;
    private final NormalizerFactory normalizerFactory;
    private final JobResultsPersister jobResultsPersister;
    private final JobDataCountsPersister jobDataCountsPersister;
    private final ConcurrentMap<Long, AutodetectCommunicator> autoDetectCommunicatorByJob;
    private final int maxAllowedRunningJobs;
    private final NamedXContentRegistry xContentRegistry;
    private final Auditor auditor;

    public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister, JobDataCountsPersister jobDataCountsPersister, AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, NamedXContentRegistry xContentRegistry, Auditor auditor) {
        super(settings);
        this.client = client;
        this.threadPool = threadPool;
        this.xContentRegistry = xContentRegistry;
        this.maxAllowedRunningJobs = (Integer)MAX_RUNNING_JOBS_PER_NODE.get(settings);
        this.autodetectProcessFactory = autodetectProcessFactory;
        this.normalizerFactory = normalizerFactory;
        this.jobManager = jobManager;
        this.jobProvider = jobProvider;
        this.jobResultsPersister = jobResultsPersister;
        this.jobDataCountsPersister = jobDataCountsPersister;
        this.autoDetectCommunicatorByJob = new ConcurrentHashMap<Long, AutodetectCommunicator>();
        this.auditor = auditor;
    }

    public synchronized void closeAllJobsOnThisNode(String reason) throws IOException {
        int numJobs = this.autoDetectCommunicatorByJob.size();
        if (numJobs != 0) {
            this.logger.info("Closing [{}] jobs, because [{}]", (Object)numJobs, (Object)reason);
            for (AutodetectCommunicator communicator : this.autoDetectCommunicatorByJob.values()) {
                this.closeJob(communicator.getJobTask(), false, reason);
            }
        }
    }

    public void killProcess(OpenJobAction.JobTask jobTask, boolean awaitCompletion, String reason) {
        AutodetectCommunicator communicator = (AutodetectCommunicator)this.autoDetectCommunicatorByJob.remove(jobTask.getAllocationId());
        if (communicator != null) {
            if (reason == null) {
                this.logger.info("Killing job [{}]", (Object)jobTask.getJobId());
            } else {
                this.logger.info("Killing job [{}], because [{}]", (Object)jobTask.getJobId(), (Object)reason);
            }
            this.killProcess(communicator, jobTask.getJobId(), awaitCompletion, true);
        }
    }

    public void killAllProcessesOnThisNode() {
        Iterator iter = this.autoDetectCommunicatorByJob.values().iterator();
        while (iter.hasNext()) {
            AutodetectCommunicator communicator = (AutodetectCommunicator)iter.next();
            iter.remove();
            this.killProcess(communicator, communicator.getJobTask().getJobId(), false, false);
        }
    }

    private void killProcess(AutodetectCommunicator communicator, String jobId, boolean awaitCompletion, boolean finish) {
        try {
            communicator.killProcess(awaitCompletion, finish);
        }
        catch (IOException e) {
            this.logger.error("[{}] Failed to kill autodetect process for job", (Object)jobId);
        }
    }

    public void processData(OpenJobAction.JobTask jobTask, InputStream input, XContentType xContentType, DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
        AutodetectCommunicator communicator = (AutodetectCommunicator)this.autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
        if (communicator == null) {
            throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + "] is not open", new Object[0]);
        }
        communicator.writeToJob(input, xContentType, params, handler);
    }

    public void flushJob(OpenJobAction.JobTask jobTask, InterimResultsParams params, Consumer<Exception> handler) {
        this.logger.debug("Flushing job {}", (Object)jobTask.getJobId());
        AutodetectCommunicator communicator = (AutodetectCommunicator)this.autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
        if (communicator == null) {
            String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId());
            this.logger.debug(message);
            handler.accept((Exception)ExceptionsHelper.conflictStatusException(message, new Object[0]));
            return;
        }
        communicator.flushJob(params, (aVoid, e) -> {
            if (e == null) {
                handler.accept(null);
            } else {
                String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobTask.getJobId());
                this.logger.error(msg);
                handler.accept((Exception)ExceptionsHelper.serverError(msg, e));
            }
        });
    }

    public void writeUpdateProcessMessage(OpenJobAction.JobTask jobTask, List<JobUpdate.DetectorUpdate> updates, ModelPlotConfig config, Consumer<Exception> handler) {
        AutodetectCommunicator communicator = (AutodetectCommunicator)this.autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
        if (communicator == null) {
            String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
            this.logger.debug(message);
            handler.accept((Exception)ExceptionsHelper.conflictStatusException(message, new Object[0]));
            return;
        }
        communicator.writeUpdateProcessMessage(config, updates, (aVoid, e) -> {
            if (e == null) {
                handler.accept(null);
            } else {
                handler.accept((Exception)e);
            }
        });
    }

    public void openJob(final OpenJobAction.JobTask jobTask, boolean ignoreDowntime, final Consumer<Exception> handler) {
        String jobId = jobTask.getJobId();
        Job job = this.jobManager.getJobOrThrowIfUnknown(jobId);
        if (job.getJobVersion() == null) {
            this.setJobState(jobTask, JobState.FAILED, (CheckedConsumer<Exception, IOException>)((CheckedConsumer)suppressedException -> handler.accept((Exception)ExceptionsHelper.badRequestException("Cannot open job [" + jobId + "] because jobs created prior to version 5.5 are not supported", new Object[0]))));
            return;
        }
        this.logger.info("Opening job [{}]", (Object)jobId);
        this.jobProvider.getAutodetectParams(job, params -> this.threadPool.executor("ml_utility").execute((Runnable)new AbstractRunnable((AutodetectParams)params, ignoreDowntime){
            final /* synthetic */ AutodetectParams val$params;
            final /* synthetic */ boolean val$ignoreDowntime;
            {
                this.val$params = autodetectParams;
                this.val$ignoreDowntime = bl;
            }

            public void onFailure(Exception e) {
                handler.accept(e);
            }

            protected void doRun() throws Exception {
                try {
                    AutodetectCommunicator communicator = AutodetectProcessManager.this.autoDetectCommunicatorByJob.computeIfAbsent(jobTask.getAllocationId(), id -> AutodetectProcessManager.this.create(jobTask, this.val$params, this.val$ignoreDowntime, handler));
                    communicator.init(this.val$params.modelSnapshot());
                    AutodetectProcessManager.this.setJobState(jobTask, JobState.OPENED);
                }
                catch (Exception e1) {
                    try {
                        AutodetectCommunicator communicator = (AutodetectCommunicator)AutodetectProcessManager.this.autoDetectCommunicatorByJob.remove(jobTask.getAllocationId());
                        if (communicator != null) {
                            communicator.killProcess(false, false);
                        }
                    }
                    finally {
                        AutodetectProcessManager.this.setJobState(jobTask, JobState.FAILED, (CheckedConsumer<Exception, IOException>)((CheckedConsumer)e2 -> handler.accept(e1)));
                    }
                }
            }
        }), e1 -> {
            this.logger.warn("Failed to gather information required to open job [" + jobId + "]", (Throwable)e1);
            this.setJobState(jobTask, JobState.FAILED, (CheckedConsumer<Exception, IOException>)((CheckedConsumer)e2 -> handler.accept((Exception)e1)));
        });
    }

    AutodetectCommunicator create(OpenJobAction.JobTask jobTask, AutodetectParams autodetectParams, boolean ignoreDowntime, Consumer<Exception> handler) {
        ExecutorService autodetectWorkerExecutor;
        if (this.autoDetectCommunicatorByJob.size() == this.maxAllowedRunningJobs) {
            throw new ElasticsearchStatusException("max running job capacity [" + this.maxAllowedRunningJobs + "] reached", RestStatus.TOO_MANY_REQUESTS, new Object[0]);
        }
        String jobId = jobTask.getJobId();
        this.notifyLoadingSnapshot(jobId, autodetectParams);
        if (autodetectParams.dataCounts().getProcessedRecordCount() > 0L) {
            String msg;
            if (autodetectParams.modelSnapshot() == null) {
                msg = "No model snapshot could be found for a job with processed records";
                this.logger.warn("[{}] {}", (Object)jobId, (Object)msg);
                this.auditor.warning(jobId, "No model snapshot could be found for a job with processed records");
            }
            if (autodetectParams.quantiles() == null) {
                msg = "No quantiles could be found for a job with processed records";
                this.logger.warn("[{}] {}", (Object)jobId, (Object)msg);
                this.auditor.warning(jobId, msg);
            }
        }
        Job job = this.jobManager.getJobOrThrowIfUnknown(jobId);
        ExecutorService autoDetectExecutorService = this.threadPool.executor("ml_autodetect");
        DataCountsReporter dataCountsReporter = new DataCountsReporter(this.settings, job, autodetectParams.dataCounts(), this.jobDataCountsPersister);
        ScoresUpdater scoresUpdater = new ScoresUpdater(job, this.jobProvider, new JobRenormalizedResultsPersister(job.getId(), this.settings, this.client), this.normalizerFactory);
        ExecutorService renormalizerExecutorService = this.threadPool.executor("ml_utility");
        ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, renormalizerExecutorService, job.getAnalysisConfig().getUsePerPartitionNormalization());
        AutodetectProcess process = this.autodetectProcessFactory.createAutodetectProcess(job, autodetectParams.modelSnapshot(), autodetectParams.quantiles(), autodetectParams.filters(), ignoreDowntime, autoDetectExecutorService, () -> this.setJobState(jobTask, JobState.FAILED));
        AutoDetectResultProcessor processor = new AutoDetectResultProcessor(this.client, jobId, renormalizer, this.jobResultsPersister, autodetectParams.modelSizeStats());
        try {
            autodetectWorkerExecutor = this.createAutodetectExecutorService(autoDetectExecutorService);
            autoDetectExecutorService.submit(() -> processor.process(process));
        }
        catch (EsRejectedExecutionException e) {
            try {
                IOUtils.close((Closeable[])new Closeable[]{process});
            }
            catch (IOException ioe) {
                this.logger.error("Can't close autodetect", (Throwable)ioe);
            }
            throw e;
        }
        return new AutodetectCommunicator(job, jobTask, process, new StateStreamer(this.client), dataCountsReporter, processor, handler, this.xContentRegistry, autodetectWorkerExecutor);
    }

    private void notifyLoadingSnapshot(String jobId, AutodetectParams autodetectParams) {
        ModelSnapshot modelSnapshot = autodetectParams.modelSnapshot();
        StringBuilder msgBuilder = new StringBuilder("Loading model snapshot [");
        if (modelSnapshot == null) {
            msgBuilder.append("N/A");
        } else {
            msgBuilder.append(modelSnapshot.getSnapshotId());
            msgBuilder.append("] with latest_record_timestamp [");
            Date snapshotLatestRecordTimestamp = modelSnapshot.getLatestRecordTimeStamp();
            msgBuilder.append(snapshotLatestRecordTimestamp == null ? "N/A" : XContentBuilder.DEFAULT_DATE_PRINTER.print(snapshotLatestRecordTimestamp.getTime()));
        }
        msgBuilder.append("], job latest_record_timestamp [");
        Date jobLatestRecordTimestamp = autodetectParams.dataCounts().getLatestRecordTimeStamp();
        msgBuilder.append(jobLatestRecordTimestamp == null ? "N/A" : XContentBuilder.DEFAULT_DATE_PRINTER.print(jobLatestRecordTimestamp.getTime()));
        msgBuilder.append("]");
        String msg = msgBuilder.toString();
        this.logger.info("[{}] {}", (Object)jobId, (Object)msg);
        this.auditor.info(jobId, msg);
    }

    public void closeJob(OpenJobAction.JobTask jobTask, boolean restart, String reason) {
        this.logger.debug("Attempting to close job [{}], because [{}]", (Object)jobTask.getJobId(), (Object)reason);
        AutodetectCommunicator communicator = (AutodetectCommunicator)this.autoDetectCommunicatorByJob.remove(jobTask.getAllocationId());
        if (communicator == null) {
            this.logger.debug("Cannot close: no active autodetect process for job {}", (Object)jobTask.getJobId());
            return;
        }
        if (reason == null) {
            this.logger.info("Closing job [{}]", (Object)jobTask.getJobId());
        } else {
            this.logger.info("Closing job [{}], because [{}]", (Object)jobTask.getJobId(), (Object)reason);
        }
        try {
            communicator.close(restart, reason);
        }
        catch (Exception e) {
            this.logger.warn("Exception closing stopped process input stream", (Throwable)e);
            this.setJobState(jobTask, JobState.FAILED);
            throw ExceptionsHelper.serverError("Exception closing stopped process input stream", e);
        }
    }

    int numberOfOpenJobs() {
        return this.autoDetectCommunicatorByJob.size();
    }

    boolean jobHasActiveAutodetectProcess(OpenJobAction.JobTask jobTask) {
        return this.autoDetectCommunicatorByJob.get(jobTask.getAllocationId()) != null;
    }

    public Optional<Duration> jobOpenTime(OpenJobAction.JobTask jobTask) {
        AutodetectCommunicator communicator = (AutodetectCommunicator)this.autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
        if (communicator == null) {
            return Optional.empty();
        }
        return Optional.of(Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now()));
    }

    void setJobState(final OpenJobAction.JobTask jobTask, final JobState state) {
        JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId());
        jobTask.updatePersistentStatus(taskStatus, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>(){

            public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
                AutodetectProcessManager.this.logger.info("Successfully set job state to [{}] for job [{}]", (Object)state, (Object)jobTask.getJobId());
            }

            public void onFailure(Exception e) {
                AutodetectProcessManager.this.logger.error("Could not set job state to [" + (Object)((Object)state) + "] for job [" + jobTask.getJobId() + "]", (Throwable)e);
            }
        });
    }

    void setJobState(OpenJobAction.JobTask jobTask, JobState state, final CheckedConsumer<Exception, IOException> handler) {
        JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId());
        jobTask.updatePersistentStatus(taskStatus, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>(){

            public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
                try {
                    handler.accept(null);
                }
                catch (IOException e1) {
                    AutodetectProcessManager.this.logger.warn("Error while delegating response", (Throwable)e1);
                }
            }

            public void onFailure(Exception e) {
                try {
                    handler.accept((Object)e);
                }
                catch (IOException e1) {
                    AutodetectProcessManager.this.logger.warn("Error while delegating exception [" + e.getMessage() + "]", (Throwable)e1);
                }
            }
        });
    }

    public Optional<Tuple<DataCounts, ModelSizeStats>> getStatistics(OpenJobAction.JobTask jobTask) {
        AutodetectCommunicator communicator = (AutodetectCommunicator)this.autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
        if (communicator == null) {
            return Optional.empty();
        }
        return Optional.of(new Tuple((Object)communicator.getDataCounts(), (Object)communicator.getModelSizeStats()));
    }

    ExecutorService createAutodetectExecutorService(ExecutorService executorService) {
        AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(this.threadPool.getThreadContext());
        executorService.submit(autoDetectWorkerExecutor::start);
        return autoDetectWorkerExecutor;
    }

    class AutodetectWorkerExecutorService
    extends AbstractExecutorService {
        private final ThreadContext contextHolder;
        private final CountDownLatch awaitTermination = new CountDownLatch(1);
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(100);
        private volatile boolean running = true;

        AutodetectWorkerExecutorService(ThreadContext contextHolder) {
            this.contextHolder = contextHolder;
        }

        @Override
        public void shutdown() {
            this.running = false;
        }

        @Override
        public List<Runnable> shutdownNow() {
            throw new UnsupportedOperationException("not supported");
        }

        @Override
        public boolean isShutdown() {
            return !this.running;
        }

        @Override
        public boolean isTerminated() {
            return this.awaitTermination.getCount() == 0L;
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return this.awaitTermination.await(timeout, unit);
        }

        @Override
        public void execute(Runnable command) {
            boolean added = this.queue.offer(this.contextHolder.preserveContext(command));
            if (!added) {
                throw new ElasticsearchStatusException("Unable to submit operation", RestStatus.TOO_MANY_REQUESTS, new Object[0]);
            }
        }

        void start() {
            block9: {
                block7: while (true) {
                    try {
                        while (this.running) {
                            Runnable runnable = this.queue.poll(500L, TimeUnit.MILLISECONDS);
                            if (runnable == null) continue;
                            try {
                                runnable.run();
                                continue block7;
                            }
                            catch (Exception e) {
                                AutodetectProcessManager.this.logger.error("error handeling job operation", (Throwable)e);
                            }
                        }
                        break block9;
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break block9;
                    }
                }
                finally {
                    this.awaitTermination.countDown();
                }
            }
        }
    }
}

