/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.datafeed;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ClientHelper;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.Auditor;

class DatafeedJob {
    private static final Logger LOGGER = Loggers.getLogger(DatafeedJob.class);
    private static final int NEXT_TASK_DELAY_MS = 100;
    private final Auditor auditor;
    private final String jobId;
    private final DataDescription dataDescription;
    private final long frequencyMs;
    private final long queryDelayMs;
    private final Client client;
    private final DataExtractorFactory dataExtractorFactory;
    private final Supplier<Long> currentTimeSupplier;
    private volatile long lookbackStartTimeMs;
    private volatile Long lastEndTimeMs;
    private AtomicBoolean running = new AtomicBoolean(true);
    private volatile boolean isIsolated;

    DatafeedJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs, DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier<Long> currentTimeSupplier, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) {
        this.jobId = jobId;
        this.dataDescription = Objects.requireNonNull(dataDescription);
        this.frequencyMs = frequencyMs;
        this.queryDelayMs = queryDelayMs;
        this.dataExtractorFactory = dataExtractorFactory;
        this.client = client;
        this.auditor = auditor;
        this.currentTimeSupplier = currentTimeSupplier;
        long lastEndTime = Math.max(latestFinalBucketEndTimeMs, latestRecordTimeMs);
        if (lastEndTime > 0L) {
            this.lastEndTimeMs = lastEndTime;
        }
    }

    void isolate() {
        this.isIsolated = true;
    }

    boolean isIsolated() {
        return this.isIsolated;
    }

    Long runLookBack(long startTime, Long endTime) throws Exception {
        this.lookbackStartTimeMs = this.skipToStartTime(startTime);
        Optional<Long> endMs = Optional.ofNullable(endTime);
        long lookbackEnd = endMs.orElse(this.currentTimeSupplier.get() - this.queryDelayMs);
        boolean isLookbackOnly = endMs.isPresent();
        if (lookbackEnd <= this.lookbackStartTimeMs) {
            if (isLookbackOnly) {
                return null;
            }
            this.auditor.info(this.jobId, Messages.getMessage("Datafeed started in real-time"));
            return this.nextRealtimeTimestamp();
        }
        String msg = Messages.getMessage("Datafeed started (from: {0} to: {1}) with frequency [{2}]", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(this.lookbackStartTimeMs), endTime == null ? "real-time" : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(lookbackEnd), TimeValue.timeValueMillis((long)this.frequencyMs).getStringRep());
        this.auditor.info(this.jobId, msg);
        LOGGER.info("[{}] {}", (Object)this.jobId, (Object)msg);
        FlushJobAction.Request request = new FlushJobAction.Request(this.jobId);
        request.setCalcInterim(true);
        this.run(this.lookbackStartTimeMs, lookbackEnd, request);
        if (this.isRunning() && !this.isIsolated) {
            this.auditor.info(this.jobId, Messages.getMessage("Datafeed lookback completed"));
            LOGGER.info("[{}] Lookback has finished", (Object)this.jobId);
            if (isLookbackOnly) {
                return null;
            }
            this.auditor.info(this.jobId, Messages.getMessage("Datafeed continued in real-time"));
            return this.nextRealtimeTimestamp();
        }
        if (!this.isIsolated) {
            LOGGER.debug("Lookback finished after being stopped");
        }
        return null;
    }

    private long skipToStartTime(long startTime) {
        if (this.lastEndTimeMs != null) {
            if (this.lastEndTimeMs + 1L > startTime) {
                return this.lastEndTimeMs + 1L;
            }
            FlushJobAction.Request request = new FlushJobAction.Request(this.jobId);
            request.setSkipTime(String.valueOf(startTime));
            FlushJobAction.Response flushResponse = this.flushJob(request);
            LOGGER.info("Skipped to time [" + flushResponse.getLastFinalizedBucketEnd().getTime() + "]");
            return flushResponse.getLastFinalizedBucketEnd().getTime();
        }
        return startTime;
    }

    long runRealtime() throws Exception {
        long start = this.lastEndTimeMs == null ? this.lookbackStartTimeMs : Math.max(this.lookbackStartTimeMs, this.lastEndTimeMs + 1L);
        long nowMinusQueryDelay = this.currentTimeSupplier.get() - this.queryDelayMs;
        long end = this.toIntervalStartEpochMs(nowMinusQueryDelay);
        FlushJobAction.Request request = new FlushJobAction.Request(this.jobId);
        request.setCalcInterim(true);
        request.setAdvanceTime(String.valueOf(end));
        this.run(start, end, request);
        return this.nextRealtimeTimestamp();
    }

    public boolean stop() {
        return this.running.compareAndSet(true, false);
    }

    public boolean isRunning() {
        return this.running.get();
    }

    private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException {
        if (end <= start) {
            return;
        }
        LOGGER.trace("[{}] Searching data in: [{}, {})", (Object)this.jobId, (Object)start, (Object)end);
        AnalysisProblemException error = null;
        long recordCount = 0L;
        DataExtractor dataExtractor = this.dataExtractorFactory.newExtractor(start, end);
        while (dataExtractor.hasNext()) {
            DataCounts counts;
            Optional<InputStream> extractedData;
            if (!(!this.isIsolated && this.isRunning() || dataExtractor.isCancelled())) {
                dataExtractor.cancel();
            }
            if (this.isIsolated) {
                return;
            }
            try {
                extractedData = dataExtractor.next();
            }
            catch (Exception e) {
                LOGGER.debug("[" + this.jobId + "] error while extracting data", (Throwable)e);
                if (e.toString().contains("doc values")) {
                    throw new ExtractionProblemException(this.nextRealtimeTimestamp(), (Throwable)new IllegalArgumentException("One or more fields do not have doc values; please enable doc values for all analysis fields for datafeeds using aggregations"));
                }
                throw new ExtractionProblemException(this.nextRealtimeTimestamp(), (Throwable)e);
            }
            if (this.isIsolated) {
                return;
            }
            if (!extractedData.isPresent()) continue;
            try (InputStream in = extractedData.get();){
                counts = this.postData(in, XContentType.JSON);
                LOGGER.trace("[{}] Processed another {} records", (Object)this.jobId, (Object)counts.getProcessedRecordCount());
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                if (this.isIsolated) {
                    return;
                }
                LOGGER.debug("[" + this.jobId + "] error while posting data", (Throwable)e);
                boolean shouldStop = this.isConflictException(e);
                error = new AnalysisProblemException(this.nextRealtimeTimestamp(), shouldStop, e);
                break;
            }
            recordCount += counts.getProcessedRecordCount();
            if (counts.getLatestRecordTimeStamp() == null) continue;
            this.lastEndTimeMs = counts.getLatestRecordTimeStamp().getTime();
        }
        this.lastEndTimeMs = Math.max(this.lastEndTimeMs == null ? 0L : this.lastEndTimeMs, end - 1L);
        LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", (Object)this.jobId, error, (Object)recordCount, (Object)this.lastEndTimeMs, (Object)this.isRunning(), (Object)dataExtractor.isCancelled());
        if (error != null) {
            throw error;
        }
        if (this.isRunning() && !this.isIsolated) {
            this.flushJob(flushRequest);
        }
        if (recordCount == 0L) {
            throw new EmptyDataCountException(this.nextRealtimeTimestamp());
        }
    }

    private DataCounts postData(InputStream inputStream, XContentType xContentType) throws IOException {
        PostDataAction.Request request = new PostDataAction.Request(this.jobId);
        request.setDataDescription(this.dataDescription);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Streams.copy((InputStream)inputStream, (OutputStream)outputStream);
        request.setContent((BytesReference)new BytesArray(outputStream.toByteArray()), xContentType);
        try (ThreadContext.StoredContext ignore = ClientHelper.stashWithOrigin(this.client.threadPool().getThreadContext(), "ml");){
            PostDataAction.Response response = (PostDataAction.Response)((Object)this.client.execute((Action)PostDataAction.INSTANCE, (ActionRequest)request).actionGet());
            DataCounts dataCounts = response.getDataCounts();
            return dataCounts;
        }
    }

    private boolean isConflictException(Exception e) {
        return e instanceof ElasticsearchStatusException && ((ElasticsearchStatusException)e).status() == RestStatus.CONFLICT;
    }

    private long nextRealtimeTimestamp() {
        long epochMs = this.currentTimeSupplier.get() + this.frequencyMs;
        return this.toIntervalStartEpochMs(epochMs) + this.queryDelayMs + 100L;
    }

    private long toIntervalStartEpochMs(long epochMs) {
        return epochMs / this.frequencyMs * this.frequencyMs;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private FlushJobAction.Response flushJob(FlushJobAction.Request flushRequest) {
        try {
            LOGGER.trace("[" + this.jobId + "] Sending flush request");
            try (ThreadContext.StoredContext ignore = ClientHelper.stashWithOrigin(this.client.threadPool().getThreadContext(), "ml");){
                FlushJobAction.Response response = (FlushJobAction.Response)((Object)this.client.execute((Action)FlushJobAction.INSTANCE, (ActionRequest)flushRequest).actionGet());
                return response;
            }
        }
        catch (Exception e) {
            LOGGER.debug("[" + this.jobId + "] error while flushing job", (Throwable)e);
            boolean shouldStop = this.isConflictException(e);
            throw new AnalysisProblemException(this.nextRealtimeTimestamp(), shouldStop, e);
        }
    }

    Long lastEndTimeMs() {
        return this.lastEndTimeMs;
    }

    static class EmptyDataCountException
    extends RuntimeException {
        final long nextDelayInMsSinceEpoch;

        EmptyDataCountException(long nextDelayInMsSinceEpoch) {
            this.nextDelayInMsSinceEpoch = nextDelayInMsSinceEpoch;
        }
    }

    static class ExtractionProblemException
    extends RuntimeException {
        final long nextDelayInMsSinceEpoch;

        ExtractionProblemException(long nextDelayInMsSinceEpoch, Throwable cause) {
            super(cause);
            this.nextDelayInMsSinceEpoch = nextDelayInMsSinceEpoch;
        }
    }

    static class AnalysisProblemException
    extends RuntimeException {
        final boolean shouldStop;
        final long nextDelayInMsSinceEpoch;

        AnalysisProblemException(long nextDelayInMsSinceEpoch, boolean shouldStop, Throwable cause) {
            super(cause);
            this.shouldStop = shouldStop;
            this.nextDelayInMsSinceEpoch = nextDelayInMsSinceEpoch;
        }
    }
}

