/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchException;
import org.apache.nifi.elasticsearch.ElasticsearchRequestOptions;
import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.migration.RelationshipConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractPutElasticsearch;
import org.apache.nifi.processors.elasticsearch.ElasticsearchRestProcessor;
import org.apache.nifi.processors.elasticsearch.PutElasticsearchJson;
import org.apache.nifi.processors.elasticsearch.api.BulkOperation;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeFormatValidator;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.field.StandardFieldConverterRegistry;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@SupportsBatching
@Tags(value={"json", "elasticsearch", "elasticsearch7", "elasticsearch8", "elasticsearch9", "put", "index", "record"})
@CapabilityDescription(value="A record-aware Elasticsearch put processor that uses the official Elastic REST client libraries. Each Record within the FlowFile is converted into a document to be sent to the Elasticsearch _bulk APi. Multiple documents can be batched into each Request sent to Elasticsearch. Each document's Bulk operation can be configured using Record Path expressions.")
@WritesAttributes(value={@WritesAttribute(attribute="elasticsearch.put.error", description="The error message if there is an issue parsing the FlowFile records, sending the parsed documents to Elasticsearch or parsing the Elasticsearch response."), @WritesAttribute(attribute="elasticsearch.put.error.count", description="The number of records that generated errors in the Elasticsearch _bulk API."), @WritesAttribute(attribute="elasticsearch.put.success.count", description="The number of records that were successfully processed by the Elasticsearch _bulk API."), @WritesAttribute(attribute="elasticsearch.bulk.error", description="The _bulk response if there was an error during processing the record within Elasticsearch.")})
@SeeAlso(value={PutElasticsearchJson.class})
@DynamicProperties(value={@DynamicProperty(name="The name of the Bulk request header", value="A Record Path expression to retrieve the Bulk request header value", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description="Prefix: BULK: - adds the specified property name/value as a Bulk request header in the Elasticsearch Bulk API body used for processing. If the Record Path expression results in a null or blank value, the Bulk header will be omitted for the document operation. These parameters will override any matching parameters in the _bulk request body."), @DynamicProperty(name="The name of the HTTP request header", value="A Record Path expression to retrieve the HTTP request header value", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description="Prefix: HEADER: - adds the specified property name/value as a HTTP request header in the Elasticsearch request. If the Record Path expression results in a null or blank value, the HTTP request header will be omitted."), @DynamicProperty(name="The name of a URL query parameter to add", value="The value of the URL query parameter", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description="Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. These parameters will override any matching parameters in the _bulk request body")})
@SystemResourceConsideration(resource=SystemResource.MEMORY, description="The Batch of Records will be stored in memory until the bulk operation is performed.")
public class PutElasticsearchRecord
extends AbstractPutElasticsearch {
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("put-es-record-reader").displayName("Record Reader").description("The record reader to use for reading incoming records from flowfiles.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE).description("The number of records to send over in a single batch.").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder().name("put-es-record-at-timestamp").displayName("@timestamp Value").description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(false).build();
    static final PropertyDescriptor INDEX_OP_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-index-op-path").displayName("Index Operation Record Path").description("A record path expression to retrieve the Index Operation field for use with Elasticsearch. If left blank the Index Operation will be determined using the main Index Operation property.").addValidator((Validator)new RecordPathValidator()).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-id-path").displayName("ID Record Path").description("A record path expression to retrieve the ID field for use with Elasticsearch. If left blank the ID will be automatically generated by Elasticsearch.").addValidator((Validator)new RecordPathValidator()).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor RETAIN_ID_FIELD = new PropertyDescriptor.Builder().name("put-es-record-retain-id-field").displayName("Retain ID (Record Path)").description("Whether to retain the existing field used as the ID Record Path.").addValidator(StandardValidators.BOOLEAN_VALIDATOR).defaultValue("false").required(false).dependsOn(ID_RECORD_PATH, new AllowableValue[0]).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor INDEX_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-index-record-path").displayName("Index Record Path").description("A record path expression to retrieve the index field for use with Elasticsearch. If left blank the index will be determined using the main index property.").addValidator((Validator)new RecordPathValidator()).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor TYPE_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-type-record-path").displayName("Type Record Path").description("A record path expression to retrieve the type field for use with Elasticsearch. If left blank the type will be determined using the main type property.").addValidator((Validator)new RecordPathValidator()).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-at-timestamp-path").displayName("@timestamp Record Path").description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document. If left blank the @timestamp will be determined using the main @timestamp property").addValidator((Validator)new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor SCRIPT_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-script-path").displayName("Script Record Path").description("A RecordPath pointing to a field in the record(s) that contains the script for the document update/upsert. Only applies to Update/Upsert operations. Field must be Map-type compatible (e.g. a Map or a Record) or a String parsable into a JSON Object").addValidator((Validator)new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor SCRIPTED_UPSERT_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-scripted-upsert-path").displayName("Scripted Upsert Record Path").description("A RecordPath pointing to a field in the record(s) that contains the scripted_upsert boolean flag. Whether to add the scripted_upsert flag to the Upsert Operation. Forces Elasticsearch to execute the Script whether or not the document exists, defaults to false. If the Upsert Document provided (from FlowFile content) will be empty, but sure to set the " + CLIENT_SERVICE.getDisplayName() + " controller service's " + ElasticSearchClientService.SUPPRESS_NULLS.getDisplayName() + " to " + ElasticSearchClientService.NEVER_SUPPRESS.getDisplayName() + " or no \"upsert\" doc will be, included in the request to Elasticsearch and the operation will not create a new document for the script to execute against, resulting in a \"not_found\" error").addValidator((Validator)new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor DYNAMIC_TEMPLATES_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-dynamic-templates-path").displayName("Dynamic Templates Record Path").description("A RecordPath pointing to a field in the record(s) that contains the dynamic_templates for the document. Field must be Map-type compatible (e.g. a Map or Record) or a String parsable into a JSON Object. Requires Elasticsearch 7+").addValidator((Validator)new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor RETAIN_AT_TIMESTAMP_FIELD = new PropertyDescriptor.Builder().name("put-es-record-retain-at-timestamp-field").displayName("Retain @timestamp (Record Path)").description("Whether to retain the existing field used as the @timestamp Record Path.").addValidator(StandardValidators.BOOLEAN_VALIDATOR).defaultValue("false").required(false).dependsOn(AT_TIMESTAMP_RECORD_PATH, new AllowableValue[0]).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor RESULT_RECORD_WRITER = new PropertyDescriptor.Builder().name("put-es-record-error-writer").displayName("Result Record Writer").description("The response from Elasticsearch will be examined for failed records and the failed records will be written to a record set with this record writer service and sent to the \"" + REL_ERRORS.getName() + "\" relationship. Successful records will be written to a record set with this record writer service and sent to the \"" + REL_SUCCESSFUL.getName() + "\" relationship.").identifiesControllerService(RecordSetWriterFactory.class).addValidator(Validator.VALID).required(true).build();
    static final PropertyDescriptor GROUP_BULK_ERRORS_BY_TYPE = new PropertyDescriptor.Builder().name("put-es-record-bulk-error-groups").displayName("Group Results by Bulk Error Type").description("The errored records written to the \"" + REL_ERRORS.getName() + "\" relationship will be grouped by error type and the error related to the first record within the FlowFile added to the FlowFile as \"elasticsearch.bulk.error\". If \"" + NOT_FOUND_IS_SUCCESSFUL.getDisplayName() + "\" is \"false\" then records associated with \"not_found\" Elasticsearch document responses will also be send to the \"" + REL_ERRORS.getName() + "\" relationship.").defaultValue("false").allowableValues(new String[]{"true", "false"}).addValidator(StandardValidators.BOOLEAN_VALIDATOR).required(false).dependsOn(RESULT_RECORD_WRITER, new AllowableValue[0]).build();
    static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder().name("put-es-record-at-timestamp-date-format").displayName("Date Format").description("Specifies the format to use when writing Date fields. If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).").expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator((Validator)new DateTimeFormatValidator()).required(false).build();
    static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder().name("put-es-record-at-timestamp-time-format").displayName("Time Format").description("Specifies the format to use when writing Time fields. If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).").expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator((Validator)new DateTimeFormatValidator()).required(false).build();
    static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder().name("put-es-record-at-timestamp-timestamp-format").displayName("Timestamp Format").description("Specifies the format to use when writing Timestamp fields. If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/25/2017 18:04:15).").expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator((Validator)new DateTimeFormatValidator()).required(false).build();
    static final List<PropertyDescriptor> DESCRIPTORS = List.of(INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, MAX_JSON_FIELD_STRING_LENGTH, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD, INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD, SCRIPT_RECORD_PATH, SCRIPTED_UPSERT_RECORD_PATH, DYNAMIC_TEMPLATES_RECORD_PATH, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, RESULT_RECORD_WRITER, NOT_FOUND_IS_SUCCESSFUL, GROUP_BULK_ERRORS_BY_TYPE);
    static final Set<Relationship> BASE_RELATIONSHIPS = Set.of(REL_ORIGINAL, REL_FAILURE, REL_RETRY, REL_ERRORS, REL_SUCCESSFUL);
    private static final String OUTPUT_TYPE_SUCCESS = "success";
    private static final String OUTPUT_TYPE_ERROR = "error";
    private static final String OUTPUT_TYPE_UNKNOWN_EXCEPTION = "unknown_exception";
    private static final String OUTPUT_TYPE_NOT_FOUND = "not_found";
    private RecordPathCache recordPathCache;
    private RecordReaderFactory readerFactory;
    private RecordSetWriterFactory writerFactory;
    private boolean groupBulkErrors;
    private volatile String dateFormat;
    private volatile String timeFormat;
    private volatile String timestampFormat;

    @Override
    Set<Relationship> getBaseRelationships() {
        return BASE_RELATIONSHIPS;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    public void migrateProperties(PropertyConfiguration config) {
        super.migrateProperties(config);
        config.renameProperty("put-es-record-not_found-is-error", AbstractPutElasticsearch.NOT_FOUND_IS_SUCCESSFUL.getName());
        if (config.getPropertyValue(RESULT_RECORD_WRITER).isEmpty()) {
            String resultRecordWriterId = config.createControllerService("org.apache.nifi.json.JsonRecordSetWriter", Collections.emptyMap());
            config.setProperty(RESULT_RECORD_WRITER, resultRecordWriterId);
        }
    }

    public void migrateRelationships(RelationshipConfiguration config) {
        super.migrateRelationships(config);
        config.renameRelationship(OUTPUT_TYPE_SUCCESS, AbstractPutElasticsearch.REL_ORIGINAL.getName());
        config.renameRelationship("successful_records", AbstractPutElasticsearch.REL_SUCCESSFUL.getName());
    }

    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        super.onScheduled(context);
        this.readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        this.recordPathCache = new RecordPathCache(16);
        this.writerFactory = (RecordSetWriterFactory)context.getProperty(RESULT_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        this.notFoundIsSuccessful = context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();
        this.groupBulkErrors = context.getProperty(GROUP_BULK_ERRORS_BY_TYPE).asBoolean();
        this.dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
        if (this.dateFormat == null) {
            this.dateFormat = RecordFieldType.DATE.getDefaultFormat();
        }
        this.timeFormat = context.getProperty(TIME_FORMAT).evaluateAttributeExpressions().getValue();
        if (this.timeFormat == null) {
            this.timeFormat = RecordFieldType.TIME.getDefaultFormat();
        }
        this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
        if (this.timestampFormat == null) {
            this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
        }
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        FlowFile input = session.get();
        if (input == null) {
            return;
        }
        IndexOperationParameters indexOperationParameters = new IndexOperationParameters(context, input);
        ArrayList<FlowFile> resultRecords = new ArrayList<FlowFile>();
        final AtomicLong errorRecords = new AtomicLong(0L);
        final AtomicLong successfulRecords = new AtomicLong(0L);
        StopWatch stopWatch = new StopWatch(true);
        HashSet<String> indices = new HashSet<String>();
        HashSet<String> types = new HashSet<String>();
        int batches = 0;
        try (InputStream inStream = session.read(input);
             RecordReader reader = this.readerFactory.createRecordReader(input, inStream, this.getLogger());){
            Record record;
            PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet());
            ArrayList<IndexOperationRequest> operationList = new ArrayList<IndexOperationRequest>();
            ArrayList<Record> originals = new ArrayList<Record>();
            while ((record = recordSet.next()) != null) {
                this.addOperation(operationList, record, indexOperationParameters, indices, types);
                originals.add(record);
                if (operationList.size() != indexOperationParameters.getBatchSize() && recordSet.isAnotherRecord()) continue;
                this.operate(operationList, originals, reader, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords, batches);
                ++batches;
            }
            if (!operationList.isEmpty()) {
                this.operate(operationList, originals, reader, session, input, indexOperationParameters, resultRecords, errorRecords, successfulRecords, batches);
                ++batches;
            }
        }
        catch (ElasticsearchException ese) {
            String msg = String.format("Encountered a server-side problem with Elasticsearch. %s", ese.isElastic() ? "Routing to retry." : "Routing to failure");
            this.getLogger().error(msg, (Throwable)ese);
            Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
            this.transferFlowFilesOnException((Exception)((Object)ese), rel, session, true, input);
            this.removeResultRecordFlowFiles(resultRecords, session);
            return;
        }
        catch (IOException | SchemaNotFoundException ex) {
            this.getLogger().warn("Could not log Elasticsearch operation errors nor determine which documents errored.", ex);
            this.transferFlowFilesOnException((Exception)ex, REL_FAILURE, session, true, input);
            this.removeResultRecordFlowFiles(resultRecords, session);
            return;
        }
        catch (Exception ex) {
            this.getLogger().error("Could not index documents.", (Throwable)ex);
            this.transferFlowFilesOnException(ex, REL_FAILURE, session, false, input);
            context.yield();
            this.removeResultRecordFlowFiles(resultRecords, session);
            return;
        }
        stopWatch.stop();
        session.getProvenanceReporter().send(input, ((ElasticSearchClientService)this.clientService.get()).getTransitUrl(String.join((CharSequence)",", indices), types.isEmpty() ? null : String.join((CharSequence)",", types)), String.format(Locale.getDefault(), "%d Elasticsearch _bulk operation batch(es) [%d error(s), %d success(es)]", batches, errorRecords.get(), successfulRecords.get()), stopWatch.getDuration(TimeUnit.MILLISECONDS));
        input = session.putAllAttributes(input, (Map)new HashMap<String, String>(){
            {
                this.put("elasticsearch.put.error.count", String.valueOf(errorRecords.get()));
                this.put("elasticsearch.put.success.count", String.valueOf(successfulRecords.get()));
            }
        });
        session.transfer(input, REL_ORIGINAL);
    }

    private void addOperation(List<IndexOperationRequest> operationList, Record record, IndexOperationParameters indexOperationParameters, Set<String> indices, Set<String> types) {
        String index = this.getFromRecordPath(record, indexOperationParameters.getIndexPath(), true, indexOperationParameters.getDefaultIndex(), false);
        indices.add(index);
        String type = this.getFromRecordPath(record, indexOperationParameters.getTypePath(), true, indexOperationParameters.getDefaultType(), false);
        if (StringUtils.isNotBlank((String)type)) {
            types.add(type);
        }
        String op = this.getFromRecordPath(record, indexOperationParameters.getIndexOpPath(), true, indexOperationParameters.getDefaultIndexOp(), false);
        IndexOperationRequest.Operation indexOp = IndexOperationRequest.Operation.forValue((String)op);
        String id = this.getFromRecordPath(record, indexOperationParameters.getIdPath(), true, null, indexOperationParameters.isRetainId());
        Object atTimestamp = this.getTimestampFromRecordPath(record, indexOperationParameters.getAtTimestampPath(), indexOperationParameters.getDefaultAtTimestamp(), indexOperationParameters.isRetainTimestamp());
        Map<String, Object> script = this.getMapFromRecordPath(record, indexOperationParameters.getScriptPath());
        boolean scriptedUpsert = Boolean.parseBoolean(this.getFromRecordPath(record, indexOperationParameters.getScriptedUpsertPath(), false, "false", false));
        Map<String, Object> dynamicTemplates = this.getMapFromRecordPath(record, indexOperationParameters.getDynamicTypesPath());
        HashMap<String, String> bulkHeaderFields = new HashMap<String, String>(indexOperationParameters.getBulkHeaderRecordPaths().size(), 1.0f);
        for (Map.Entry<String, RecordPath> entry : indexOperationParameters.getBulkHeaderRecordPaths().entrySet()) {
            bulkHeaderFields.put(entry.getKey(), this.getFromRecordPath(record, entry.getValue(), false, null, false));
        }
        Map contentMap = (Map)DataTypeUtils.convertRecordFieldtoObject((Object)record, (DataType)RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
        this.formatDateTimeFields(contentMap, record);
        if (atTimestamp != null) {
            contentMap.putIfAbsent("@timestamp", atTimestamp);
        }
        operationList.add(new IndexOperationRequest(index, type, id, contentMap, indexOp, script, scriptedUpsert, dynamicTemplates, bulkHeaderFields));
    }

    private void operate(List<IndexOperationRequest> operationList, List<Record> originals, RecordReader reader, ProcessSession session, FlowFile input, IndexOperationParameters indexOperationParameters, List<FlowFile> resultRecords, AtomicLong erroredRecords, AtomicLong successfulRecords, int batch) throws IOException, SchemaNotFoundException, MalformedRecordException {
        BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
        ResponseDetails responseDetails = this.indexDocuments(bundle, session, input, indexOperationParameters, batch);
        successfulRecords.getAndAdd(responseDetails.successCount());
        erroredRecords.getAndAdd(responseDetails.errorCount());
        resultRecords.addAll(responseDetails.outputs().values().stream().map(Output::getFlowFile).toList());
        operationList.clear();
        originals.clear();
    }

    private void removeResultRecordFlowFiles(List<FlowFile> results, ProcessSession session) {
        for (FlowFile flowFile : results) {
            session.remove(flowFile);
        }
        results.clear();
    }

    private ResponseDetails indexDocuments(BulkOperation bundle, ProcessSession session, FlowFile input, IndexOperationParameters indexOperationParameters, int batch) throws IOException, SchemaNotFoundException, MalformedRecordException {
        IndexOperationResponse response = ((ElasticSearchClientService)this.clientService.get()).bulk(bundle.getOperationList(), indexOperationParameters.getElasticsearchRequestOptions());
        Map<Integer, Map<String, Object>> errors = this.findElasticsearchResponseErrors(response);
        if (!errors.isEmpty()) {
            this.handleElasticsearchDocumentErrors(errors, session, input);
        }
        int numErrors = errors.size();
        int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors;
        HashMap<String, Output> outputs = new HashMap<String, Output>();
        try (InputStream inStream = session.read(input);
             RecordReader inputReader = this.readerFactory.createRecordReader(input, inStream, this.getLogger());){
            if (numErrors > 0) {
                for (int r = 0; r < batch * indexOperationParameters.getBatchSize(); ++r) {
                    inputReader.nextRecord();
                }
            }
            for (int o2 = 0; o2 < bundle.getOriginalRecords().size(); ++o2) {
                RecordSchema recordSchema;
                Record outputRecord;
                String type;
                Map<String, Object> error;
                Relationship relationship;
                if (numErrors > 0 && errors.containsKey(o2)) {
                    relationship = REL_ERRORS;
                    error = errors.get(o2);
                    type = this.groupBulkErrors ? (this.isElasticsearchNotFound().test(error) ? OUTPUT_TYPE_NOT_FOUND : this.getErrorType(error)) : OUTPUT_TYPE_ERROR;
                    outputRecord = inputReader.nextRecord();
                    recordSchema = outputRecord.getSchema();
                } else {
                    relationship = REL_SUCCESSFUL;
                    error = null;
                    type = OUTPUT_TYPE_SUCCESS;
                    outputRecord = bundle.getOriginalRecords().get(o2);
                    recordSchema = bundle.getSchema();
                    if (numErrors > 0) {
                        inputReader.nextRecord();
                    }
                }
                Output output = this.getOutputByType(outputs, type, session, relationship, input, recordSchema);
                output.write(outputRecord, error);
            }
            for (Output output : outputs.values()) {
                output.transfer(session);
            }
        }
        catch (IOException | SchemaNotFoundException | MalformedRecordException ex) {
            this.getLogger().error("Unable to write error/successful records", ex);
            outputs.values().forEach(o -> {
                try {
                    o.remove(session);
                }
                catch (IOException ioe) {
                    this.getLogger().warn("Error closing RecordSetWriter for FlowFile", (Throwable)ioe);
                }
            });
            throw ex;
        }
        return new ResponseDetails(outputs, numSuccessful, numErrors);
    }

    private String getErrorType(Map<String, Object> errorInner) {
        try {
            return (String)((Map)errorInner.get(OUTPUT_TYPE_ERROR)).get("type");
        }
        catch (Exception ex) {
            return OUTPUT_TYPE_UNKNOWN_EXCEPTION;
        }
    }

    private Output getOutputByType(Map<String, Output> outputs, String type, ProcessSession session, Relationship relationship, FlowFile input, RecordSchema schema) throws IOException, SchemaNotFoundException {
        Output output = outputs.get(type);
        if (output == null) {
            output = new Output(session, this.writerFactory, this.getLogger(), schema, input, relationship, !OUTPUT_TYPE_ERROR.equals(type) && !OUTPUT_TYPE_SUCCESS.equals(type) ? type : null);
            outputs.put(type, output);
        }
        return output;
    }

    private void formatDateTimeFields(Map<String, Object> contentMap, Record record) {
        for (RecordField recordField : record.getSchema().getFields()) {
            DataType chosenDataType;
            String format;
            Object value = contentMap.get(recordField.getFieldName());
            if (value == null || (format = this.determineDateFormat((chosenDataType = recordField.getDataType().getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType((Object)record.getValue(recordField), (ChoiceDataType)((ChoiceDataType)recordField.getDataType())) : recordField.getDataType()).getFieldType())) == null) continue;
            Object formattedValue = this.coerceStringToLong(recordField.getFieldName(), (String)StandardFieldConverterRegistry.getRegistry().getFieldConverter(String.class).convertField(value, Optional.of(format), recordField.getFieldName()));
            contentMap.put(recordField.getFieldName(), formattedValue);
        }
    }

    private String getFromRecordPath(Record record, RecordPath path, boolean strictString, String fallback, boolean retain) {
        if (path == null) {
            return fallback;
        }
        RecordPathResult result = path.evaluate(record);
        Optional value = result.getSelectedFields().findFirst();
        if (value.isPresent() && ((FieldValue)value.get()).getValue() != null && DataTypeUtils.isStringTypeCompatible((Object)((FieldValue)value.get()).getValue())) {
            FieldValue fieldValue = (FieldValue)value.get();
            if (strictString && !fieldValue.getField().getDataType().getFieldType().equals((Object)RecordFieldType.STRING)) {
                throw new ProcessException(String.format("Field referenced by %s must be a string", path.getPath()));
            }
            if (!retain) {
                fieldValue.updateValue(null);
            }
            return fieldValue.toString();
        }
        return fallback;
    }

    private Map<String, Object> getMapFromRecordPath(Record record, RecordPath path) {
        if (path == null) {
            return Collections.emptyMap();
        }
        RecordPathResult result = path.evaluate(record);
        Optional value = result.getSelectedFields().findFirst();
        if (value.isPresent() && ((FieldValue)value.get()).getValue() != null) {
            Map map;
            FieldValue fieldValue = (FieldValue)value.get();
            if (DataTypeUtils.isMapTypeCompatible((Object)fieldValue.getValue())) {
                map = (Map)DataTypeUtils.convertRecordFieldtoObject((Object)fieldValue.getValue(), (DataType)fieldValue.getField().getDataType());
            } else {
                try {
                    map = (Map)this.mapper.readValue(fieldValue.getValue().toString(), Map.class);
                }
                catch (JsonProcessingException jpe) {
                    this.getLogger().error("Unable to parse field {} as Map", new Object[]{path.getPath(), jpe});
                    throw new ProcessException(String.format("Field referenced by %s must be Map-type compatible or a String parsable into a JSON Object", path.getPath()));
                }
            }
            fieldValue.updateValue(null);
            return map;
        }
        return Collections.emptyMap();
    }

    private Object getTimestampFromRecordPath(Record record, RecordPath path, String fallback, boolean retain) {
        if (path == null) {
            return this.coerceStringToLong("@timestamp", fallback);
        }
        RecordPathResult result = path.evaluate(record);
        Optional value = result.getSelectedFields().findFirst();
        if (value.isPresent() && ((FieldValue)value.get()).getValue() != null) {
            FieldValue fieldValue = (FieldValue)value.get();
            DataType dataType = fieldValue.getField().getDataType();
            String fieldName = fieldValue.getField().getFieldName();
            DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType)((ChoiceDataType)dataType)) : dataType;
            Object coercedValue = DataTypeUtils.convertType((Object)fieldValue.getValue(), (DataType)chosenDataType, (String)fieldName);
            if (coercedValue == null) {
                return null;
            }
            Object returnValue = switch (chosenDataType.getFieldType()) {
                case RecordFieldType.DATE, RecordFieldType.TIME, RecordFieldType.TIMESTAMP -> {
                    String format = this.determineDateFormat(chosenDataType.getFieldType());
                    yield this.coerceStringToLong(fieldName, (String)StandardFieldConverterRegistry.getRegistry().getFieldConverter(String.class).convertField(coercedValue, Optional.ofNullable(format), path.getPath()));
                }
                case RecordFieldType.LONG -> DataTypeUtils.toLong((Object)coercedValue, (String)fieldName);
                case RecordFieldType.INT, RecordFieldType.BYTE, RecordFieldType.SHORT -> DataTypeUtils.toInteger((Object)coercedValue, (String)fieldName);
                case RecordFieldType.CHAR, RecordFieldType.STRING -> this.coerceStringToLong(fieldName, coercedValue.toString());
                case RecordFieldType.BIGINT -> coercedValue;
                default -> throw new ProcessException(String.format("Cannot use %s field referenced by %s as @timestamp.", chosenDataType, path.getPath()));
            };
            if (!retain) {
                fieldValue.updateValue(null);
            }
            return returnValue;
        }
        return this.coerceStringToLong("@timestamp", fallback);
    }

    private String determineDateFormat(RecordFieldType recordFieldType) {
        return switch (recordFieldType) {
            case RecordFieldType.DATE -> this.dateFormat;
            case RecordFieldType.TIME -> this.timeFormat;
            case RecordFieldType.TIMESTAMP -> this.timestampFormat;
            default -> null;
        };
    }

    private Object coerceStringToLong(String fieldName, String stringValue) {
        return DataTypeUtils.isLongTypeCompatible((Object)stringValue) ? DataTypeUtils.toLong((Object)stringValue, (String)fieldName) : stringValue;
    }

    private class IndexOperationParameters {
        private final String defaultIndexOp;
        private final String defaultIndex;
        private final String defaultType;
        private final String defaultAtTimestamp;
        private final RecordPath indexOpPath;
        private final RecordPath idPath;
        private final RecordPath indexPath;
        private final RecordPath typePath;
        private final RecordPath atTimestampPath;
        private final RecordPath scriptPath;
        private final RecordPath scriptedUpsertPath;
        private final RecordPath dynamicTypesPath;
        private final ElasticsearchRequestOptions elasticsearchRequestOptions;
        private final Map<String, RecordPath> bulkHeaderRecordPaths;
        private final boolean retainId;
        private final boolean retainTimestamp;
        private final int batchSize;

        IndexOperationParameters(ProcessContext context, FlowFile input) {
            this.defaultIndexOp = context.getProperty(AbstractPutElasticsearch.INDEX_OP).evaluateAttributeExpressions(input).getValue();
            this.defaultIndex = context.getProperty(ElasticsearchRestProcessor.INDEX).evaluateAttributeExpressions(input).getValue();
            this.defaultType = context.getProperty(ElasticsearchRestProcessor.TYPE).evaluateAttributeExpressions(input).getValue();
            this.defaultAtTimestamp = context.getProperty(AT_TIMESTAMP).evaluateAttributeExpressions(input).getValue();
            this.indexOpPath = this.compileRecordPathFromProperty(context, INDEX_OP_RECORD_PATH, input);
            this.idPath = this.compileRecordPathFromProperty(context, ID_RECORD_PATH, input);
            this.indexPath = this.compileRecordPathFromProperty(context, INDEX_RECORD_PATH, input);
            this.typePath = this.compileRecordPathFromProperty(context, TYPE_RECORD_PATH, input);
            this.atTimestampPath = this.compileRecordPathFromProperty(context, AT_TIMESTAMP_RECORD_PATH, input);
            this.scriptPath = this.compileRecordPathFromProperty(context, SCRIPT_RECORD_PATH, input);
            this.scriptedUpsertPath = this.compileRecordPathFromProperty(context, SCRIPTED_UPSERT_RECORD_PATH, input);
            this.dynamicTypesPath = this.compileRecordPathFromProperty(context, DYNAMIC_TEMPLATES_RECORD_PATH, input);
            Map dynamicProperties = PutElasticsearchRecord.this.getRequestParametersFromDynamicProperties(context, input);
            this.elasticsearchRequestOptions = new ElasticsearchRequestOptions(PutElasticsearchRecord.this.getRequestURLParameters(dynamicProperties), PutElasticsearchRecord.this.getRequestHeadersFromDynamicProperties(context, input));
            Map<String, String> bulkHeaderParameterPaths = PutElasticsearchRecord.this.getBulkHeaderParameters(dynamicProperties);
            this.bulkHeaderRecordPaths = new HashMap<String, RecordPath>(bulkHeaderParameterPaths.size(), 1.0f);
            for (Map.Entry<String, String> entry : bulkHeaderParameterPaths.entrySet()) {
                RecordPath rp;
                if (!StringUtils.isNotBlank((String)entry.getValue()) || (rp = PutElasticsearchRecord.this.recordPathCache.getCompiled(entry.getValue())) == null) continue;
                this.bulkHeaderRecordPaths.put(entry.getKey(), rp);
            }
            this.retainId = this.idPath != null && context.getProperty(RETAIN_ID_FIELD).evaluateAttributeExpressions(input).asBoolean() != false;
            this.retainTimestamp = this.atTimestampPath != null && context.getProperty(RETAIN_AT_TIMESTAMP_FIELD).evaluateAttributeExpressions(input).asBoolean() != false;
            this.batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger();
        }

        private RecordPath compileRecordPathFromProperty(ProcessContext context, PropertyDescriptor property, FlowFile input) {
            String recordPathProperty = context.getProperty(property).evaluateAttributeExpressions(input).getValue();
            return StringUtils.isNotBlank((String)recordPathProperty) ? PutElasticsearchRecord.this.recordPathCache.getCompiled(recordPathProperty) : null;
        }

        public String getDefaultIndexOp() {
            return this.defaultIndexOp;
        }

        public String getDefaultIndex() {
            return this.defaultIndex;
        }

        public String getDefaultType() {
            return this.defaultType;
        }

        public String getDefaultAtTimestamp() {
            return this.defaultAtTimestamp;
        }

        public RecordPath getIndexOpPath() {
            return this.indexOpPath;
        }

        public RecordPath getIdPath() {
            return this.idPath;
        }

        public RecordPath getIndexPath() {
            return this.indexPath;
        }

        public RecordPath getTypePath() {
            return this.typePath;
        }

        public RecordPath getAtTimestampPath() {
            return this.atTimestampPath;
        }

        public RecordPath getScriptPath() {
            return this.scriptPath;
        }

        public RecordPath getScriptedUpsertPath() {
            return this.scriptedUpsertPath;
        }

        public RecordPath getDynamicTypesPath() {
            return this.dynamicTypesPath;
        }

        public ElasticsearchRequestOptions getElasticsearchRequestOptions() {
            return this.elasticsearchRequestOptions;
        }

        public Map<String, RecordPath> getBulkHeaderRecordPaths() {
            return this.bulkHeaderRecordPaths;
        }

        public boolean isRetainId() {
            return this.retainId;
        }

        public boolean isRetainTimestamp() {
            return this.retainTimestamp;
        }

        public int getBatchSize() {
            return this.batchSize;
        }
    }

    private record ResponseDetails(Map<String, Output> outputs, int successCount, int errorCount) {
    }

    private class Output {
        private FlowFile flowFile;
        private final RecordSetWriter writer;
        private final Relationship relationship;
        private final String errorType;
        private String exampleError;
        private int numRecords;

        public Output(ProcessSession session, RecordSetWriterFactory writerFactory, ComponentLog logger, RecordSchema schema, FlowFile input, Relationship relationship, String errorType) throws IOException, SchemaNotFoundException {
            this.flowFile = session.create(input);
            this.relationship = relationship;
            this.errorType = errorType;
            OutputStream outputStream = session.write(this.flowFile);
            this.writer = writerFactory.createWriter(logger, schema, outputStream, this.flowFile);
            this.writer.beginRecordSet();
        }

        public FlowFile getFlowFile() {
            return this.flowFile;
        }

        public void write(Record record, Map<String, Object> error) throws IOException {
            ++this.numRecords;
            this.writer.write(record);
            if (this.errorType != null && this.exampleError == null && error != null) {
                try {
                    this.exampleError = PutElasticsearchRecord.this.mapper.writeValueAsString(error);
                }
                catch (JsonProcessingException e) {
                    this.exampleError = String.format("{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}", e.getMessage().replace("\"", "\\\""));
                }
            }
        }

        private void close() throws IOException {
            if (this.writer != null) {
                this.writer.close();
            }
        }

        public void transfer(ProcessSession session) throws IOException {
            this.writer.finishRecordSet();
            this.close();
            if (this.numRecords > 0) {
                HashMap<String, String> attributes = new HashMap<String, String>(2, 1.0f);
                attributes.put("record.count", String.valueOf(this.numRecords));
                if (this.errorType != null) {
                    attributes.put("elasticsearch.bulk.error", this.exampleError);
                }
                this.flowFile = session.putAllAttributes(this.flowFile, attributes);
                session.transfer(this.flowFile, this.relationship);
            } else {
                session.remove(this.flowFile);
            }
        }

        public void remove(ProcessSession session) throws IOException {
            this.close();
            session.remove(this.flowFile);
        }
    }
}

