/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.python.processor;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.NullSuppression;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.json.JsonParserFactory;
import org.apache.nifi.json.JsonRecordSource;
import org.apache.nifi.json.JsonSchemaInference;
import org.apache.nifi.json.JsonTreeRowRecordReader;
import org.apache.nifi.json.OutputGrouping;
import org.apache.nifi.json.TokenParserFactory;
import org.apache.nifi.json.WriteJsonResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.python.PythonObjectProxy;
import org.apache.nifi.python.processor.FlowFileAttributeMap;
import org.apache.nifi.python.processor.PythonProcessorBridge;
import org.apache.nifi.python.processor.PythonProcessorProxy;
import org.apache.nifi.python.processor.RecordTransform;
import org.apache.nifi.python.processor.RecordTransformResult;
import org.apache.nifi.schema.access.NopSchemaAccessWriter;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.RecordSource;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
public class RecordTransformProxy
extends PythonProcessorProxy<RecordTransform> {
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("Record Reader").description("Specifies the Controller Service to use for reading incoming data").required(true).identifiesControllerService(RecordReaderFactory.class).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("Record Writer").description("Specifies the Controller Service to use for writing out the records").identifiesControllerService(RecordSetWriterFactory.class).required(true).build();
    private static final JsonParserFactory jsonParserFactory = new JsonParserFactory();

    public RecordTransformProxy(String processorType, Supplier<PythonProcessorBridge> bridgeFactory, boolean initialize) {
        super(processorType, bridgeFactory, initialize);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        properties.addAll(super.getSupportedPropertyDescriptors());
        return properties;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        Map<Relationship, List<FlowFile>> flowFilesPerRelationship;
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        HashMap<RecordGroupingKey, DestinationTuple> destinationTuples = new HashMap<RecordGroupingKey, DestinationTuple>();
        FlowFileAttributeMap attributeMap = new FlowFileAttributeMap(flowFile);
        long recordsRead = 0L;
        long recordsWritten = 0L;
        RecordTransform transform = (RecordTransform)this.getTransform();
        try (InputStream in = session.read(flowFile);
             RecordReader reader = readerFactory.createRecordReader(flowFile, in, this.getLogger());){
            block31: {
                RecordSchema recordSchema = reader.getSchema();
                try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
                    List<RecordTransformResult> results;
                    String json;
                    Record record;
                    WriteJsonResult writeJsonResult = new WriteJsonResult(this.getLogger(), recordSchema, (SchemaAccessWriter)new NopSchemaAccessWriter(), (OutputStream)baos, false, NullSuppression.NEVER_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null);
                    int writtenSinceFlush = 0;
                    while ((record = reader.nextRecord()) != null) {
                        ++recordsRead;
                        if (writtenSinceFlush == 0) {
                            writeJsonResult.beginRecordSet();
                        }
                        writeJsonResult.writeRawRecord(record);
                        ++writtenSinceFlush;
                        if (baos.size() < 1000000) continue;
                        writeJsonResult.finishRecordSet();
                        writeJsonResult.flush();
                        json = baos.toString();
                        baos.reset();
                        results = transform.transformRecord(json, recordSchema, attributeMap);
                        try {
                            for (RecordTransformResult result : results) {
                                this.writeResult(result, destinationTuples, writerFactory, session, flowFile);
                                ++recordsWritten;
                            }
                        }
                        finally {
                            results.forEach(PythonObjectProxy::free);
                        }
                        writtenSinceFlush = 0;
                    }
                    if (writtenSinceFlush <= 0) break block31;
                    writeJsonResult.finishRecordSet();
                    writeJsonResult.flush();
                    json = baos.toString();
                    baos.reset();
                    results = transform.transformRecord(json, recordSchema, attributeMap);
                    try {
                        for (RecordTransformResult result : results) {
                            this.writeResult(result, destinationTuples, writerFactory, session, flowFile);
                            ++recordsWritten;
                        }
                    }
                    finally {
                        results.forEach(PythonObjectProxy::free);
                    }
                }
            }
            flowFilesPerRelationship = this.mapResults(destinationTuples, session);
            session.adjustCounter("Records Read", recordsRead, false);
            session.adjustCounter("Record Written", recordsWritten, false);
        }
        catch (Exception e) {
            this.getLogger().error("Failed to transform {}; routing to failure", new Object[]{flowFile, e});
            session.transfer(flowFile, REL_FAILURE);
            destinationTuples.values().forEach(tuple -> {
                session.remove(tuple.flowFile());
                try {
                    tuple.writer().close();
                }
                catch (IOException ioe) {
                    this.getLogger().warn("Failed to close Record Writer for FlowFile created in this session", (Throwable)ioe);
                }
            });
            return;
        }
        flowFilesPerRelationship.forEach((rel, flowFiles) -> session.transfer((Collection)flowFiles, rel));
        session.transfer(flowFile, REL_ORIGINAL);
    }

    private Map<Relationship, List<FlowFile>> mapResults(Map<RecordGroupingKey, DestinationTuple> destinationTuples, ProcessSession session) throws IOException {
        HashMap<Relationship, List<FlowFile>> flowFilesPerRelationship = new HashMap<Relationship, List<FlowFile>>();
        for (Map.Entry<RecordGroupingKey, DestinationTuple> entry : destinationTuples.entrySet()) {
            DestinationTuple destinationTuple = entry.getValue();
            RecordSetWriter writer = destinationTuple.writer();
            WriteResult writeResult = writer.finishRecordSet();
            writer.close();
            HashMap<String, String> attributes = new HashMap<String, String>(writeResult.getAttributes());
            attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
            attributes.put("mime.type", writer.getMimeType());
            RecordGroupingKey groupingKey = entry.getKey();
            Map<String, Object> partition = groupingKey.partition();
            if (partition != null) {
                partition.forEach((key, value) -> attributes.put((String)key, Objects.toString(value)));
            }
            FlowFile outputFlowFile = session.putAllAttributes(destinationTuple.flowFile(), attributes);
            Relationship destinationRelationship = new Relationship.Builder().name(groupingKey.relationship()).build();
            List flowFiles = flowFilesPerRelationship.computeIfAbsent(destinationRelationship, key -> new ArrayList());
            flowFiles.add(outputFlowFile);
        }
        return flowFilesPerRelationship;
    }

    private void writeResult(RecordTransformResult result, Map<RecordGroupingKey, DestinationTuple> destinationTuples, RecordSetWriterFactory writerFactory, ProcessSession session, FlowFile originalFlowFile) throws SchemaNotFoundException, IOException, MalformedRecordException {
        Record transformed = this.createRecordFromJson(result);
        if (transformed == null) {
            this.getLogger().debug("Received null result from RecordTransform; will not write result to output for {}", new Object[]{originalFlowFile});
            return;
        }
        RecordGroupingKey key = new RecordGroupingKey(result.getRelationship(), result.getPartition());
        DestinationTuple destinationTuple = destinationTuples.get(key);
        if (destinationTuple == null) {
            RecordSetWriter writer;
            FlowFile destinationFlowFile = session.create(originalFlowFile);
            OutputStream out = null;
            try {
                out = session.write(destinationFlowFile);
                Map originalAttributes = originalFlowFile.getAttributes();
                RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, transformed.getSchema());
                writer = writerFactory.createWriter(this.getLogger(), writeSchema, out, originalAttributes);
                writer.beginRecordSet();
            }
            catch (Exception e) {
                IOUtils.closeQuietly((OutputStream)out);
                session.remove(destinationFlowFile);
                throw e;
            }
            destinationTuple = new DestinationTuple(destinationFlowFile, writer);
            destinationTuples.put(key, destinationTuple);
        }
        destinationTuple.writer().write(transformed);
    }

    private Record createRecordFromJson(RecordTransformResult transformResult) throws IOException, MalformedRecordException {
        String json = transformResult.getRecordJson();
        byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
        RecordSchema returnedSchema = transformResult.getSchema();
        RecordSchema schema = returnedSchema == null ? this.inferSchema(jsonBytes) : returnedSchema;
        try (ByteArrayInputStream in = new ByteArrayInputStream(jsonBytes);){
            Record record;
            JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader((InputStream)in, this.getLogger(), schema, null, null, null, null, null, null, null, (TokenParserFactory)jsonParserFactory);
            Record record2 = record = reader.nextRecord(false, false);
            return record2;
        }
    }

    private RecordSchema inferSchema(byte[] jsonBytes) throws IOException {
        try (ByteArrayInputStream in = new ByteArrayInputStream(jsonBytes);){
            JsonRecordSource recordSource = new JsonRecordSource((InputStream)in);
            TimeValueInference timeValueInference = new TimeValueInference(null, null, null);
            JsonSchemaInference schemaInference = new JsonSchemaInference(timeValueInference);
            RecordSchema recordSchema = schemaInference.inferSchema((RecordSource)recordSource);
            return recordSchema;
        }
    }

    private record DestinationTuple(FlowFile flowFile, RecordSetWriter writer) {
    }

    private record RecordGroupingKey(String relationship, Map<String, Object> partition) {
    }
}

