/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.job.process.autodetect.output;

import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ClientHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.FlushListener;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.Forecast;
import org.elasticsearch.xpack.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;

public class AutoDetectResultProcessor {
    private static final Logger LOGGER = Loggers.getLogger(AutoDetectResultProcessor.class);
    private final Client client;
    private final String jobId;
    private final Renormalizer renormalizer;
    private final JobResultsPersister persister;
    private final JobProvider jobProvider;
    private final boolean restoredSnapshot;
    final CountDownLatch completionLatch = new CountDownLatch(1);
    final Semaphore updateModelSnapshotIdSemaphore = new Semaphore(1);
    private final FlushListener flushListener;
    private volatile boolean processKilled;
    private volatile boolean failed;
    private int bucketCount;
    private volatile ModelSizeStats latestModelSizeStats;
    private volatile long latestEstablishedModelMemory;
    private volatile boolean haveNewLatestModelSizeStats;

    public AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister, JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) {
        this(client, jobId, renormalizer, persister, jobProvider, latestModelSizeStats, restoredSnapshot, new FlushListener());
    }

    AutoDetectResultProcessor(Client client, String jobId, Renormalizer renormalizer, JobResultsPersister persister, JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot, FlushListener flushListener) {
        this.client = Objects.requireNonNull(client);
        this.jobId = Objects.requireNonNull(jobId);
        this.renormalizer = Objects.requireNonNull(renormalizer);
        this.persister = Objects.requireNonNull(persister);
        this.jobProvider = Objects.requireNonNull(jobProvider);
        this.flushListener = Objects.requireNonNull(flushListener);
        this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
        this.restoredSnapshot = restoredSnapshot;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(AutodetectProcess process) {
        Context context = new Context(this.jobId, this.persister.bulkPersisterBuilder(this.jobId));
        try {
            this.bucketCount = 0;
            Iterator<AutodetectResult> iterator = process.readAutodetectResults();
            while (iterator.hasNext()) {
                try {
                    AutodetectResult result = iterator.next();
                    this.processResult(context, result);
                    if (result.getBucket() == null) continue;
                    LOGGER.trace("[{}] Bucket number {} parsed from output", (Object)this.jobId, (Object)this.bucketCount);
                }
                catch (Exception e) {
                    if (this.processKilled) {
                        throw e;
                    }
                    if (!process.isProcessAliveAfterWaiting()) {
                        throw e;
                    }
                    LOGGER.warn((Message)new ParameterizedMessage("[{}] Error processing autodetect result", (Object)this.jobId), (Throwable)e);
                }
            }
            try {
                if (!this.processKilled) {
                    context.bulkResultsPersister.executeRequest();
                }
            }
            catch (Exception e) {
                LOGGER.warn((Message)new ParameterizedMessage("[{}] Error persisting autodetect results", (Object)this.jobId), (Throwable)e);
            }
            LOGGER.info("[{}] {} buckets parsed from autodetect output", (Object)this.jobId, (Object)this.bucketCount);
        }
        catch (Exception e) {
            this.failed = true;
            if (this.processKilled) {
                LOGGER.warn("[{}] some results not processed due to the process being killed", (Object)this.jobId);
            } else if (!process.isProcessAliveAfterWaiting()) {
                LOGGER.warn("[{}] some results not processed due to the termination of autodetect", (Object)this.jobId);
            } else {
                LOGGER.error((Message)new ParameterizedMessage("[{}] error parsing autodetect output", (Object)this.jobId), (Throwable)e);
            }
        }
        finally {
            this.flushListener.clear();
            this.completionLatch.countDown();
        }
    }

    public void setProcessKilled() {
        this.processKilled = true;
        this.renormalizer.shutdown();
    }

    void processResult(Context context, AutodetectResult result) {
        FlushAcknowledgement flushAcknowledgement;
        Quantiles quantiles;
        ModelSnapshot modelSnapshot;
        ModelSizeStats modelSizeStats;
        ForecastRequestStats forecastRequestStats;
        Forecast forecast;
        ModelPlot modelPlot;
        CategoryDefinition categoryDefinition;
        List<Influencer> influencers;
        List<AnomalyRecord> records;
        if (this.processKilled) {
            return;
        }
        Bucket bucket = result.getBucket();
        if (bucket != null) {
            if (context.deleteInterimRequired) {
                LOGGER.trace("[{}] Deleting interim results", (Object)context.jobId);
                this.persister.deleteInterimResults(context.jobId);
                context.deleteInterimRequired = false;
            }
            context.bulkResultsPersister.persistBucket(bucket).executeRequest();
            ++this.bucketCount;
            long minEstablishedTimespanMs = 20L * bucket.getBucketSpan() * 1000L;
            if (this.haveNewLatestModelSizeStats && this.latestEstablishedModelMemory == 0L && bucket.getTimestamp().getTime() > this.latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) {
                this.persister.commitResultWrites(context.jobId);
                this.updateEstablishedModelMemoryOnJob(bucket.getTimestamp(), this.latestModelSizeStats);
                this.haveNewLatestModelSizeStats = false;
            }
        }
        if ((records = result.getRecords()) != null && !records.isEmpty()) {
            context.bulkResultsPersister.persistRecords(records);
        }
        if ((influencers = result.getInfluencers()) != null && !influencers.isEmpty()) {
            context.bulkResultsPersister.persistInfluencers(influencers);
        }
        if ((categoryDefinition = result.getCategoryDefinition()) != null) {
            this.persister.persistCategoryDefinition(categoryDefinition);
        }
        if ((modelPlot = result.getModelPlot()) != null) {
            context.bulkResultsPersister.persistModelPlot(modelPlot);
        }
        if ((forecast = result.getForecast()) != null) {
            context.bulkResultsPersister.persistForecast(forecast);
        }
        if ((forecastRequestStats = result.getForecastRequestStats()) != null) {
            LOGGER.trace("Received Forecast Stats [{}]", (Object)forecastRequestStats.getId());
            context.bulkResultsPersister.persistForecastRequestStats(forecastRequestStats);
            double forecastProgress = forecastRequestStats.getProgress();
            if (forecastProgress == 0.0 || forecastProgress >= 1.0) {
                context.bulkResultsPersister.executeRequest();
            }
        }
        if ((modelSizeStats = result.getModelSizeStats()) != null) {
            LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}", (Object)context.jobId, (Object)modelSizeStats.getModelBytes(), (Object)modelSizeStats.getTotalByFieldCount(), (Object)modelSizeStats.getTotalOverFieldCount(), (Object)modelSizeStats.getTotalPartitionFieldCount(), (Object)modelSizeStats.getBucketAllocationFailuresCount(), (Object)modelSizeStats.getMemoryStatus());
            this.latestModelSizeStats = modelSizeStats;
            this.haveNewLatestModelSizeStats = true;
            this.persister.persistModelSizeStats(modelSizeStats);
            if (this.restoredSnapshot || this.bucketCount >= 20) {
                this.persister.commitResultWrites(context.jobId);
                this.updateEstablishedModelMemoryOnJob(modelSizeStats.getTimestamp(), modelSizeStats);
            }
        }
        if ((modelSnapshot = result.getModelSnapshot()) != null) {
            this.persister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE);
            this.updateModelSnapshotIdOnJob(modelSnapshot);
        }
        if ((quantiles = result.getQuantiles()) != null) {
            this.persister.persistQuantiles(quantiles);
            context.bulkResultsPersister.executeRequest();
            if (!this.processKilled) {
                this.persister.commitResultWrites(context.jobId);
                LOGGER.debug("[{}] Quantiles parsed from output - will trigger renormalization of scores", (Object)context.jobId);
                this.renormalizer.renormalize(quantiles);
            }
        }
        if ((flushAcknowledgement = result.getFlushAcknowledgement()) != null) {
            LOGGER.debug("[{}] Flush acknowledgement parsed from output for ID {}", (Object)context.jobId, (Object)flushAcknowledgement.getId());
            context.bulkResultsPersister.executeRequest();
            this.persister.commitResultWrites(context.jobId);
            this.flushListener.acknowledgeFlush(flushAcknowledgement);
            context.deleteInterimRequired = true;
        }
    }

    protected void updateModelSnapshotIdOnJob(final ModelSnapshot modelSnapshot) {
        JobUpdate update = new JobUpdate.Builder(this.jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
        UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(this.jobId, update);
        try {
            this.updateModelSnapshotIdSemaphore.acquire();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.info("[{}] Interrupted acquiring update model snapshot semaphore", (Object)this.jobId);
            return;
        }
        ClientHelper.executeAsyncWithOrigin(this.client, "ml", UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>(){

            public void onResponse(PutJobAction.Response response) {
                AutoDetectResultProcessor.this.updateModelSnapshotIdSemaphore.release();
                LOGGER.debug("[{}] Updated job with model snapshot id [{}]", (Object)AutoDetectResultProcessor.this.jobId, (Object)modelSnapshot.getSnapshotId());
            }

            public void onFailure(Exception e) {
                AutoDetectResultProcessor.this.updateModelSnapshotIdSemaphore.release();
                LOGGER.error("[" + AutoDetectResultProcessor.this.jobId + "] Failed to update job with new model snapshot id [" + modelSnapshot.getSnapshotId() + "]", (Throwable)e);
            }
        });
    }

    protected void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) {
        this.jobProvider.getEstablishedMemoryUsage(this.jobId, latestBucketTimestamp, modelSizeStats, establishedModelMemory -> {
            JobUpdate update = new JobUpdate.Builder(this.jobId).setEstablishedModelMemory((Long)establishedModelMemory).build();
            UpdateJobAction.Request updateRequest = new UpdateJobAction.Request(this.jobId, update);
            ClientHelper.executeAsyncWithOrigin(this.client, "ml", UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>(){

                public void onResponse(PutJobAction.Response response) {
                    AutoDetectResultProcessor.this.latestEstablishedModelMemory = establishedModelMemory;
                    LOGGER.debug("[{}] Updated job with established model memory [{}]", (Object)AutoDetectResultProcessor.this.jobId, (Object)establishedModelMemory);
                }

                public void onFailure(Exception e) {
                    LOGGER.error("[" + AutoDetectResultProcessor.this.jobId + "] Failed to update job with new established model memory [" + establishedModelMemory + "]", (Throwable)e);
                }
            });
        }, e -> LOGGER.error("[" + this.jobId + "] Failed to calculate established model memory", (Throwable)e));
    }

    public void awaitCompletion() throws TimeoutException {
        try {
            if (!this.completionLatch.await(MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES)) {
                throw new TimeoutException("Timed out waiting for results processor to complete for job " + this.jobId);
            }
            this.updateModelSnapshotIdSemaphore.acquire();
            this.updateModelSnapshotIdSemaphore.release();
            this.waitUntilRenormalizerIsIdle();
            this.persister.commitResultWrites(this.jobId);
            this.persister.commitStateWrites(this.jobId);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.info("[{}] Interrupted waiting for results processor to complete", (Object)this.jobId);
        }
    }

    @Nullable
    public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) {
        return this.failed ? null : this.flushListener.waitForFlush(flushId, timeout);
    }

    public void clearAwaitingFlush(String flushId) {
        this.flushListener.clear(flushId);
    }

    public void waitUntilRenormalizerIsIdle() {
        this.renormalizer.waitUntilIdle();
    }

    public boolean isFailed() {
        return this.failed;
    }

    public ModelSizeStats modelSizeStats() {
        return this.latestModelSizeStats;
    }

    static class Context {
        private final String jobId;
        private JobResultsPersister.Builder bulkResultsPersister;
        boolean deleteInterimRequired;

        Context(String jobId, JobResultsPersister.Builder bulkResultsPersister) {
            this.jobId = jobId;
            this.deleteInterimRequired = true;
            this.bulkResultsPersister = bulkResultsPersister;
        }
    }
}

