/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchException;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor;
import org.apache.nifi.processors.elasticsearch.api.AggregationResultsFormat;
import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy;
import org.apache.nifi.processors.elasticsearch.api.SearchResultsFormat;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;

public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters>
extends AbstractProcessor
implements ElasticsearchRestProcessor {
    /** Relationship named "original"; see its description for the routing contract. */
    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original")
            .description("All original flowfiles that don't cause an error to occur go to this relationship.")
            .build();

    /** Relationship named "hits", used for flowfiles carrying search hits. */
    public static final Relationship REL_HITS = new Relationship.Builder()
            .name("hits")
            .description("Search hits are routed to this relationship.")
            .build();

    /** Relationship named "aggregations", used for flowfiles carrying aggregation results. */
    public static final Relationship REL_AGGREGATIONS = new Relationship.Builder()
            .name("aggregations")
            .description("Aggregations are routed to this relationship.")
            .build();
    /** Selects whether hits are emitted as one flowfile or one flowfile per hit. */
    public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
            .name("Search Results Split")
            .description("Output a flowfile containing all hits or one flowfile for each individual hit.")
            .allowableValues(ResultOutputStrategy.getNonPaginatedResponseOutputStrategies())
            .defaultValue((DescribedValue) ResultOutputStrategy.PER_RESPONSE)
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    /** Selects the shape of each hit written to the "hits" relationship. */
    public static final PropertyDescriptor SEARCH_RESULTS_FORMAT = new PropertyDescriptor.Builder()
            .name("Search Results Format")
            .description("Format of Hits output.")
            .allowableValues(SearchResultsFormat.class)
            .defaultValue((DescribedValue) SearchResultsFormat.FULL)
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    /** Selects whether aggregations are emitted as one flowfile or one flowfile per aggregation. */
    public static final PropertyDescriptor AGGREGATION_RESULTS_SPLIT = new PropertyDescriptor.Builder()
            .name("Aggregation Results Split")
            .description("Output a flowfile containing all aggregations or one flowfile for each individual aggregation.")
            .allowableValues(ResultOutputStrategy.getNonPaginatedResponseOutputStrategies())
            .defaultValue((DescribedValue) ResultOutputStrategy.PER_RESPONSE)
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    /** Selects the shape of each aggregation written to the "aggregations" relationship. */
    public static final PropertyDescriptor AGGREGATION_RESULTS_FORMAT = new PropertyDescriptor.Builder()
            .name("Aggregation Results Format")
            .description("Format of Aggregation output.")
            .allowableValues(AggregationResultsFormat.class)
            .defaultValue((DescribedValue) AggregationResultsFormat.FULL)
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    /** Boolean toggle controlling whether an empty "hits" flowfile is emitted when a query matches nothing. */
    public static final PropertyDescriptor OUTPUT_NO_HITS = new PropertyDescriptor.Builder()
            .name("Output No Hits")
            .description("Output a \"" + REL_HITS.getName() + "\" flowfile even if no hits found for query. "
                    + "If true, an empty \"" + REL_HITS.getName() + "\" flowfile will be output even if \""
                    + REL_AGGREGATIONS.getName() + "\" are output.")
            .allowableValues("true", "false")
            .defaultValue("false")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();
    /** Immutable set of relationships returned by {@link #getRelationships()}. */
    private static final Set<Relationship> relationships = Set.of(
            REL_ORIGINAL,
            REL_FAILURE,
            REL_HITS,
            REL_AGGREGATIONS
    );

    /** Immutable, ordered list of properties returned by {@link #getSupportedPropertyDescriptors()}. */
    static final List<PropertyDescriptor> queryPropertyDescriptors = List.of(
            QUERY_DEFINITION_STYLE,
            QUERY,
            QUERY_CLAUSE,
            SIZE,
            SORT,
            AGGREGATIONS,
            FIELDS,
            SCRIPT_FIELDS,
            QUERY_ATTRIBUTE,
            INDEX,
            TYPE,
            MAX_JSON_FIELD_STRING_LENGTH,
            CLIENT_SERVICE,
            SEARCH_RESULTS_SPLIT,
            SEARCH_RESULTS_FORMAT,
            AGGREGATION_RESULTS_SPLIT,
            AGGREGATION_RESULTS_FORMAT,
            OUTPUT_NO_HITS
    );
    /** Hit splitting strategy; assigned in {@link #onScheduled(ProcessContext)}. */
    ResultOutputStrategy hitStrategy;

    /** Hit output format; assigned in {@link #onScheduled(ProcessContext)}. */
    private SearchResultsFormat hitFormat;

    /** Aggregation splitting strategy; {@code null} when the corresponding property is not set. */
    private ResultOutputStrategy aggregationStrategy;

    /** Aggregation output format; {@code null} when the corresponding property is not set. */
    private AggregationResultsFormat aggregationFormat;

    /** Cached value of the "Output No Hits" property. */
    private boolean outputNoHits;

    /** JSON mapper built in {@link #onScheduled(ProcessContext)} and used to serialise hits and aggregations. */
    ObjectMapper mapper;

    /**
     * Holder for the Elasticsearch client service; populated when the processor is scheduled and
     * reset to {@code null} when it is stopped.
     */
    // The diamond keeps the reference typed as AtomicReference<ElasticSearchClientService>;
    // an AtomicReference<Object> is not assignable to this field.
    final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);

    /**
     * Returns the fixed set of relationships declared by this class.
     *
     * @return the shared immutable relationship set
     */
    public Set<Relationship> getRelationships() {
        return AbstractJsonQueryElasticsearch.relationships;
    }

    /**
     * Returns the fixed list of property descriptors declared by this class.
     *
     * @return the shared immutable descriptor list
     */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return AbstractJsonQueryElasticsearch.queryPropertyDescriptors;
    }

    /**
     * Applies the interface-level migrations, then renames the legacy {@code el-rest-*} property
     * keys to the current display names of the result-handling properties.
     *
     * @param config the property configuration being migrated
     */
    @Override
    public void migrateProperties(PropertyConfiguration config) {
        ElasticsearchRestProcessor.super.migrateProperties(config);

        // Each row is {legacy key, current name}; rows are applied top to bottom.
        final String[][] renames = {
                {"el-rest-split-up-hits", SEARCH_RESULTS_SPLIT.getName()},
                {"el-rest-format-hits", SEARCH_RESULTS_FORMAT.getName()},
                {"el-rest-split-up-aggregations", AGGREGATION_RESULTS_SPLIT.getName()},
                {"el-rest-format-aggregations", AGGREGATION_RESULTS_FORMAT.getName()},
                {"el-rest-output-no-hits", OUTPUT_NO_HITS.getName()},
        };
        for (final String[] rename : renames) {
            config.renameProperty(rename[0], rename[1]);
        }
    }

    /**
     * Builds the descriptor used for any dynamic (user-added) property: optional, non-empty,
     * and evaluated against flowfile attributes.
     *
     * @param propertyDescriptorName the name of the dynamic property
     * @return a new dynamic property descriptor carrying that name
     */
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .required(false)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .dynamic(true);
        return builder.build();
    }

    /**
     * Always returns {@code false} for this processor family.
     * NOTE(review): presumably this tells the shared REST processor logic that a missing index
     * is not treated as a successful outcome - confirm against ElasticsearchRestProcessor.
     *
     * @return {@code false}
     */
    @Override
    public boolean isIndexNotExistSuccessful() {
        return false;
    }

    /**
     * Reports the cached "Output No Hits" setting captured when the processor was scheduled.
     *
     * @return the cached flag value
     */
    boolean isOutputNoHits() {
        return outputNoHits;
    }

    /**
     * Caches the client service, the result strategies/formats, the "Output No Hits" flag and the
     * JSON mapper from the processor configuration.
     *
     * @param context the process context supplying the property values
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        clientService.set(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));

        hitStrategy = context.getProperty(SEARCH_RESULTS_SPLIT).asAllowableValue(ResultOutputStrategy.class);
        hitFormat = context.getProperty(SEARCH_RESULTS_FORMAT).asAllowableValue(SearchResultsFormat.class);

        // The aggregation settings are left null when their property is not set.
        if (context.getProperty(AGGREGATION_RESULTS_SPLIT).isSet()) {
            aggregationStrategy = context.getProperty(AGGREGATION_RESULTS_SPLIT).asAllowableValue(ResultOutputStrategy.class);
        } else {
            aggregationStrategy = null;
        }
        if (context.getProperty(AGGREGATION_RESULTS_FORMAT).isSet()) {
            aggregationFormat = context.getProperty(AGGREGATION_RESULTS_FORMAT).asAllowableValue(AggregationResultsFormat.class);
        } else {
            aggregationFormat = null;
        }

        outputNoHits = context.getProperty(OUTPUT_NO_HITS).asBoolean();
        mapper = buildObjectMapper(context);
    }

    /** Drops the cached client service reference when the processor is stopped. */
    @OnStopped
    public void onStopped() {
        clientService.set(null);
    }

    /**
     * Runs one query cycle: builds the query parameters, executes the query and finishes it.
     *
     * <p>When the processor has an incoming connection a flowfile is pulled from the session; if
     * none is available and {@code context.hasNonLoopConnection()} is true, the call returns
     * without doing any work.
     *
     * <p>An {@link ElasticsearchException} penalizes the input (if any), records the message in
     * the {@code elasticsearch.query.error} attribute and routes to {@code REL_RETRY} when
     * {@code isElastic()} is true, otherwise to {@code REL_FAILURE}. Any other exception records
     * the same attribute, routes the input to {@code REL_FAILURE} and yields the context.
     *
     * @param context the process context
     * @param session the session used to read and route flowfiles
     */
    public void onTrigger(ProcessContext context, ProcessSession session) {
        FlowFile input = null;
        if (context.hasIncomingConnection()) {
            input = session.get();
            if (input == null && context.hasNonLoopConnection()) {
                return;
            }
        }

        try {
            final Q queryJsonParameters = buildJsonQueryParameters(input, context, session);
            if (queryJsonParameters == null) {
                context.yield();
                return;
            }
            final List<FlowFile> hitsFlowFiles = new ArrayList<>();
            final StopWatch stopWatch = new StopWatch(true);
            final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, context, input, stopWatch);
            finishQuery(input, queryJsonParameters, session, context, response);
        } catch (ElasticsearchException ese) {
            final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s", ese.isElastic() ? "Routing to retry." : "Routing to failure");
            getLogger().error(msg, ese);
            if (input != null) {
                // penalize() returns the updated FlowFile; keep working with that reference
                // rather than the stale one.
                input = session.penalize(input);
                input = session.putAttribute(input, "elasticsearch.query.error", ese.getMessage());
                session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
            }
        } catch (Exception ex) {
            getLogger().error("Could not query documents.", ex);
            if (input != null) {
                input = session.putAttribute(input, "elasticsearch.query.error", ex.getMessage());
                session.transfer(input, REL_FAILURE);
            }
            context.yield();
        }
    }

    /**
     * Creates the query parameters for one trigger of the processor.
     *
     * @param input   the incoming flowfile, or {@code null} when there is none
     * @param context the process context
     * @param session the process session
     * @return the query parameters; a {@code null} result makes {@code onTrigger} yield and return
     * @throws IOException if the parameters cannot be built
     */
    abstract Q buildJsonQueryParameters(FlowFile input, ProcessContext context, ProcessSession session) throws IOException;

    /**
     * Fills in the query, index, type and query-attribute name on the given parameter object.
     * All values are resolved first and only then applied, so a failure while resolving leaves
     * the parameter object untouched.
     *
     * @param queryJsonParameters the parameter object to populate
     * @param input               the flowfile used for attribute expression evaluation (may be {@code null})
     * @param context             the process context supplying the property values
     * @param session             the process session, forwarded to {@code getQuery}
     * @throws IOException if the query cannot be obtained
     */
    void populateCommonJsonQueryParameters(Q queryJsonParameters, FlowFile input, ProcessContext context, ProcessSession session) throws IOException {
        final String resolvedQuery = getQuery(input, context, session, mapper);
        final String resolvedIndex = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
        final String resolvedType = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();

        // The query attribute name is optional; it stays null when the property is not set.
        String resolvedQueryAttr = null;
        if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
            resolvedQueryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
        }

        queryJsonParameters.setQuery(resolvedQuery);
        queryJsonParameters.setIndex(resolvedIndex);
        queryJsonParameters.setType(resolvedType);
        queryJsonParameters.setQueryAttr(resolvedQueryAttr);
    }

    /**
     * Executes the query described by the given parameters.
     *
     * @param queryJsonParameters the query parameters
     * @param hitsFlowFiles       list supplied by {@code onTrigger} (initially empty) for hit flowfiles
     * @param session             the process session
     * @param context             the process context
     * @param input               the incoming flowfile, or {@code null} when there is none
     * @param stopWatch           stop watch started by {@code onTrigger} before the query
     * @return the search response, which {@code onTrigger} passes on to {@code finishQuery}
     * @throws IOException if the query or result handling fails
     */
    abstract SearchResponse doQuery(Q queryJsonParameters, List<FlowFile> hitsFlowFiles, ProcessSession session, ProcessContext context, FlowFile input, StopWatch stopWatch) throws IOException;

    /**
     * Completes processing after {@code doQuery} has returned.
     *
     * @param input               the incoming flowfile, or {@code null} when there is none
     * @param queryJsonParameters the query parameters used for the query
     * @param session             the process session
     * @param context             the process context
     * @param response            the response returned by {@code doQuery}
     * @throws IOException if finishing the query fails
     */
    abstract void finishQuery(FlowFile input, Q queryJsonParameters, ProcessSession session, ProcessContext context, SearchResponse response) throws IOException;

    /**
     * Creates a new flowfile, derived from {@code parent} when one is supplied.
     *
     * @param session the session used to create the flowfile
     * @param parent  the parent flowfile, or {@code null} to create a flowfile without a parent
     * @return the newly created flowfile
     */
    FlowFile createChildFlowFile(ProcessSession session, FlowFile parent) {
        if (parent == null) {
            return session.create();
        }
        return session.create(parent);
    }

    /**
     * Writes aggregation JSON into a flowfile and attaches the supplied attributes plus the
     * optional {@code aggregation.name} / {@code aggregation.number} attributes.
     *
     * <p>The caller's {@code attributes} map is copied, not modified.
     *
     * @param name        aggregation name, or {@code null} to omit {@code aggregation.name}
     * @param number      aggregation number, or {@code null} to omit {@code aggregation.number}
     * @param json        the JSON content to write
     * @param session     the process session
     * @param aggFlowFile the flowfile to write into
     * @param attributes  base attributes to apply
     * @return the updated flowfile
     */
    private FlowFile writeAggregationFlowFileContents(String name, Integer number, String json, ProcessSession session, FlowFile aggFlowFile, Map<String, String> attributes) {
        // Encode explicitly as UTF-8 so the content does not depend on the JVM's default charset.
        final byte[] content = json.getBytes(StandardCharsets.UTF_8);
        final FlowFile ff = session.write(aggFlowFile, out -> out.write(content));

        final Map<String, String> latestAttributes = new HashMap<>(attributes);
        if (name != null) {
            latestAttributes.put("aggregation.name", name);
        }
        if (number != null) {
            latestAttributes.put("aggregation.number", number.toString());
        }
        return session.putAllAttributes(ff, latestAttributes);
    }

    /**
     * Emits aggregation results to {@code REL_AGGREGATIONS}: one flowfile per aggregation when
     * the aggregation strategy is {@code PER_HIT}, otherwise a single flowfile holding all of
     * them. Does nothing when there are no aggregations.
     *
     * @param aggregations the aggregations from the response (may be {@code null} or empty)
     * @param session      the process session
     * @param parent       the parent flowfile for created children, or {@code null}
     * @param attributes   base attributes for each created flowfile
     * @param transitUri   transit URI reported in the provenance receive events
     * @param stopWatch    stop watch supplying the elapsed time for provenance events
     * @throws IOException if JSON serialisation fails
     */
    private void handleAggregations(Map<String, Object> aggregations, ProcessSession session, FlowFile parent, Map<String, String> attributes, String transitUri, StopWatch stopWatch) throws IOException {
        if (aggregations == null || aggregations.isEmpty()) {
            return;
        }

        final Map<String, Object> formatted = formatAggregations(aggregations);
        final List<FlowFile> outputs = new ArrayList<>();

        if (aggregationStrategy == ResultOutputStrategy.PER_HIT) {
            // Aggregation numbers are 1-based and follow the iteration order of the map.
            int ordinal = 0;
            for (final Map.Entry<String, Object> entry : formatted.entrySet()) {
                final FlowFile child = createChildFlowFile(session, parent);
                final String payload = mapper.writeValueAsString(entry.getValue());
                ordinal++;
                outputs.add(writeAggregationFlowFileContents(entry.getKey(), ordinal, payload, session, child, attributes));
            }
        } else {
            final FlowFile child = createChildFlowFile(session, parent);
            final String payload = mapper.writeValueAsString(formatted);
            outputs.add(writeAggregationFlowFileContents(null, null, payload, session, child, attributes));
        }

        if (outputs.isEmpty()) {
            return;
        }
        session.transfer(outputs, REL_AGGREGATIONS);
        for (final FlowFile output : outputs) {
            session.getProvenanceReporter().receive(output, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
        }
    }

    /**
     * Shapes the aggregations according to the configured aggregation format.
     *
     * <ul>
     *   <li>{@code METADATA_ONLY}: each aggregation is copied and its {@code "buckets"} entry is
     *       dropped from the copy, so the caller's maps are left untouched.</li>
     *   <li>{@code BUCKETS_ONLY}: each aggregation name is mapped to its {@code "buckets"} value;
     *       an aggregation without buckets maps to {@code null} instead of failing.</li>
     *   <li>any other value (including an unset format): the input map is returned as-is.</li>
     * </ul>
     *
     * @param aggregations the aggregations from the response, keyed by aggregation name
     * @return the formatted aggregations, in the iteration order of the input
     */
    private Map<String, Object> formatAggregations(Map<String, Object> aggregations) {
        if (aggregationFormat == AggregationResultsFormat.METADATA_ONLY) {
            final Map<String, Object> metadataOnly = new LinkedHashMap<>();
            for (final Map.Entry<String, Object> entry : aggregations.entrySet()) {
                final Object value = entry.getValue();
                if (value instanceof Map) {
                    // Copy the nested map before removing "buckets" so the response is not mutated.
                    final Map<Object, Object> copy = new LinkedHashMap<>((Map<?, ?>) value);
                    copy.remove("buckets");
                    metadataOnly.put(entry.getKey(), copy);
                } else {
                    metadataOnly.put(entry.getKey(), value);
                }
            }
            return metadataOnly;
        }

        if (aggregationFormat == AggregationResultsFormat.BUCKETS_ONLY) {
            // A plain loop is used because Collectors.toMap rejects null values, and an
            // aggregation without a "buckets" entry yields null here.
            final Map<String, Object> bucketsOnly = new LinkedHashMap<>();
            for (final Map.Entry<String, Object> entry : aggregations.entrySet()) {
                final Object value = entry.getValue();
                final Object buckets = value instanceof Map ? ((Map<?, ?>) value).get("buckets") : null;
                bucketsOnly.put(entry.getKey(), buckets);
            }
            return bucketsOnly;
        }

        return aggregations;
    }

    /**
     * Writes hit JSON into a flowfile and applies the supplied attributes together with a
     * {@code hit.count} attribute.
     *
     * <p>Note that {@code hit.count} is added to the caller's {@code attributes} map itself.
     *
     * @param count       the number of hits represented by the flowfile
     * @param json        the JSON content to write
     * @param session     the process session
     * @param hitFlowFile the flowfile to write into
     * @param attributes  attributes to apply; modified to include {@code hit.count}
     * @return the updated flowfile
     */
    FlowFile writeHitFlowFile(int count, String json, ProcessSession session, FlowFile hitFlowFile, Map<String, String> attributes) {
        // Encode explicitly as UTF-8 so the content does not depend on the JVM's default charset.
        final byte[] content = json.getBytes(StandardCharsets.UTF_8);
        final FlowFile ff = session.write(hitFlowFile, out -> out.write(content));
        attributes.put("hit.count", Integer.toString(count));
        return session.putAllAttributes(ff, attributes);
    }

    /**
     * Turns search hits into flowfiles and transfers them to {@code REL_HITS}.
     *
     * <p>With hits present, one flowfile per hit is created when the hit strategy is
     * {@code PER_HIT}, otherwise a single flowfile containing all hits. With no hits, an empty
     * flowfile is created only for a new query when "Output No Hits" is enabled.
     *
     * @param hits                the hits from the response (may be {@code null} or empty)
     * @param newQuery            whether this response belongs to a new query
     * @param queryJsonParameters the query parameters (not used by this implementation)
     * @param session             the process session
     * @param parent              the parent flowfile for created children, or {@code null}
     * @param attributes          attributes applied to each created flowfile
     * @param hitsFlowFiles       list that collects the created flowfiles
     * @param transitUri          transit URI reported in the provenance receive events
     * @param stopWatch           stop watch supplying the elapsed time for provenance events
     * @return the same {@code hitsFlowFiles} instance, which has been cleared if anything was transferred
     * @throws IOException if JSON serialisation fails
     */
    List<FlowFile> handleHits(List<Map<String, Object>> hits, boolean newQuery, Q queryJsonParameters, ProcessSession session, FlowFile parent, Map<String, String> attributes, List<FlowFile> hitsFlowFiles, String transitUri, StopWatch stopWatch) throws IOException {
        final boolean noHits = hits == null || hits.isEmpty();

        if (noHits) {
            if (newQuery && outputNoHits) {
                final FlowFile emptyResult = createChildFlowFile(session, parent);
                hitsFlowFiles.add(writeHitFlowFile(0, "", session, emptyResult, attributes));
            }
        } else {
            final List<Map<String, Object>> formatted = formatHits(hits);
            if (hitStrategy == ResultOutputStrategy.PER_HIT) {
                for (final Map<String, Object> single : formatted) {
                    final FlowFile child = createChildFlowFile(session, parent);
                    final String payload = mapper.writeValueAsString(single);
                    hitsFlowFiles.add(writeHitFlowFile(1, payload, session, child, attributes));
                }
            } else {
                final FlowFile child = createChildFlowFile(session, parent);
                final String payload = mapper.writeValueAsString(formatted);
                hitsFlowFiles.add(writeHitFlowFile(formatted.size(), payload, session, child, attributes));
            }
        }

        transferResultFlowFiles(session, hitsFlowFiles, transitUri, stopWatch);
        return hitsFlowFiles;
    }

    /**
     * Shapes the hits according to the configured hit format.
     *
     * <ul>
     *   <li>{@code METADATA_ONLY}: each hit is shallow-copied and {@code "_source"} is removed
     *       from the copy.</li>
     *   <li>{@code SOURCE_ONLY}: each hit is replaced by its {@code "_source"} value, or an empty
     *       map when the hit has no such key.</li>
     *   <li>any other value (including an unset format): the input list is returned as-is.</li>
     * </ul>
     *
     * @param hits the hits to format
     * @return the formatted hits, in the same order as the input
     */
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> formatHits(List<Map<String, Object>> hits) {
        if (hitFormat == SearchResultsFormat.METADATA_ONLY) {
            final List<Map<String, Object>> metadataOnly = new ArrayList<>(hits.size());
            for (final Map<String, Object> hit : hits) {
                final Map<String, Object> copy = new HashMap<>(hit);
                copy.remove("_source");
                metadataOnly.add(copy);
            }
            return metadataOnly;
        }

        if (hitFormat == SearchResultsFormat.SOURCE_ONLY) {
            // NOTE(review): the cast assumes "_source" holds a JSON object (a Map) - confirm
            // against the client service. It is needed to keep the element type Map<String, Object>.
            return hits.stream()
                    .map(hit -> (Map<String, Object>) hit.getOrDefault("_source", Collections.emptyMap()))
                    .toList();
        }

        return hits;
    }

    /**
     * Transfers the collected hit flowfiles to {@code REL_HITS}, records a provenance receive
     * event for each, and then empties the list. Does nothing when the list is empty.
     *
     * @param session       the process session
     * @param hitsFlowFiles the flowfiles to transfer; cleared after the transfer
     * @param transitUri    transit URI reported in the provenance receive events
     * @param stopWatch     stop watch supplying the elapsed time for provenance events
     */
    private void transferResultFlowFiles(ProcessSession session, List<FlowFile> hitsFlowFiles, String transitUri, StopWatch stopWatch) {
        if (hitsFlowFiles.isEmpty()) {
            return;
        }
        session.transfer(hitsFlowFiles, REL_HITS);
        for (final FlowFile transferred : hitsFlowFiles) {
            session.getProvenanceReporter().receive(transferred, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
        }
        hitsFlowFiles.clear();
    }

    /**
     * Processes one search response: builds the common attributes, emits aggregations (for a new
     * query only), emits hits, and adds the number of hits to the query parameters.
     *
     * @param response            the search response to process
     * @param newQuery            whether this response belongs to a new query
     * @param queryJsonParameters the query parameters; its hit count is increased
     * @param hitsFlowFiles       list that collects the created hit flowfiles
     * @param session             the process session
     * @param input               the incoming flowfile, or {@code null} when there is none
     * @param stopWatch           stop watch supplying the elapsed time for provenance events
     * @return the list returned by {@code handleHits}
     * @throws IOException if JSON serialisation fails
     */
    List<FlowFile> handleResponse(SearchResponse response, boolean newQuery, Q queryJsonParameters, List<FlowFile> hitsFlowFiles, ProcessSession session, FlowFile input, StopWatch stopWatch) throws IOException {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
        if (StringUtils.isNotBlank(queryJsonParameters.getQueryAttr())) {
            // Record the executed query under the attribute name chosen by the user.
            attributes.put(queryJsonParameters.getQueryAttr(), queryJsonParameters.getQuery());
        }

        final String transitUri = clientService.get().getTransitUrl(queryJsonParameters.getIndex(), queryJsonParameters.getType());
        if (newQuery) {
            handleAggregations(response.getAggregations(), session, input, attributes, transitUri, stopWatch);
        }

        final List<Map<String, Object>> hits = response.getHits();
        final List<FlowFile> resultFlowFiles = handleHits(hits, newQuery, queryJsonParameters, session, input, attributes, hitsFlowFiles, transitUri, stopWatch);

        // handleHits tolerates a null hit list, so count it as zero here instead of dereferencing it.
        queryJsonParameters.addHitCount(hits == null ? 0 : hits.size());
        return resultFlowFiles;
    }
}

