/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.script;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.script.RecordCounts;
import org.apache.nifi.processors.script.ScriptEvaluator;
import org.apache.nifi.processors.script.ScriptRunner;
import org.apache.nifi.processors.script.ScriptedRecordProcessor;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

/**
 * Processor that evaluates a user-provided script against every record of an incoming FlowFile
 * and groups the records by the value the script returns. Each distinct value (a "partition")
 * gets its own child FlowFile, written with the configured Record Writer.
 *
 * <p>Child FlowFiles are routed to {@code success} and the incoming FlowFile to {@code original}
 * only when every record has been read and every writer has been finished. On any failure the
 * partially written children are discarded and the incoming FlowFile is routed to {@code failure}.
 */
@SideEffectFree
@Tags(value={"record", "partition", "script", "groovy", "segment", "split", "group", "organize"})
@CapabilityDescription(value="Receives Record-oriented data (i.e., data that can be read by the configured Record Reader) and evaluates the user provided script against each record in the incoming flow file. Each record is then grouped with other records sharing the same partition and a FlowFile is created for each groups of records. Two records shares the same partition if the evaluation of the script results the same return value for both. Those will be considered as part of the same partition.")
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.EXECUTE_CODE, explanation="Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")})
@WritesAttributes(value={@WritesAttribute(attribute="partition", description="The partition of the outgoing flow file. If the script indicates that the partition has a null value, the attribute will be set to the literal string \"<null partition>\" (without quotes). Otherwise, the attribute is set to the String representation of whatever value is returned by the script."), @WritesAttribute(attribute="mime.type", description="Sets the mime.type attribute to the MIME Type specified by the Record Writer"), @WritesAttribute(attribute="record.count", description="The number of records within the flow file."), @WritesAttribute(attribute="record.error.message", description="This attribute provides on failure the error message encountered by the Reader or Writer."), @WritesAttribute(attribute="fragment.index", description="A one-up number that indicates the ordering of the partitioned FlowFiles that were created from a single parent FlowFile"), @WritesAttribute(attribute="fragment.count", description="The number of partitioned FlowFiles generated from the parent FlowFile")})
@SeeAlso(classNames={"org.apache.nifi.processors.script.ScriptedTransformRecord", "org.apache.nifi.processors.script.ScriptedValidateRecord", "org.apache.nifi.processors.script.ScriptedFilterRecord"})
public class ScriptedPartitionRecord extends ScriptedRecordProcessor {
    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles that are successfully partitioned will be routed to this relationship").build();
    static final Relationship RELATIONSHIP_ORIGINAL = new Relationship.Builder().name("original").description("Once all records in an incoming FlowFile have been partitioned, the original FlowFile is routed to this relationship.").build();
    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile cannot be partitioned from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship").build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(RELATIONSHIP_ORIGINAL, RELATIONSHIP_SUCCESS, RELATIONSHIP_FAILURE);

    /**
     * @return the fixed set of relationships of this processor: original, success and failure
     */
    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * @return the property descriptors inherited from {@link ScriptedRecordProcessor}
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * Takes one FlowFile from the session, builds a script evaluator for it and partitions its
     * records. The incoming FlowFile ends up in {@code original} when partitioning succeeded and
     * in {@code failure} otherwise (including when the script engine cannot be initialized).
     *
     * @param context the process context used to look up the record reader and writer services
     * @param session the session the incoming FlowFile is taken from and results are written to
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ScriptRunner scriptRunner = this.pollScriptRunner();
        boolean success;
        try {
            final ScriptEvaluator evaluator;
            try {
                final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
                evaluator = this.createEvaluator(scriptEngine, flowFile);
            } catch (final ScriptException se) {
                if (this.getLogger().isDebugEnabled()) {
                    this.getLogger().debug("Script as executed by NiFi with preloads {}", scriptRunner.getScript());
                }
                this.getLogger().error("Failed to initialize script engine", se);
                session.transfer(flowFile, RELATIONSHIP_FAILURE);
                // The runner is handed back exactly once, by the finally block below.
                return;
            }
            success = this.partition(context, session, flowFile, evaluator);
        } finally {
            this.offerScriptRunner(scriptRunner);
        }
        session.transfer(flowFile, success ? RELATIONSHIP_ORIGINAL : RELATIONSHIP_FAILURE);
    }

    /**
     * Reads every record of {@code incomingFlowFile}, evaluates the script against it and appends
     * the record to the child FlowFile of the partition the script returned. An {@code Object[]}
     * result is converted to a {@code List} so that it can serve as a map key.
     *
     * <p>Children are transferred to {@code success} only after all records were read and all
     * writers were finished. If anything fails before that point, the error is logged, every
     * writer is closed and every child created so far is removed from the session.
     *
     * @param context          supplies the configured record reader and writer factories
     * @param session          session used to read the input and create/write the children
     * @param incomingFlowFile the FlowFile whose records are partitioned
     * @param evaluator        evaluates the user script against a record and its index
     * @return {@code true} if all records were partitioned, {@code false} if a failure occurred
     */
    private boolean partition(final ProcessContext context, final ProcessSession session, final FlowFile incomingFlowFile, final ScriptEvaluator evaluator) {
        final long startMillis = System.currentTimeMillis();
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final Map<String, String> originalAttributes = incomingFlowFile.getAttributes();
        final RecordCounts counts = new RecordCounts();

        // Declared outside the read callback so the failure path can clean up what was created.
        final Map<Object, FlowFile> outgoingFlowFiles = new HashMap<>();
        final Map<Object, RecordSetWriter> recordSetWriters = new HashMap<>();

        try {
            session.read(incomingFlowFile, in -> {
                try (RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, incomingFlowFile.getSize(), this.getLogger())) {
                    final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
                    final PushBackRecordSet pushBackSet = new PushBackRecordSet(reader.createRecordSet());

                    while (pushBackSet.isAnotherRecord()) {
                        final Record record = pushBackSet.next();
                        final Object evaluatedValue = evaluator.evaluate(record, counts.getRecordCount());
                        this.getLogger().debug("Evaluated scripted against {} (index {}), producing result of {}", record, counts.getRecordCount(), evaluatedValue);
                        counts.incrementRecordCount();

                        // Arrays use identity equals/hashCode; a List compares by content.
                        final Object partition = evaluatedValue instanceof Object[] ? Arrays.asList((Object[]) evaluatedValue) : evaluatedValue;
                        RecordSetWriter writer = recordSetWriters.get(partition);
                        if (writer == null) {
                            writer = this.openPartitionWriter(session, incomingFlowFile, writerFactory, schema, partition, outgoingFlowFiles, recordSetWriters);
                        }
                        writer.write(record);
                    }

                    this.finishPartitions(session, originalAttributes, outgoingFlowFiles, recordSetWriters);
                } catch (final ScriptException | SchemaNotFoundException | MalformedRecordException e) {
                    throw new ProcessException("After processing " + counts.getRecordCount() + " Records, encountered failure when attempting to process " + incomingFlowFile, e);
                }
            });
        } catch (final Exception e) {
            this.getLogger().error("Failed to partition records", e);
            this.discardPartialOutput(session, recordSetWriters, outgoingFlowFiles);
            return false;
        }

        // Nothing is transferred until every writer has finished, so a late writer failure
        // can never leave some partitions in 'success' while the original goes to 'failure'.
        for (final FlowFile outgoingFlowFile : outgoingFlowFiles.values()) {
            session.transfer(outgoingFlowFile, RELATIONSHIP_SUCCESS);
        }

        final long millis = System.currentTimeMillis() - startMillis;
        session.adjustCounter("Records Processed", counts.getRecordCount(), true);
        session.getProvenanceReporter().fork(incomingFlowFile, outgoingFlowFiles.values(), "Processed " + counts.getRecordCount() + " Records", millis);
        return true;
    }

    /**
     * Creates the child FlowFile and record writer for a partition seen for the first time and
     * registers both in the given maps. The child is registered before the writer is created so
     * that it can be removed if writer creation fails.
     */
    private RecordSetWriter openPartitionWriter(final ProcessSession session, final FlowFile incomingFlowFile, final RecordSetWriterFactory writerFactory,
            final RecordSchema schema, final Object partition, final Map<Object, FlowFile> outgoingFlowFiles,
            final Map<Object, RecordSetWriter> recordSetWriters) throws IOException, SchemaNotFoundException {
        final FlowFile outgoingFlowFile = session.create(incomingFlowFile);
        outgoingFlowFiles.put(partition, outgoingFlowFile);

        final OutputStream out = session.write(outgoingFlowFile);
        final RecordSetWriter writer;
        try {
            writer = writerFactory.createWriter(this.getLogger(), schema, out, outgoingFlowFile);
        } catch (final Exception e) {
            // No writer owns the stream yet, so it has to be closed here.
            try {
                out.close();
            } catch (final IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        recordSetWriters.put(partition, writer);
        writer.beginRecordSet();
        return writer;
    }

    /**
     * Finishes and closes the writer of every partition and stamps the partition attributes
     * (mime.type, partition, fragment.index, fragment.count, record.count) on its child FlowFile.
     * The map is updated with the FlowFile returned by the session.
     */
    private void finishPartitions(final ProcessSession session, final Map<String, String> baseAttributes, final Map<Object, FlowFile> outgoingFlowFiles,
            final Map<Object, RecordSetWriter> recordSetWriters) {
        final String fragmentCount = String.valueOf(outgoingFlowFiles.size());
        int fragmentIndex = 0;

        for (final Map.Entry<Object, FlowFile> entry : outgoingFlowFiles.entrySet()) {
            final Object partition = entry.getKey();
            final RecordSetWriter writer = recordSetWriters.get(partition);

            final Map<String, String> attributes = new HashMap<>(baseAttributes);
            attributes.put("mime.type", writer.getMimeType());
            attributes.put("partition", partition == null ? "<null partition>" : partition.toString());
            attributes.put("fragment.index", String.valueOf(fragmentIndex));
            attributes.put("fragment.count", fragmentCount);

            try {
                final WriteResult finalResult = writer.finishRecordSet();
                attributes.put("record.count", String.valueOf(finalResult.getRecordCount()));
                writer.close();
            } catch (final IOException e) {
                throw new ProcessException("Resources used for record writing might not be closed", e);
            }

            // Replacing the value of an existing key is not a structural modification.
            entry.setValue(session.putAllAttributes(entry.getValue(), attributes));
            fragmentIndex++;
        }
    }

    /**
     * Failure-path cleanup: closes every writer (closing an already closed writer is expected to
     * be a no-op per the {@link java.io.Closeable} contract) and removes every child FlowFile
     * created so far. Writers are closed first so that no child is removed while its output
     * stream is still open.
     */
    private void discardPartialOutput(final ProcessSession session, final Map<Object, RecordSetWriter> recordSetWriters, final Map<Object, FlowFile> outgoingFlowFiles) {
        for (final RecordSetWriter writer : recordSetWriters.values()) {
            try {
                writer.close();
            } catch (final Exception closeFailure) {
                this.getLogger().warn("Failed to close Record Writer; some resources may not be cleaned up appropriately", closeFailure);
            }
        }

        for (final FlowFile outgoingFlowFile : outgoingFlowFiles.values()) {
            session.remove(outgoingFlowFile);
        }
    }
}

