/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractJsonQueryElasticsearch;
import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
import org.apache.nifi.processors.elasticsearch.api.PaginationType;
import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;

public abstract class AbstractPaginatedJsonQueryElasticsearch
extends AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
    /**
     * Result output strategy for paginated queries. Copies the parent class's property of the same name,
     * replacing its description and taking its allowable values from {@link ResultOutputStrategy}.
     */
    public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT)
            .description("Output a flowfile containing all hits or one flowfile for each individual hit or one flowfile containing all hits from all paged responses.")
            .allowableValues(ResultOutputStrategy.class)
            .build();

    /** Pagination mechanism to use; required, defaults to {@link PaginationType#SCROLL}. */
    public static final PropertyDescriptor PAGINATION_TYPE = new PropertyDescriptor.Builder()
            .name("Pagination Type")
            .description("Pagination method to use. Not all types are available for all Elasticsearch versions, check the Elasticsearch docs to confirm which are applicable and recommended for your service.")
            .allowableValues(PaginationType.class)
            .defaultValue((DescribedValue) PaginationType.SCROLL)
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    /**
     * Keep-alive period for the scroll / point-in-time cursor. Only applies when {@link #PAGINATION_TYPE} is
     * SCROLL or POINT_IN_TIME; validated as a time period between 1 second and 24 hours.
     */
    public static final PropertyDescriptor PAGINATION_KEEP_ALIVE = new PropertyDescriptor.Builder()
            .name("Pagination Keep Alive")
            .description("Pagination \"keep_alive\" period. Period Elasticsearch will keep the scroll/pit cursor alive in between requests (this is not the time expected for all pages to be returned, but the maximum allowed time for requests between page retrievals).")
            .required(true)
            .dependsOn(PAGINATION_TYPE, (DescribedValue) PaginationType.SCROLL, new DescribedValue[]{PaginationType.POINT_IN_TIME})
            .defaultValue("10 mins")
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .addValidator(StandardValidators.createTimePeriodValidator(1L, TimeUnit.SECONDS, 24L, TimeUnit.HOURS))
            .build();

    /**
     * The inherited query property descriptors with the parent's result-split property swapped for
     * {@link #SEARCH_RESULTS_SPLIT}, followed by the two pagination properties.
     */
    static final List<PropertyDescriptor> paginatedPropertyDescriptors = Stream.concat(
            queryPropertyDescriptors.stream()
                    .map(descriptor -> AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT.equals(descriptor) ? SEARCH_RESULTS_SPLIT : descriptor),
            Stream.of(PAGINATION_TYPE, PAGINATION_KEEP_ALIVE)
    ).toList();

    /** Writer used for combined hit output; assigned in {@code onScheduled} with a newline root value separator. */
    private ObjectWriter writer;

    /** Pagination type read from {@link #PAGINATION_TYPE} in {@code onScheduled}. */
    PaginationType paginationType;

    /**
     * Runs the parent scheduling logic, then caches the configured pagination type and creates the
     * writer used for combined hit output (root values separated by a newline).
     *
     * @param context process context supplying the configured property values
     */
    @Override
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        super.onScheduled(context);

        final PaginationType configuredType = (PaginationType) context.getProperty(PAGINATION_TYPE).asAllowableValue(PaginationType.class);
        paginationType = configuredType;

        // one JSON document per line when several hits are written to the same flowfile
        writer = mapper.writer().withRootValueSeparator("\n");
    }

    /**
     * Applies the parent migrations, then renames the legacy pagination property keys to the
     * current names of {@link #PAGINATION_TYPE} and {@link #PAGINATION_KEEP_ALIVE}.
     *
     * @param config property configuration being migrated
     */
    @Override
    public void migrateProperties(final PropertyConfiguration config) {
        super.migrateProperties(config);

        final String currentTypeName = PAGINATION_TYPE.getName();
        config.renameProperty("el-rest-pagination-type", currentTypeName);

        final String currentKeepAliveName = PAGINATION_KEEP_ALIVE.getName();
        config.renameProperty("el-rest-pagination-keep-alive", currentKeepAliveName);
    }

    /**
     * Executes the paginated query and hands every page to {@code handleResponse}.
     *
     * <p>Pages are requested in a loop that stops as soon as a response contains no hits, or after a single
     * page when there is no input flowfile and the output strategy is not {@code PER_QUERY}. When the last
     * response had no hits, the Elasticsearch pagination resources are cleared.</p>
     *
     * @param paginatedJsonQueryParameters query and pagination state, updated after every page
     * @param hitsFlowFiles flowfiles accumulated for hits so far; replaced by the result of each page's handling
     * @param session the current process session
     * @param context the current process context
     * @param input the incoming flowfile, or {@code null} when there is none
     * @param stopWatch stopwatch used for provenance timings
     * @return the response for the last page that was retrieved
     * @throws IOException if the query JSON cannot be read or written
     */
    @Override
    SearchResponse doQuery(final PaginatedJsonQueryParameters paginatedJsonQueryParameters, List<FlowFile> hitsFlowFiles, final ProcessSession session, final ProcessContext context, final FlowFile input, final StopWatch stopWatch) throws IOException {
        SearchResponse response;
        do {
            resetQueryParamsIfRequired(paginatedJsonQueryParameters, context);
            // a page count of zero means no page has been retrieved yet for this query
            final boolean newQuery = paginatedJsonQueryParameters.getPageCount() == 0;
            final String queryJson = updateQueryJson(newQuery, paginatedJsonQueryParameters, context, input);
            final Map<String, String> requestParameters = getRequestParametersFromDynamicProperties(context, input);
            final Map<String, String> requestHeaders = getRequestHeadersFromDynamicProperties(context, input);
            final ElasticSearchClientService client = (ElasticSearchClientService) this.clientService.get();

            if (!newQuery && paginationType == PaginationType.SCROLL) {
                // follow-up scroll pages: only headers are forwarded, request parameters are dropped
                if (!requestParameters.isEmpty()) {
                    getLogger().warn("Elasticsearch _scroll API does not accept query parameters, ignoring dynamic properties {}", requestParameters.keySet());
                }
                response = client.scroll(queryJson, new ElasticsearchRequestOptions(null, requestHeaders));
            } else {
                if (paginationType == PaginationType.SCROLL) {
                    // first scroll request: ask Elasticsearch to open a scroll with the configured keep-alive
                    requestParameters.put("scroll", paginatedJsonQueryParameters.getKeepAlive());
                }
                // point-in-time searches are sent without an index
                response = client.search(
                        queryJson,
                        paginationType == PaginationType.POINT_IN_TIME ? null : paginatedJsonQueryParameters.getIndex(),
                        paginatedJsonQueryParameters.getType(),
                        new ElasticsearchRequestOptions(requestParameters, requestHeaders));
                paginatedJsonQueryParameters.setPitId(response.getPitId());
                paginatedJsonQueryParameters.setSearchAfter(response.getSearchAfter());
            }
            paginatedJsonQueryParameters.setScrollId(response.getScrollId());

            if (newQuery && input != null) {
                // provenance SEND is reported once, for the first page of a query driven by an input flowfile
                session.getProvenanceReporter().send(
                        input,
                        client.getTransitUrl(paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getType()),
                        stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            }

            updateQueryParameters(paginatedJsonQueryParameters, response);
            hitsFlowFiles = handleResponse(response, newQuery, paginatedJsonQueryParameters, hitsFlowFiles, session, input, stopWatch);
        } while (!response.getHits().isEmpty() && (input != null || hitStrategy == ResultOutputStrategy.PER_QUERY));

        if (response.getHits().isEmpty()) {
            getLogger().debug("No more results for paginated query, clearing Elasticsearch resources");
            clearElasticsearchState(context, response, input);
        }
        return response;
    }

    /**
     * Creates the parameter object for a paginated query: populates the common query parameters and,
     * when the pagination type has an expiry, sets the keep-alive as a number of seconds suffixed with "s".
     *
     * @param input the incoming flowfile, may be {@code null}
     * @param context the current process context
     * @param session the current process session
     * @return the populated query parameters
     * @throws IOException propagated from populating the common parameters
     */
    @Override
    PaginatedJsonQueryParameters buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException {
        final PaginatedJsonQueryParameters parameters = new PaginatedJsonQueryParameters();
        populateCommonJsonQueryParameters(parameters, input, context, session);

        if (!paginationType.hasExpiry()) {
            return parameters;
        }

        final String keepAlive = context.getProperty(PAGINATION_KEEP_ALIVE).asTimePeriod(TimeUnit.SECONDS) + "s";
        parameters.setKeepAlive(keepAlive);
        return parameters;
    }

    /**
     * Hook invoked by {@code doQuery} at the start of every paging iteration, before the page count is
     * inspected to decide whether the query is new.
     * NOTE(review): presumably lets implementations reset stale pagination state - confirm against subclasses.
     *
     * @param var1 the query and pagination state for the current query
     * @param var2 the current process context
     * @throws IOException declared for implementations; the conditions are not visible in this class
     */
    abstract void resetQueryParamsIfRequired(PaginatedJsonQueryParameters var1, ProcessContext var2) throws IOException;

    /**
     * Supplies the scroll ID that {@code clearElasticsearchState} deletes when the pagination type is
     * SCROLL; a blank result means no scroll is deleted.
     *
     * @param var1 the current process context
     * @param var2 the most recent search response
     * @return the scroll ID to delete, or a blank/{@code null} value when there is none
     * @throws IOException declared for implementations; the conditions are not visible in this class
     */
    abstract String getScrollId(ProcessContext var1, SearchResponse var2) throws IOException;

    /**
     * Supplies the point-in-time ID that {@code clearElasticsearchState} deletes when the pagination type
     * is POINT_IN_TIME; a blank result means no point in time is deleted.
     *
     * @param var1 the current process context
     * @param var2 the most recent search response
     * @return the point-in-time ID to delete, or a blank/{@code null} value when there is none
     * @throws IOException declared for implementations; the conditions are not visible in this class
     */
    abstract String getPitId(ProcessContext var1, SearchResponse var2) throws IOException;

    /**
     * Rewrites the query JSON in place so that it requests the next page.
     *
     * <p>For SCROLL the whole body is replaced by the scroll ID (plus the keep-alive when one is set).
     * For the other pagination types the stored "search_after" values are added and any "aggs" are removed.</p>
     *
     * @param queryJson the parsed query, modified in place
     * @param paginatedJsonQueryParameters pagination state holding the scroll ID / search_after / keep-alive
     * @throws IOException if the stored search_after value cannot be parsed as a JSON array
     */
    private void prepareNextPageQuery(final ObjectNode queryJson, final PaginatedJsonQueryParameters paginatedJsonQueryParameters) throws IOException {
        if (paginationType != PaginationType.SCROLL) {
            final JsonNode searchAfter = mapper.readValue(paginatedJsonQueryParameters.getSearchAfter(), ArrayNode.class);
            queryJson.set("search_after", searchAfter);

            if (queryJson.has("aggs")) {
                getLogger().debug("Removing \"aggs\" from non-initial paged query");
                queryJson.remove("aggs");
            }
            return;
        }

        // a scroll continuation carries only the scroll ID and, optionally, the keep-alive
        queryJson.removeAll();
        queryJson.put("scroll_id", paginatedJsonQueryParameters.getScrollId());

        final String keepAlive = paginatedJsonQueryParameters.getKeepAlive();
        if (StringUtils.isNotBlank(keepAlive)) {
            queryJson.put("scroll", paginatedJsonQueryParameters.getKeepAlive());
        }
    }

    /**
     * Builds the JSON body for the next request.
     *
     * <p>A new query using POINT_IN_TIME or SEARCH_AFTER must contain a "sort" field; a follow-up query is
     * rewritten for the next page. For POINT_IN_TIME a "pit" object is attached, using a freshly initialised
     * point in time for a new query or the stored ID otherwise.</p>
     *
     * @param newQuery whether this is the first request of the query
     * @param paginatedJsonQueryParameters query and pagination state
     * @param context the current process context
     * @param input the incoming flowfile, may be {@code null}
     * @return the serialised query JSON
     * @throws IOException if the query cannot be parsed or serialised
     * @throws IllegalArgumentException if a new pit/search_after query has no "sort" field
     */
    private String updateQueryJson(final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final ProcessContext context, final FlowFile input) throws IOException {
        final ObjectNode queryJson = mapper.readValue(paginatedJsonQueryParameters.getQuery(), ObjectNode.class);

        if (newQuery) {
            final boolean sortRequired = paginationType == PaginationType.POINT_IN_TIME
                    || paginationType == PaginationType.SEARCH_AFTER;
            if (sortRequired && !queryJson.has("sort")) {
                throw new IllegalArgumentException("Query using pit/search_after must contain a \"sort\" field");
            }
        } else {
            prepareNextPageQuery(queryJson, paginatedJsonQueryParameters);
        }

        if (paginationType == PaginationType.POINT_IN_TIME) {
            final String queryPitId;
            if (newQuery) {
                final ElasticsearchRequestOptions pitOptions = new ElasticsearchRequestOptions(null, getRequestHeadersFromDynamicProperties(context, input));
                queryPitId = ((ElasticSearchClientService) this.clientService.get())
                        .initialisePointInTime(paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getKeepAlive(), pitOptions);
            } else {
                queryPitId = paginatedJsonQueryParameters.getPitId();
            }

            final ObjectNode pit = JsonNodeFactory.instance.objectNode();
            pit.put("id", queryPitId);
            final String keepAlive = paginatedJsonQueryParameters.getKeepAlive();
            if (StringUtils.isNotBlank(keepAlive)) {
                pit.put("keep_alive", paginatedJsonQueryParameters.getKeepAlive());
            }
            queryJson.set("pit", pit);
        }

        return mapper.writeValueAsString(queryJson);
    }

    /**
     * Writes the hits to the flowfile, either replacing its content or appending after a newline, then sets
     * the "hit.count" attribute (added to the supplied attribute map) along with all other attributes.
     *
     * @param count value recorded in the "hit.count" attribute
     * @param hits hits to serialise with the newline-separated writer
     * @param session the current process session
     * @param hitFlowFile flowfile to write or append to
     * @param attributes attributes to apply; mutated to include "hit.count"
     * @param append {@code true} to append to existing content, {@code false} to overwrite
     * @return the updated flowfile
     */
    private FlowFile writeCombinedHitFlowFile(final int count, final List<Map<String, Object>> hits, final ProcessSession session, final FlowFile hitFlowFile, final Map<String, String> attributes, final boolean append) {
        final FlowFile written;
        if (!append) {
            written = session.write(hitFlowFile, out -> writer.writeValues(out).writeAll(hits));
        } else {
            // a newline separates the new hits from those already in the flowfile
            final FlowFile separated = session.append(hitFlowFile, out -> out.write('\n'));
            written = session.append(separated, out -> writer.writeValues(out).writeAll(hits));
        }

        attributes.put("hit.count", Integer.toString(count));
        return session.putAllAttributes(written, attributes);
    }

    /**
     * Folds a page of hits into the single combined flowfile kept at the head of {@code hitsFlowFiles}.
     *
     * <p>With hits present, the existing flowfile is taken from the list and appended to, or a new child
     * flowfile is created when the list is empty; the result is added back to the list. With no hits, an empty
     * flowfile is emitted only for a new query and only when outputting "no hits" is enabled.</p>
     *
     * @param hits hits of the current page, may be {@code null} or empty
     * @param paginatedJsonQueryParameters pagination state providing the running hit count
     * @param session the current process session
     * @param parent parent flowfile for any newly created child
     * @param attributes attributes to apply to the written flowfile
     * @param hitsFlowFiles mutable list holding the combined flowfile
     * @param newQuery whether this is the first page of the query
     */
    private void combineHits(final List<Map<String, Object>> hits, final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final ProcessSession session, final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles, final boolean newQuery) {
        final boolean hasHits = hits != null && !hits.isEmpty();
        if (hasHits) {
            final boolean append = !hitsFlowFiles.isEmpty();
            final FlowFile target = append ? hitsFlowFiles.removeFirst() : createChildFlowFile(session, parent);
            final int combinedCount = paginatedJsonQueryParameters.getHitCount() + hits.size();
            hitsFlowFiles.add(writeCombinedHitFlowFile(combinedCount, hits, session, target, attributes, append));
            return;
        }

        if (isOutputNoHits() && newQuery) {
            final FlowFile emptyResult = createChildFlowFile(session, parent);
            hitsFlowFiles.add(writeHitFlowFile(0, "", session, emptyResult, attributes));
        }
    }

    /**
     * Handles one page of hits. Records the page number attribute, then for the {@code PER_QUERY} strategy
     * combines the hits into a single flowfile and transfers it once a page arrives with no hits; every
     * other strategy is delegated to the parent implementation.
     *
     * @param hits hits of the current page, may be {@code null} or empty
     * @param newQuery whether this is the first page of the query
     * @param paginatedJsonQueryParameters pagination state
     * @param session the current process session
     * @param parent parent flowfile for created children
     * @param attributes attributes to apply; mutated to include "page.number"
     * @param hitsFlowFiles mutable list of hit flowfiles accumulated so far
     * @param transitUri transit URI used for provenance RECEIVE events
     * @param stopWatch stopwatch used for provenance timings
     * @return the same {@code hitsFlowFiles} list that was passed in
     * @throws IOException propagated from hit formatting or the parent implementation
     */
    @Override
    List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final ProcessSession session, final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles, final String transitUri, final StopWatch stopWatch) throws IOException {
        attributes.put("page.number", Integer.toString(paginatedJsonQueryParameters.getPageCount()));

        if (hitStrategy != ResultOutputStrategy.PER_QUERY) {
            super.handleHits(hits, newQuery, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, transitUri, stopWatch);
            return hitsFlowFiles;
        }

        final List<Map<String, Object>> formattedHits = formatHits(hits);
        combineHits(formattedHits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, newQuery);

        // an empty page marks the end of the results, so the combined flowfile can be released
        final boolean noMoreHits = hits == null || hits.isEmpty();
        if (noMoreHits && !hitsFlowFiles.isEmpty()) {
            session.transfer(hitsFlowFiles, REL_HITS);
            for (final FlowFile transferred : hitsFlowFiles) {
                session.getProvenanceReporter().receive(transferred, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            }
            hitsFlowFiles.clear();
        }
        return hitsFlowFiles;
    }

    /**
     * Updates the pagination state after a page has been retrieved: increments the page count, marks the
     * query finished when the response has no hits and, for pagination types with an expiry, stores the
     * epoch-millisecond timestamp at which the current keep-alive runs out.
     *
     * @param paginatedJsonQueryParameters pagination state to update
     * @param response the response for the page just retrieved
     */
    void updateQueryParameters(final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final SearchResponse response) {
        paginatedJsonQueryParameters.incrementPageCount();

        final boolean noHits = response.getHits().isEmpty();
        paginatedJsonQueryParameters.setFinished(noHits);

        if (!paginationType.hasExpiry()) {
            return;
        }

        // the keep-alive (e.g. "600s") is prefixed with "PT" so it can be parsed as an ISO-8601 duration
        final String keepAliveDuration = "PT" + paginatedJsonQueryParameters.getKeepAlive();
        final Instant now = Instant.now();
        final long expiresAtMillis = now.plus(Duration.parse(keepAliveDuration)).toEpochMilli();
        paginatedJsonQueryParameters.setPageExpirationTimestamp(String.valueOf(expiresAtMillis));
    }

    /**
     * Best-effort release of the Elasticsearch pagination resources for the current pagination type:
     * deletes the scroll for SCROLL or the point in time for POINT_IN_TIME when a non-blank ID is available.
     * Other pagination types need no clean-up here. Any failure is logged as a warning and otherwise ignored.
     *
     * @param context the current process context
     * @param response the most recent search response
     * @param input the incoming flowfile used to resolve request headers, may be {@code null}
     */
    void clearElasticsearchState(final ProcessContext context, final SearchResponse response, final FlowFile input) {
        try {
            final Map<String, String> requestHeaders = getRequestHeadersFromDynamicProperties(context, input);

            if (paginationType == PaginationType.SCROLL) {
                final String scrollId = getScrollId(context, response);
                if (StringUtils.isNotBlank(scrollId)) {
                    ((ElasticSearchClientService) this.clientService.get())
                            .deleteScroll(scrollId, new ElasticsearchRequestOptions(null, requestHeaders));
                }
            } else if (paginationType == PaginationType.POINT_IN_TIME) {
                final String pitId = getPitId(context, response);
                if (StringUtils.isNotBlank(pitId)) {
                    ((ElasticSearchClientService) this.clientService.get())
                            .deletePointInTime(pitId, new ElasticsearchRequestOptions(null, requestHeaders));
                }
            }
        } catch (final Exception ex) {
            // clean-up is deliberately best-effort: a failure here must not fail the query itself
            getLogger().warn("Error while cleaning up Elasticsearch pagination resources, ignoring", ex);
        }
    }
}

