/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.datafeed;

import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJob;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.datafeed.DefaultFrequency;
import org.elasticsearch.xpack.ml.datafeed.ProblemTracker;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksService;

public class DatafeedManager
extends AbstractComponent {
    private static final String INF_SYMBOL = "forever";
    private final Client client;
    private final ClusterService clusterService;
    private final PersistentTasksService persistentTasksService;
    private final JobProvider jobProvider;
    private final ThreadPool threadPool;
    private final Supplier<Long> currentTimeSupplier;
    private final Auditor auditor;
    private final ConcurrentMap<Long, Holder> runningDatafeeds = new ConcurrentHashMap<Long, Holder>();

    public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider, Supplier<Long> currentTimeSupplier, Auditor auditor, PersistentTasksService persistentTasksService) {
        super(Settings.EMPTY);
        this.client = Objects.requireNonNull(client);
        this.clusterService = Objects.requireNonNull(clusterService);
        this.jobProvider = Objects.requireNonNull(jobProvider);
        this.threadPool = threadPool;
        this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
        this.auditor = Objects.requireNonNull(auditor);
        this.persistentTasksService = Objects.requireNonNull(persistentTasksService);
    }

    public void run(final StartDatafeedAction.DatafeedTask task, final Consumer<Exception> handler) {
        String datafeedId = task.getDatafeedId();
        ClusterState state = this.clusterService.state();
        MlMetadata mlMetadata = (MlMetadata)state.metaData().custom("ml");
        DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
        Job job = mlMetadata.getJobs().get(datafeed.getJobId());
        this.gatherInformation(job.getId(), (buckets, dataCounts) -> {
            long latestFinalBucketEndMs = -1L;
            TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
            if (buckets.results().size() == 1) {
                latestFinalBucketEndMs = ((Bucket)((Object)((Object)buckets.results().get(0)))).getTimestamp().getTime() + bucketSpan.millis() - 1L;
            }
            long latestRecordTimeMs = -1L;
            if (dataCounts.getLatestRecordTimeStamp() != null) {
                latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();
            }
            final Holder holder = this.createJobDatafeed(datafeed, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task);
            this.runningDatafeeds.put(task.getAllocationId(), holder);
            task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>(){

                public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
                    DatafeedManager.this.innerRun(holder, task.getDatafeedStartTime(), task.getEndTime());
                }

                public void onFailure(Exception e) {
                    handler.accept(e);
                }
            });
        }, handler);
    }

    public void stopDatafeed(StartDatafeedAction.DatafeedTask task, String reason, TimeValue timeout) {
        this.logger.info("[{}] attempt to stop datafeed [{}] [{}]", (Object)reason, (Object)task.getDatafeedId(), (Object)task.getAllocationId());
        Holder holder = (Holder)this.runningDatafeeds.remove(task.getAllocationId());
        if (holder != null) {
            holder.stop(reason, timeout, null);
        }
    }

    public void stopAllDatafeeds(String reason) {
        int numDatafeeds = this.runningDatafeeds.size();
        if (numDatafeeds != 0) {
            this.logger.info("Closing [{}] datafeeds, because [{}]", (Object)numDatafeeds, (Object)reason);
        }
        for (Holder holder : this.runningDatafeeds.values()) {
            holder.stop(reason, TimeValue.timeValueSeconds((long)20L), null);
        }
    }

    private void innerRun(final Holder holder, final long startTime, final Long endTime) {
        this.logger.info("Starting datafeed [{}] for job [{}] in [{}, {})", (Object)holder.datafeed.getId(), (Object)holder.datafeed.getJobId(), (Object)DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(startTime), (Object)(endTime == null ? INF_SYMBOL : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(endTime.longValue())));
        holder.future = this.threadPool.executor("ml_datafeed").submit((Runnable)new AbstractRunnable(){

            public void onFailure(Exception e) {
                DatafeedManager.this.logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", (Throwable)e);
                holder.stop("general_lookback_failure", TimeValue.timeValueSeconds((long)20L), e);
            }

            protected void doRun() throws Exception {
                Long next = null;
                try {
                    next = holder.executeLoopBack(startTime, endTime);
                }
                catch (DatafeedJob.ExtractionProblemException e) {
                    if (endTime == null) {
                        next = e.nextDelayInMsSinceEpoch;
                    }
                    holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
                }
                catch (DatafeedJob.AnalysisProblemException e) {
                    if (endTime == null) {
                        next = e.nextDelayInMsSinceEpoch;
                    }
                    holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
                    if (e.shouldStop) {
                        holder.stop("lookback_analysis_error", TimeValue.timeValueSeconds((long)20L), e);
                        return;
                    }
                }
                catch (DatafeedJob.EmptyDataCountException e) {
                    if (endTime == null) {
                        holder.problemTracker.reportEmptyDataCount();
                        next = e.nextDelayInMsSinceEpoch;
                    } else {
                        String lookbackNoDataMsg = Messages.getMessage("Datafeed lookback retrieved no data");
                        DatafeedManager.this.logger.warn("[{}] {}", (Object)holder.datafeed.getJobId(), (Object)lookbackNoDataMsg);
                        DatafeedManager.this.auditor.warning(holder.datafeed.getJobId(), lookbackNoDataMsg);
                    }
                }
                catch (Exception e) {
                    DatafeedManager.this.logger.error("Failed lookback import for job [" + holder.datafeed.getJobId() + "]", (Throwable)e);
                    holder.stop("general_lookback_failure", TimeValue.timeValueSeconds((long)20L), e);
                    return;
                }
                if (next != null) {
                    DatafeedManager.this.doDatafeedRealtime(next, holder.datafeed.getJobId(), holder);
                } else {
                    holder.stop("no_realtime", TimeValue.timeValueSeconds((long)20L), null);
                    holder.problemTracker.finishReport();
                }
            }
        });
    }

    void doDatafeedRealtime(long delayInMsSinceEpoch, final String jobId, final Holder holder) {
        if (holder.isRunning()) {
            TimeValue delay = this.computeNextDelay(delayInMsSinceEpoch);
            this.logger.debug("Waiting [{}] before executing next realtime import for job [{}]", (Object)delay, (Object)jobId);
            holder.future = this.threadPool.schedule(delay, "ml_datafeed", (Runnable)new AbstractRunnable(){

                public void onFailure(Exception e) {
                    DatafeedManager.this.logger.error("Unexpected datafeed failure for job [" + jobId + "] stopping...", (Throwable)e);
                    holder.stop("general_realtime_error", TimeValue.timeValueSeconds((long)20L), e);
                }

                protected void doRun() throws Exception {
                    long nextDelayInMsSinceEpoch;
                    try {
                        nextDelayInMsSinceEpoch = holder.executeRealTime();
                        holder.problemTracker.reportNoneEmptyCount();
                    }
                    catch (DatafeedJob.ExtractionProblemException e) {
                        nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
                        holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
                    }
                    catch (DatafeedJob.AnalysisProblemException e) {
                        nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
                        holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
                        if (e.shouldStop) {
                            holder.stop("realtime_analysis_error", TimeValue.timeValueSeconds((long)20L), e);
                            return;
                        }
                    }
                    catch (DatafeedJob.EmptyDataCountException e) {
                        nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
                        holder.problemTracker.reportEmptyDataCount();
                    }
                    catch (Exception e) {
                        DatafeedManager.this.logger.error("Unexpected datafeed failure for job [" + jobId + "] stopping...", (Throwable)e);
                        holder.stop("general_realtime_error", TimeValue.timeValueSeconds((long)20L), e);
                        return;
                    }
                    holder.problemTracker.finishReport();
                    if (nextDelayInMsSinceEpoch >= 0L) {
                        DatafeedManager.this.doDatafeedRealtime(nextDelayInMsSinceEpoch, jobId, holder);
                    }
                }
            });
        }
    }

    Holder createJobDatafeed(DatafeedConfig datafeed, Job job, long finalBucketEndMs, long latestRecordTimeMs, Consumer<Exception> handler, StartDatafeedAction.DatafeedTask task) {
        Duration frequency = DatafeedManager.getFrequencyOrDefault(datafeed, job);
        Duration queryDelay = Duration.ofMillis(datafeed.getQueryDelay().millis());
        DataExtractorFactory dataExtractorFactory = this.createDataExtractorFactory(datafeed, job);
        DatafeedJob datafeedJob = new DatafeedJob(job.getId(), DatafeedManager.buildDataDescription(job), frequency.toMillis(), queryDelay.toMillis(), dataExtractorFactory, this.client, this.auditor, this.currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs);
        return new Holder(task.getPersistentTaskId(), task.getAllocationId(), datafeed, datafeedJob, task.isLookbackOnly(), new ProblemTracker(this.auditor, job.getId()), handler);
    }

    DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeed, Job job) {
        return DataExtractorFactory.create(this.client, datafeed, job);
    }

    private static DataDescription buildDataDescription(Job job) {
        DataDescription.Builder dataDescription = new DataDescription.Builder();
        dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
        if (job.getDataDescription() != null) {
            dataDescription.setTimeField(job.getDataDescription().getTimeField());
        }
        dataDescription.setTimeFormat("epoch_ms");
        return dataDescription.build();
    }

    private void gatherInformation(String jobId, BiConsumer<QueryPage<Bucket>, DataCounts> handler, Consumer<Exception> errorHandler) {
        BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder().sortField(Result.TIMESTAMP.getPreferredName()).sortDescending(true).size(1).includeInterim(false).build();
        this.jobProvider.bucketsViaInternalClient(jobId, latestBucketQuery, buckets -> this.jobProvider.dataCounts(jobId, dataCounts -> handler.accept((QueryPage<Bucket>)((Object)buckets), (DataCounts)((Object)dataCounts)), errorHandler), e -> {
            if (e instanceof ResourceNotFoundException) {
                QueryPage empty = new QueryPage(Collections.emptyList(), 0L, Bucket.RESULT_TYPE_FIELD);
                this.jobProvider.dataCounts(jobId, dataCounts -> handler.accept(empty, (DataCounts)((Object)dataCounts)), errorHandler);
            } else {
                errorHandler.accept((Exception)e);
            }
        });
    }

    private static Duration getFrequencyOrDefault(DatafeedConfig datafeed, Job job) {
        TimeValue frequency = datafeed.getFrequency();
        TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
        return frequency == null ? DefaultFrequency.ofBucketSpan(bucketSpan.seconds()) : Duration.ofSeconds(frequency.seconds());
    }

    private TimeValue computeNextDelay(long next) {
        return new TimeValue(Math.max(1L, next - this.currentTimeSupplier.get()));
    }

    boolean isRunning(long allocationId) {
        return this.runningDatafeeds.containsKey(allocationId);
    }

    public class Holder {
        private final String taskId;
        private final long allocationId;
        private final DatafeedConfig datafeed;
        private final ReentrantLock datafeedJobLock = new ReentrantLock(true);
        private final DatafeedJob datafeedJob;
        private final boolean autoCloseJob;
        private final ProblemTracker problemTracker;
        private final Consumer<Exception> handler;
        volatile Future<?> future;

        Holder(String taskId, long allocationId, DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker, Consumer<Exception> handler) {
            this.taskId = taskId;
            this.allocationId = allocationId;
            this.datafeed = datafeed;
            this.datafeedJob = datafeedJob;
            this.autoCloseJob = autoCloseJob;
            this.problemTracker = problemTracker;
            this.handler = handler;
        }

        boolean isRunning() {
            return this.datafeedJob.isRunning();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void stop(String source, TimeValue timeout, Exception e) {
            block10: {
                block11: {
                    DatafeedManager.this.logger.info("[{}] attempt to stop datafeed [{}] for job [{}]", (Object)source, (Object)this.datafeed.getId(), (Object)this.datafeed.getJobId());
                    if (!this.datafeedJob.stop()) break block11;
                    boolean acquired = false;
                    try {
                        DatafeedManager.this.logger.info("[{}] try lock [{}] to stop datafeed [{}] for job [{}]...", (Object)source, (Object)timeout, (Object)this.datafeed.getId(), (Object)this.datafeed.getJobId());
                        acquired = this.datafeedJobLock.tryLock(timeout.millis(), TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e1) {
                        try {
                            Thread.currentThread().interrupt();
                        }
                        catch (Throwable throwable) {
                            DatafeedManager.this.logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", (Object)source, (Object)this.datafeed.getId(), (Object)this.datafeed.getJobId(), (Object)acquired);
                            DatafeedManager.this.runningDatafeeds.remove(this.allocationId);
                            FutureUtils.cancel(this.future);
                            DatafeedManager.this.auditor.info(this.datafeed.getJobId(), Messages.getMessage("Datafeed stopped"));
                            this.handler.accept(e);
                            DatafeedManager.this.logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", (Object)source, (Object)this.datafeed.getId(), (Object)this.datafeed.getJobId(), (Object)(acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"));
                            if (this.autoCloseJob) {
                                this.closeJob();
                            }
                            if (acquired) {
                                this.datafeedJobLock.unlock();
                            }
                            throw throwable;
                        }
                        DatafeedManager.this.logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", (Object)source, (Object)this.datafeed.getId(), (Object)this.datafeed.getJobId(), (Object)acquired);
                        DatafeedManager.this.runningDatafeeds.remove(this.allocationId);
                        FutureUtils.cancel(this.future);
                        DatafeedManager.this.auditor.info(this.datafeed.getJobId(), Messages.getMessage("Datafeed stopped"));
                        this.handler.accept(e);
                        DatafeedManager.this.logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", (Object)source, (Object)this.datafeed.getId(), (Object)this.datafeed.getJobId(), (Object)(acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"));
                        if (this.autoCloseJob) {
                            this.closeJob();
                        }
                        if (acquired) {
                            this.datafeedJobLock.unlock();
                        }
                        break block10;
                    }
                    DatafeedManager.this.logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", (Object)source, (Object)this.datafeed.getId(), (Object)this.datafeed.getJobId(), (Object)acquired);
                    DatafeedManager.this.runningDatafeeds.remove(this.allocationId);
                    FutureUtils.cancel(this.future);
                    DatafeedManager.this.auditor.info(this.datafeed.getJobId(), Messages.getMessage("Datafeed stopped"));
                    this.handler.accept(e);
                    DatafeedManager.this.logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", (Object)source, (Object)this.datafeed.getId(), (Object)this.datafeed.getJobId(), (Object)(acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"));
                    if (this.autoCloseJob) {
                        this.closeJob();
                    }
                    if (acquired) {
                        this.datafeedJobLock.unlock();
                    }
                    break block10;
                }
                DatafeedManager.this.logger.info("[{}] datafeed [{}] for job [{}] was already stopped", (Object)source, (Object)this.datafeed.getId(), (Object)this.datafeed.getJobId());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private Long executeLoopBack(long startTime, Long endTime) throws Exception {
            this.datafeedJobLock.lock();
            try {
                if (this.isRunning()) {
                    Long l = this.datafeedJob.runLookBack(startTime, endTime);
                    return l;
                }
                Long l = null;
                return l;
            }
            finally {
                this.datafeedJobLock.unlock();
            }
        }

        private long executeRealTime() throws Exception {
            this.datafeedJobLock.lock();
            try {
                if (this.isRunning()) {
                    long l = this.datafeedJob.runRealtime();
                    return l;
                }
                long l = -1L;
                return l;
            }
            finally {
                this.datafeedJobLock.unlock();
            }
        }

        private void closeJob() {
            DatafeedManager.this.persistentTasksService.waitForPersistentTaskStatus(this.taskId, Objects::isNull, TimeValue.timeValueSeconds((long)20L), new PersistentTasksService.WaitForPersistentTaskStatusListener<StartDatafeedAction.DatafeedParams>(){

                public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> PersistentTask2) {
                    CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(Holder.this.datafeed.getJobId());
                    closeJobRequest.setLocal(true);
                    DatafeedManager.this.client.execute((Action)CloseJobAction.INSTANCE, (ActionRequest)closeJobRequest, (ActionListener)new ActionListener<CloseJobAction.Response>(){

                        public void onResponse(CloseJobAction.Response response) {
                            if (!response.isClosed()) {
                                DatafeedManager.this.logger.error("[{}] job close action was not acknowledged", (Object)Holder.this.datafeed.getJobId());
                            }
                        }

                        public void onFailure(Exception e) {
                            DatafeedManager.this.logger.error("[" + Holder.this.datafeed.getJobId() + "] failed to  auto-close job", (Throwable)e);
                        }
                    });
                }

                public void onFailure(Exception e) {
                    DatafeedManager.this.logger.error("Cannot auto close job [" + Holder.this.datafeed.getJobId() + "]", (Throwable)e);
                }
            });
        }
    }
}

