/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.event.StandardEventFactory;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

@SupportsBatching
@Tags(value={"ingest", "udp", "listen", "source", "record"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription(value="Listens for Datagram Packets on a given port and reads the content of each datagram using the configured Record Reader. Each record will then be written to a flow file using the configured Record Writer. This processor can be restricted to listening for datagrams from  a specific remote host and port by specifying the Sending Host and Sending Host Port properties, otherwise it will listen for datagrams from all hosts and ports.")
@WritesAttributes(value={@WritesAttribute(attribute="udp.sender", description="The sending host of the messages."), @WritesAttribute(attribute="udp.port", description="The sending port the messages were received."), @WritesAttribute(attribute="record.count", description="The number of records written to the flow file."), @WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file.")})
public class ListenUDPRecord
extends AbstractListenEventProcessor<StandardEvent> {
    public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder().name("sending-host").displayName("Sending Host").description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will be accepted. Improves Performance. May be a system property or an environment variable.").addValidator((Validator)new HostValidator()).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    public static final PropertyDescriptor SENDING_HOST_PORT = new PropertyDescriptor.Builder().name("sending-host-port").displayName("Sending Host Port").description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and this port will be accepted. Improves Performance. May be a system property or an environment variable.").addValidator(StandardValidators.PORT_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).build();
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The Record Reader to use for reading the content of incoming datagrams.").identifiesControllerService(RecordReaderFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("The Record Writer to use in order to serialize the data before writing to a flow file.").identifiesControllerService(RecordSetWriterFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    public static final PropertyDescriptor POLL_TIMEOUT = new PropertyDescriptor.Builder().name("poll-timeout").displayName("Poll Timeout").description("The amount of time to wait when polling the internal queue for more datagrams. If no datagrams are found after waiting for the configured timeout, then the processor will emit whatever records have been obtained up to that point.").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("50 ms").required(true).build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("batch-size").displayName("Batch Size").description("The maximum number of datagrams to write as records to a single FlowFile. The Batch Size will only be reached when data is coming in more frequently than the Poll Timeout.").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).defaultValue("1000").required(true).build();
    private static final List<PropertyDescriptor> ADDITIONAL_PROPERTIES = List.of(POLL_TIMEOUT, BATCH_SIZE, RECORD_READER, RECORD_WRITER, SENDING_HOST, SENDING_HOST_PORT);
    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder().name("parse.failure").description("If a datagram cannot be parsed using the configured Record Reader, the contents of the message will be routed to this Relationship as its own individual FlowFile.").build();
    private static final List<Relationship> ADDITIONAL_RELATIONSHIPS = List.of(REL_PARSE_FAILURE);
    public static final String UDP_PORT_ATTR = "udp.port";
    public static final String UDP_SENDER_ATTR = "udp.sender";
    public static final String RECORD_COUNT_ATTR = "record.count";
    private volatile long pollTimeout;

    protected List<PropertyDescriptor> getAdditionalProperties() {
        return ADDITIONAL_PROPERTIES;
    }

    protected List<Relationship> getAdditionalRelationships() {
        return ADDITIONAL_RELATIONSHIPS;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        super.onScheduled(context);
        this.pollTimeout = context.getProperty(POLL_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
    }

    protected long getLongPollTimeout() {
        return this.pollTimeout;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList<ValidationResult> result = new ArrayList<ValidationResult>();
        String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
        String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();
        if (StringUtils.isBlank((CharSequence)sendingHost) && StringUtils.isNotBlank((CharSequence)sendingPort)) {
            result.add(new ValidationResult.Builder().subject(SENDING_HOST.getName()).valid(false).explanation("Must specify Sending Host when specifying Sending Host Port").build());
        } else if (StringUtils.isBlank((CharSequence)sendingPort) && StringUtils.isNotBlank((CharSequence)sendingHost)) {
            result.add(new ValidationResult.Builder().subject(SENDING_HOST_PORT.getName()).valid(false).explanation("Must specify Sending Host Port when specifying Sending Host").build());
        }
        return result;
    }

    protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) throws IOException {
        String sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
        Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
        Integer bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        ByteBufferPool byteBufferSource = new ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize.intValue());
        StandardEventFactory eventFactory = new StandardEventFactory();
        return new DatagramChannelDispatcher((EventFactory)eventFactory, (ByteBufferSource)byteBufferSource, events, this.getLogger(), sendingHost, sendingHostPort);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        StandardEvent event;
        int maxBatchSize = context.getProperty(BATCH_SIZE).asInteger();
        HashMap<String, FlowFileRecordWriter> flowFileRecordWriters = new HashMap<String, FlowFileRecordWriter>();
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        for (int i = 0; i < maxBatchSize && (event = (StandardEvent)this.getMessage(true, false, session)) != null; ++i) {
            ArrayList<Record> records = new ArrayList<Record>();
            try (ByteArrayInputStream in = new ByteArrayInputStream(event.getData());){
                Record record;
                long inputLength = event.getData() == null ? -1L : (long)event.getData().length;
                RecordReader reader = readerFactory.createRecordReader(Collections.emptyMap(), (InputStream)in, inputLength, this.getLogger());
                while ((record = reader.nextRecord()) != null) {
                    records.add(record);
                }
            }
            catch (Exception e) {
                this.handleParseFailure(event, session, e);
                continue;
            }
            if (records.isEmpty()) {
                this.handleParseFailure(event, session, null);
                continue;
            }
            FlowFileRecordWriter flowFileRecordWriter = (FlowFileRecordWriter)flowFileRecordWriters.get(event.getSender());
            if (flowFileRecordWriter == null) {
                FlowFile flowFile = null;
                Object rawOut = null;
                RecordSetWriter writer = null;
                try {
                    flowFile = session.create();
                    rawOut = session.write(flowFile);
                    Record firstRecord = (Record)records.get(0);
                    RecordSchema recordSchema = firstRecord.getSchema();
                    RecordSchema writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema);
                    writer = writerFactory.createWriter(this.getLogger(), writeSchema, (OutputStream)rawOut, flowFile);
                    writer.beginRecordSet();
                    flowFileRecordWriter = new FlowFileRecordWriter(flowFile, writer);
                    flowFileRecordWriters.put(event.getSender(), flowFileRecordWriter);
                }
                catch (Exception ex) {
                    this.getLogger().error("Failed to properly initialize record writer. Datagram will be queued for re-processing.", (Throwable)ex);
                    try {
                        if (writer != null) {
                            writer.close();
                        }
                    }
                    catch (Exception e) {
                        this.getLogger().warn("Failed to close Record Writer", (Throwable)e);
                    }
                    if (rawOut != null) {
                        IOUtils.closeQuietly((OutputStream)rawOut);
                    }
                    if (flowFile != null) {
                        session.remove(flowFile);
                    }
                    context.yield();
                    break;
                }
            }
            RecordSetWriter writer = flowFileRecordWriter.getRecordWriter();
            try {
                for (Record record : records) {
                    writer.write(record);
                }
                continue;
            }
            catch (Exception e) {
                this.getLogger().error("Failed to write records", (Throwable)e);
                IOUtils.closeQuietly((Closeable)writer);
                session.remove(flowFileRecordWriter.getFlowFile());
                flowFileRecordWriters.remove(event.getSender());
                break;
            }
        }
        for (Map.Entry entry : flowFileRecordWriters.entrySet()) {
            String sender = (String)entry.getKey();
            FlowFileRecordWriter flowFileRecordWriter = (FlowFileRecordWriter)entry.getValue();
            RecordSetWriter writer = flowFileRecordWriter.getRecordWriter();
            FlowFile flowFile = flowFileRecordWriter.getFlowFile();
            try {
                WriteResult writeResult;
                try {
                    writeResult = writer.finishRecordSet();
                }
                finally {
                    writer.close();
                }
                if (writeResult.getRecordCount() == 0) {
                    session.remove(flowFile);
                    continue;
                }
                HashMap<String, String> attributes = new HashMap<String, String>();
                attributes.putAll(this.getAttributes(sender));
                attributes.putAll(writeResult.getAttributes());
                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.getRecordCount()));
                flowFile = session.putAllAttributes(flowFile, attributes);
                session.transfer(flowFile, REL_SUCCESS);
                String transitUri = this.getTransitUri(sender);
                session.getProvenanceReporter().receive(flowFile, transitUri);
            }
            catch (Exception e) {
                this.getLogger().error("Unable to properly complete record set", (Throwable)e);
                session.remove(flowFile);
            }
        }
    }

    private void handleParseFailure(StandardEvent event, ProcessSession session, Exception cause) {
        this.handleParseFailure(event, session, cause, "Failed to parse datagram using the configured Record Reader. Will route message as its own FlowFile to the 'parse.failure' relationship");
    }

    private void handleParseFailure(StandardEvent event, ProcessSession session, Exception cause, String message) {
        Map<String, String> attributes = this.getAttributes(event.getSender());
        FlowFile failureFlowFile = session.create();
        failureFlowFile = session.write(failureFlowFile, out -> out.write(event.getData()));
        failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
        String transitUri = this.getTransitUri(event.getSender());
        session.getProvenanceReporter().receive(failureFlowFile, transitUri);
        session.transfer(failureFlowFile, REL_PARSE_FAILURE);
        if (cause == null) {
            this.getLogger().error(message);
        } else {
            this.getLogger().error(message, (Throwable)cause);
        }
        session.adjustCounter("Parse Failures", 1L, false);
    }

    private Map<String, String> getAttributes(String sender) {
        HashMap<String, String> attributes = new HashMap<String, String>(3);
        attributes.put(UDP_SENDER_ATTR, sender);
        attributes.put(UDP_PORT_ATTR, String.valueOf(this.port));
        return attributes;
    }

    private String getTransitUri(String sender) {
        String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
        return "udp" + "://" + senderHost + ":" + this.port;
    }

    private static class FlowFileRecordWriter {
        private final FlowFile flowFile;
        private final RecordSetWriter recordWriter;

        public FlowFileRecordWriter(FlowFile flowFile, RecordSetWriter recordWriter) {
            this.flowFile = flowFile;
            this.recordWriter = recordWriter;
        }

        public FlowFile getFlowFile() {
            return this.flowFile;
        }

        public RecordSetWriter getRecordWriter() {
            return this.recordWriter;
        }
    }

    private static class HostValidator
    implements Validator {
        private HostValidator() {
        }

        public ValidationResult validate(String subject, String input, ValidationContext context) {
            try {
                InetAddress.getByName(input);
                return new ValidationResult.Builder().subject(subject).valid(true).input(input).build();
            }
            catch (UnknownHostException e) {
                return new ValidationResult.Builder().subject(subject).valid(false).input(input).explanation("Unknown host: " + String.valueOf(e)).build();
            }
        }
    }
}

