/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;

import java.io.IOException;
import java.io.InputStream;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorContext;

public class ChunkedDataExtractor
implements DataExtractor {
    private static final Logger LOGGER = Loggers.getLogger(ChunkedDataExtractor.class);
    private static final String EARLIEST_TIME = "earliest_time";
    private static final String LATEST_TIME = "latest_time";
    private static final long MIN_CHUNK_SPAN = 60000L;
    private final Client client;
    private final DataExtractorFactory dataExtractorFactory;
    private final ChunkedDataExtractorContext context;
    private long currentStart;
    private long currentEnd;
    private long chunkSpan;
    private boolean isCancelled;
    private DataExtractor currentExtractor;

    public ChunkedDataExtractor(Client client, DataExtractorFactory dataExtractorFactory, ChunkedDataExtractorContext context) {
        this.client = Objects.requireNonNull(client);
        this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory);
        this.context = Objects.requireNonNull(context);
        this.currentStart = context.start;
        this.currentEnd = context.start;
        this.isCancelled = false;
    }

    @Override
    public boolean hasNext() {
        boolean currentHasNext;
        boolean bl = currentHasNext = this.currentExtractor != null && this.currentExtractor.hasNext();
        if (this.isCancelled()) {
            return currentHasNext;
        }
        return currentHasNext || this.currentEnd < this.context.end;
    }

    @Override
    public Optional<InputStream> next() throws IOException {
        if (!this.hasNext()) {
            throw new NoSuchElementException();
        }
        if (this.currentExtractor == null) {
            this.setUpChunkedSearch();
        }
        return this.getNextStream();
    }

    private void setUpChunkedSearch() throws IOException {
        DataSummary dataSummary = this.requestDataSummary();
        if (dataSummary.totalHits > 0L) {
            this.currentEnd = this.currentStart = dataSummary.earliestTime;
            this.chunkSpan = this.context.chunkSpan == null ? dataSummary.estimateChunk() : this.context.chunkSpan.getMillis();
            LOGGER.debug("Chunked search configured:  totalHits = {}, dataTimeSpread = {} ms, chunk span = {} ms", (Object)dataSummary.totalHits, (Object)dataSummary.getDataTimeSpread(), (Object)this.chunkSpan);
        } else {
            this.currentEnd = this.context.end;
        }
    }

    private DataSummary requestDataSummary() throws IOException {
        SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder((ElasticsearchClient)this.client).setSize(0).setIndices(this.context.indexes).setTypes(this.context.types).setQuery(ExtractorUtils.wrapInTimeRangeQuery(this.context.query, this.context.timeField, this.currentStart, this.context.end)).addAggregation((AggregationBuilder)AggregationBuilders.min((String)EARLIEST_TIME).field(this.context.timeField)).addAggregation((AggregationBuilder)AggregationBuilders.max((String)LATEST_TIME).field(this.context.timeField));
        SearchResponse response = this.executeSearchRequest(searchRequestBuilder);
        ExtractorUtils.checkSearchWasSuccessful(this.context.jobId, response);
        Aggregations aggregations = response.getAggregations();
        long earliestTime = 0L;
        long latestTime = 0L;
        long totalHits = response.getHits().totalHits();
        if (totalHits > 0L) {
            Min min = (Min)aggregations.get(EARLIEST_TIME);
            earliestTime = (long)min.getValue();
            Max max = (Max)aggregations.get(LATEST_TIME);
            latestTime = (long)max.getValue();
        }
        return new DataSummary(earliestTime, latestTime, totalHits);
    }

    protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
        return (SearchResponse)searchRequestBuilder.get();
    }

    private Optional<InputStream> getNextStream() throws IOException {
        while (this.hasNext()) {
            Optional<InputStream> nextStream;
            boolean isNewSearch = false;
            if (this.currentExtractor == null || !this.currentExtractor.hasNext()) {
                this.advanceTime();
                isNewSearch = true;
            }
            if ((nextStream = this.currentExtractor.next()).isPresent()) {
                return nextStream;
            }
            if (!isNewSearch || !this.hasNext()) continue;
            this.setUpChunkedSearch();
        }
        return Optional.empty();
    }

    private void advanceTime() {
        this.currentStart = this.currentEnd;
        this.currentEnd = Math.min(this.currentStart + this.chunkSpan, this.context.end);
        this.currentExtractor = this.dataExtractorFactory.newExtractor(this.currentStart, this.currentEnd);
        LOGGER.trace("advances time to [{}, {})", (Object)this.currentStart, (Object)this.currentEnd);
    }

    @Override
    public boolean isCancelled() {
        return this.isCancelled;
    }

    @Override
    public void cancel() {
        if (this.currentExtractor != null) {
            this.currentExtractor.cancel();
        }
        this.isCancelled = true;
    }

    private class DataSummary {
        private long earliestTime;
        private long latestTime;
        private long totalHits;

        private DataSummary(long earliestTime, long latestTime, long totalHits) {
            this.earliestTime = earliestTime;
            this.latestTime = latestTime;
            this.totalHits = totalHits;
        }

        private long getDataTimeSpread() {
            return this.latestTime - this.earliestTime;
        }

        private long estimateChunk() {
            long dataTimeSpread = this.getDataTimeSpread();
            if (this.totalHits <= 0L || dataTimeSpread <= 0L) {
                return ((ChunkedDataExtractor)ChunkedDataExtractor.this).context.end - ChunkedDataExtractor.this.currentEnd;
            }
            long estimatedChunk = 10L * ((long)((ChunkedDataExtractor)ChunkedDataExtractor.this).context.scrollSize * this.getDataTimeSpread()) / this.totalHits;
            return Math.max(estimatedChunk, 60000L);
        }
    }
}

