/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.mqtt;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.PublishMQTT;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MqttConstants;
import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

@Tags(value={"subscribe", "MQTT", "IOT", "consume", "listen"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@CapabilityDescription(value="Subscribes to a topic and receives messages from an MQTT broker")
@SeeAlso(value={PublishMQTT.class})
@WritesAttributes(value={@WritesAttribute(attribute="record.count", description="The number of records received"), @WritesAttribute(attribute="mqtt.broker", description="MQTT broker that was the message source"), @WritesAttribute(attribute="mqtt.topic", description="MQTT topic on which message was received"), @WritesAttribute(attribute="mqtt.qos", description="The quality of service for this message."), @WritesAttribute(attribute="mqtt.isDuplicate", description="Whether or not this message might be a duplicate of one which has already been received."), @WritesAttribute(attribute="mqtt.isRetained", description="Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published on the topic.")})
@SystemResourceConsideration(resource=SystemResource.MEMORY, description="The 'Max Queue Size' specifies the maximum number of messages that can be hold in memory by NiFi by a single instance of this processor. A high value for this property could represent a lot of data being stored in memory.")
public class ConsumeMQTT
extends AbstractMQTTProcessor {
    // FlowFile attribute names; the values match the @WritesAttribute entries declared on the class.
    public static final String RECORD_COUNT_KEY = "record.count";
    public static final String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
    public static final String TOPIC_ATTRIBUTE_KEY = "mqtt.topic";
    public static final String QOS_ATTRIBUTE_KEY = "mqtt.qos";
    public static final String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate";
    public static final String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained";
    // Names of the extra record fields listed in the "Add attributes as fields" property description.
    public static final String TOPIC_FIELD_KEY = "_topic";
    public static final String QOS_FIELD_KEY = "_qos";
    public static final String IS_DUPLICATE_FIELD_KEY = "_isDuplicate";
    public static final String IS_RETAINED_FIELD_KEY = "_isRetained";
    // Counter names passed to ProcessSession.adjustCounter by the transfer methods.
    private static final String COUNTER_PARSE_FAILURES = "Parse Failures";
    private static final String COUNTER_RECORDS_RECEIVED = "Records Received";
    private static final String COUNTER_RECORDS_PROCESSED = "Records Processed";
    // Cap (10000) applied by the demarcator and record transfer loops when batching queue content into one FlowFile.
    private static final int MAX_MESSAGES_PER_FLOW_FILE = 10000;
    // Optional shared-subscription group; when set, onScheduled builds the "$share/<group>/" topic prefix from it.
    public static final PropertyDescriptor PROP_GROUPID = new PropertyDescriptor.Builder().name("Group ID").description("MQTT consumer group ID to use. If group ID not set, client will connect as individual consumer.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    // Required topic filter; supports Expression Language at ENVIRONMENT scope.
    public static final PropertyDescriptor PROP_TOPIC_FILTER = new PropertyDescriptor.Builder().name("Topic Filter").description("The MQTT topic filter to designate the topics to subscribe to.").required(true).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    // Required QoS level for the subscription; restricted to the three MqttConstants QoS values, default QoS 0.
    public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder().name("Quality of Service(QoS)").displayName("Quality of Service (QoS)").description("The Quality of Service (QoS) to receive the message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.").required(true).defaultValue(MqttConstants.ALLOWABLE_VALUE_QOS_0.getValue()).allowableValues(new DescribedValue[]{MqttConstants.ALLOWABLE_VALUE_QOS_0, MqttConstants.ALLOWABLE_VALUE_QOS_1, MqttConstants.ALLOWABLE_VALUE_QOS_2}).build();
    // Required capacity of the in-memory message queue; must be a positive integer and has no default value.
    public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder().name("Max Queue Size").description("The MQTT messages are always being sent to subscribers on a topic regardless of how frequently the processor is scheduled to run. If the 'Run Schedule' is significantly behind the rate at which the messages are arriving to this processor, then a back up can occur in the internal queue of this processor. This property specifies the maximum number of messages this processor will hold in memory at one time in the internal queue. This data would be lost in case of a NiFi restart.").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    // Record reader/writer descriptors: copies of the inherited BASE_* descriptors with consumer-specific descriptions.
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().fromPropertyDescriptor(BASE_RECORD_READER).description("The Record Reader to use for parsing received MQTT Messages into Records.").build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().fromPropertyDescriptor(BASE_RECORD_WRITER).description("The Record Writer to use for serializing Records before writing them to a FlowFile.").build();
    // Copy of the inherited BASE_MESSAGE_DEMARCATOR descriptor; only consulted when no Record Reader is set (see onTrigger).
    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder().fromPropertyDescriptor(BASE_MESSAGE_DEMARCATOR).description("With this property, you have an option to output FlowFiles which contains multiple messages. This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart multiple messages. This is an optional property ; if not provided, and if not defining a Record Reader/Writer, each message received will result in a single FlowFile. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.").build();
    // Boolean switch (default "true") declared as dependent on RECORD_READER; read by the record transfer path.
    public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder().name("add-attributes-as-fields").displayName("Add attributes as fields").description("If setting this property to true, default fields are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.").required(true).defaultValue("true").allowableValues(new String[]{"true", "false"}).addValidator(StandardValidators.BOOLEAN_VALIDATOR).dependsOn(RECORD_READER, new AllowableValue[0]).build();
    // Subscription QoS; assigned from PROP_QOS in onScheduled.
    private volatile int qos;
    // Either "" or "$share/<group id>/"; assigned in onScheduled depending on whether PROP_GROUPID is set.
    private volatile String topicPrefix = "";
    // Evaluated value of PROP_TOPIC_FILTER; assigned in onScheduled.
    private volatile String topicFilter;
    // Set to true in onScheduled and false in onUnscheduled; onTrigger only (re)connects while it is true.
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    // Buffer of received messages: created in customValidate, replaced in onPropertyModified,
    // filled by handleReceivedMessage and consumed by the transferQueue* methods.
    private volatile LinkedBlockingQueue<ReceivedMqttMessage> mqttQueue;
    // Relationship receiving the FlowFiles built from consumed messages.
    public static final Relationship REL_MESSAGE = new Relationship.Builder().name("Message").description("The MQTT message output").build();
    // Relationship for messages the record path could not parse or write; auto-terminated by default.
    public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder().name("parse.failure").description("If a message cannot be parsed using the configured Record Reader, the contents of the message will be routed to this Relationship as its own individual FlowFile.").autoTerminateDefault(true).build();
    // Immutable list returned by getSupportedPropertyDescriptors(); mixes inherited PROP_* descriptors with the ones declared in this class.
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(PROP_BROKER_URI, PROP_MQTT_VERSION, PROP_USERNAME, PROP_PASSWORD, PROP_SSL_CONTEXT_SERVICE, PROP_CLEAN_SESSION, PROP_SESSION_EXPIRY_INTERVAL, PROP_CLIENTID, PROP_GROUPID, PROP_TOPIC_FILTER, PROP_QOS, RECORD_READER, RECORD_WRITER, ADD_ATTRIBUTES_AS_FIELDS, MESSAGE_DEMARCATOR, PROP_CONN_TIMEOUT, PROP_KEEP_ALIVE_INTERVAL, PROP_LAST_WILL_MESSAGE, PROP_LAST_WILL_TOPIC, PROP_LAST_WILL_RETAIN, PROP_LAST_WILL_QOS, PROP_MAX_QUEUE_SIZE);
    // Immutable set returned by getRelationships().
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_MESSAGE, REL_PARSE_FAILURE);

    /**
     * Resizes the internal message queue when the 'Max Queue Size' property changes,
     * carrying over every message that is currently buffered.
     *
     * <p>The request is ignored (with a warning) when the new value is missing, is not a
     * positive integer, or is smaller than the number of buffered messages. Changes to any
     * other property, or a change before the queue has been created, are no-ops.
     *
     * @param descriptor the property that was modified
     * @param oldValue   the previous raw value (unused)
     * @param newValue   the new raw value; may be {@code null} when the property is cleared
     */
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (descriptor != PROP_MAX_QUEUE_SIZE) {
            return;
        }
        // Read the volatile field once so the size check and the drain act on the same queue.
        final LinkedBlockingQueue<ReceivedMqttMessage> currentQueue = this.mqttQueue;
        if (currentQueue == null) {
            return;
        }

        final int newSize;
        try {
            // parseInt rejects null as well as non-numeric text with NumberFormatException.
            newSize = Integer.parseInt(newValue);
        } catch (NumberFormatException e) {
            this.logger.warn("Ignoring resize request for {}: '{}' is not a valid integer", PROP_MAX_QUEUE_SIZE.getDisplayName(), newValue);
            return;
        }
        if (newSize <= 0) {
            // LinkedBlockingQueue refuses a non-positive capacity; leave validation to report the bad value.
            this.logger.warn("Ignoring resize request for {}: {} is not a positive size", PROP_MAX_QUEUE_SIZE.getDisplayName(), newSize);
            return;
        }

        final int msgPending = currentQueue.size();
        if (msgPending > newSize) {
            this.logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.", newSize, msgPending);
            return;
        }

        final LinkedBlockingQueue<ReceivedMqttMessage> newBuffer = new LinkedBlockingQueue<>(newSize);
        currentQueue.drainTo(newBuffer);
        this.mqttQueue = newBuffer;
    }

    /**
     * Adds ConsumeMQTT-specific checks to the validation results of the parent class.
     *
     * <p>As a side effect the internal message queue is created here, sized from
     * 'Max Queue Size', if it does not exist yet. Two conditions are reported as invalid:
     * a queue size smaller than the number of buffered messages, and a Client ID without
     * Expression Language combined with a Group ID.
     *
     * @param context the validation context giving access to the property values
     * @return the parent's results plus any results added by this method
     */
    @Override
    public Collection<ValidationResult> customValidate(ValidationContext context) {
        final Collection<ValidationResult> validationResults = super.customValidate(context);

        final int configuredSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger();
        if (this.mqttQueue == null) {
            this.mqttQueue = new LinkedBlockingQueue<>(configuredSize);
        }

        final int pending = this.mqttQueue.size();
        if (pending > configuredSize) {
            final String explanation = String.format("%s (%d) is smaller than the number of messages pending (%d).", PROP_MAX_QUEUE_SIZE.getDisplayName(), configuredSize, pending);
            final ValidationResult queueTooSmall = new ValidationResult.Builder()
                    .valid(false)
                    .subject("ConsumeMQTT Configuration")
                    .explanation(explanation)
                    .build();
            validationResults.add(queueTooSmall);
        }

        final boolean hasClientId = context.getProperty(PROP_CLIENTID).isSet();
        final boolean clientIdUsesEl = context.getProperty(PROP_CLIENTID).isExpressionLanguagePresent();
        final boolean hasGroupId = context.getProperty(PROP_GROUPID).isSet();
        // A fixed client ID cannot be shared by several nodes joining the same consumer group.
        if (hasGroupId && hasClientId && !clientIdUsesEl) {
            final ValidationResult fixedClientId = new ValidationResult.Builder()
                    .subject("Client ID and Group ID")
                    .valid(false)
                    .explanation("if client ID is not unique, multiple nodes cannot join the consumer group (if you want to set the client ID, please use expression language to make sure each node in the NiFi cluster gets a unique client ID with something like ${hostname()}).")
                    .build();
            validationResults.add(fixedClientId);
        }
        return validationResults;
    }

    /**
     * Stores the component logger in the {@code logger} field used throughout this class.
     *
     * @param context the initialization context (not used here)
     */
    protected void init(ProcessorInitializationContext context) {
        logger = getLogger();
    }

    /**
     * @return the fixed set of relationships of this processor (Message and parse.failure)
     */
    public Set<Relationship> getRelationships() {
        return ConsumeMQTT.RELATIONSHIPS;
    }

    /**
     * @return the fixed, ordered list of property descriptors supported by this processor
     */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return ConsumeMQTT.PROPERTY_DESCRIPTORS;
    }

    /**
     * Captures the subscription settings after the parent's scheduling logic has run,
     * then marks the processor as scheduled.
     *
     * @param context the process context the property values are read from
     */
    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        super.onScheduled(context);

        this.qos = context.getProperty(PROP_QOS).asInteger();
        this.topicFilter = context.getProperty(PROP_TOPIC_FILTER).evaluateAttributeExpressions().getValue();

        // With a group ID the subscription is prefixed with "$share/<group>/".
        if (context.getProperty(PROP_GROUPID).isSet()) {
            this.topicPrefix = "$share/" + context.getProperty(PROP_GROUPID).getValue() + "/";
        } else {
            this.topicPrefix = "";
        }

        this.scheduled.set(true);
    }

    /**
     * Clears the scheduled flag and stops the MQTT client while holding this
     * processor's monitor (the same monitor onTrigger uses when it connects).
     *
     * @param context the process context (not used here)
     */
    @OnUnscheduled
    public void onUnscheduled(ProcessContext context) {
        this.scheduled.set(false);
        synchronized (this) {
            this.stopClient();
        }
    }

    /**
     * Flushes messages still sitting in the internal queue when the processor stops.
     *
     * <p>A new session is created from the stored session factory and handed to the same
     * transfer method onTrigger would use. The record path does not commit on its own, so
     * the session is committed here; the other two paths commit inside the transfer method.
     * If a transfer fails, the session is rolled back before the exception is rethrown.
     *
     * @param context the process context used to select the transfer path
     * @throws ProcessException if messages are pending but no session factory has been stored
     */
    @OnStopped
    public void onStopped(ProcessContext context) {
        final LinkedBlockingQueue<ReceivedMqttMessage> pendingQueue = this.mqttQueue;
        if (pendingQueue == null || pendingQueue.isEmpty()) {
            return;
        }
        if (this.processSessionFactory == null) {
            throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. Removing the processor now will clear the queue but will result in DATA LOSS. This is normally due to starting the processor, receiving messages and stopping before the onTrigger happens. The messages in the MQTT internal queue cannot finish processing until the processor is triggered to run.");
        }

        this.logger.info("Finishing processing leftover messages");
        final ProcessSession session = this.processSessionFactory.createSession();
        try {
            if (context.getProperty(RECORD_READER).isSet()) {
                this.transferQueueRecord(context, session);
                // transferQueueRecord only transfers; without this commit the FlowFiles
                // built from the leftover messages would never leave this local session.
                session.commitAsync();
            } else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
                this.transferQueueDemarcator(context, session);
            } else {
                this.transferQueue(session);
            }
        } catch (RuntimeException e) {
            // Do not leave a half-built session behind.
            session.rollback();
            throw e;
        }
    }

    /**
     * Connects the client if necessary and moves queued messages into FlowFiles.
     *
     * <p>The connection is only (re)established while the processor is scheduled, and the
     * connected state is re-checked under this processor's monitor before initializing.
     * If the queue is empty the processor yields; otherwise the transfer path is chosen
     * by configuration: Record Reader first, then Message Demarcator, else one FlowFile
     * per message.
     *
     * @param context the process context
     * @param session the session used to create and transfer FlowFiles
     * @throws ProcessException if a transfer path fails
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final boolean currentlyScheduled = this.scheduled.get();
        if (!this.isConnected() && currentlyScheduled) {
            synchronized (this) {
                if (!this.isConnected()) {
                    this.initializeClient(context);
                }
            }
        }

        if (this.mqttQueue.isEmpty()) {
            context.yield();
            return;
        }

        if (context.getProperty(RECORD_READER).isSet()) {
            this.transferQueueRecord(context, session);
            return;
        }
        if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
            this.transferQueueDemarcator(context, session);
            return;
        }
        this.transferQueue(session);
    }

    /**
     * Creates a new MQTT client, connects it and subscribes to the configured topic
     * (topic prefix + topic filter) with {@link #handleReceivedMessage} as the callback.
     *
     * <p>Any failure is logged, the client reference is cleared and the processor yields.
     * NOTE(review): on failure the client is dropped without an explicit disconnect/close;
     * confirm whether a client that connected but failed to subscribe needs to be closed.
     *
     * @param context the process context used to yield on failure
     */
    private void initializeClient(ProcessContext context) {
        try {
            this.mqttClient = this.createMqttClient();
            this.mqttClient.connect();

            final String subscription = this.topicPrefix + this.topicFilter;
            this.mqttClient.subscribe(subscription, this.qos, this::handleReceivedMessage);
        } catch (Exception e) {
            this.logger.error("Connection failed to {}. Yielding processor", this.clientProperties.getRawBrokerUris(), e);
            this.mqttClient = null;
            context.yield();
        }
    }

    /**
     * Emits one FlowFile per queued message until the queue is empty.
     *
     * <p>Each message is peeked, written to a new FlowFile (a null payload yields empty
     * content), reported as a provenance RECEIVE event, transferred to the Message
     * relationship and committed; only then is it removed from the queue.
     *
     * @param session the session used to create, transfer and commit the FlowFiles
     */
    private void transferQueue(ProcessSession session) {
        while (!this.mqttQueue.isEmpty()) {
            final ReceivedMqttMessage head = this.mqttQueue.peek();

            FlowFile flowFile = this.createFlowFileAndPopulateAttributes(session, head);
            flowFile = session.write(flowFile, out -> {
                final byte[] payload = head.getPayload();
                out.write(payload == null ? new byte[0] : payload);
            });

            final String transitUri = this.getTransitUri(head.getTopic());
            session.getProvenanceReporter().receive(flowFile, transitUri);
            session.transfer(flowFile, REL_MESSAGE);
            session.commitAsync();

            // The message leaves the queue only after the commit has been requested.
            this.mqttQueue.remove(head);
        }
    }

    /**
     * Concatenates up to {@code MAX_MESSAGES_PER_FLOW_FILE} queued messages into a single
     * FlowFile, separated by the configured demarcator (encoded as UTF-8).
     *
     * <p>The FlowFile carries the broker attribute, is reported as a provenance RECEIVE
     * event for the subscribed topic, is transferred to the Message relationship and the
     * session is committed. Messages with a null payload contribute no bytes but still
     * count as received.
     *
     * @param context the process context providing the demarcator
     * @param session the session used to create, transfer and commit the FlowFile
     */
    private void transferQueueDemarcator(ProcessContext context, ProcessSession session) {
        final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);

        FlowFile messageFlowfile = session.create();
        // putAttribute returns the updated FlowFile; keep working with that reference.
        messageFlowfile = session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, this.clientProperties.getRawBrokerUris());
        messageFlowfile = session.append(messageFlowfile, out -> {
            int written = 0;
            while (written < MAX_MESSAGES_PER_FLOW_FILE) {
                // poll() doubles as the emptiness check, so a queue drained or swapped
                // concurrently cannot hand us a null message to dereference.
                final ReceivedMqttMessage mqttMessage = this.mqttQueue.poll();
                if (mqttMessage == null) {
                    break;
                }
                if (written > 0) {
                    out.write(demarcator);
                }
                final byte[] payload = mqttMessage.getPayload();
                if (payload != null) {
                    out.write(payload);
                }
                session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
                written++;
            }
        });

        session.getProvenanceReporter().receive(messageFlowfile, this.getTransitUri(this.topicPrefix, this.topicFilter));
        session.transfer(messageFlowfile, REL_MESSAGE);
        session.commitAsync();
    }

    /**
     * Routes a single message to the parse.failure relationship as its own FlowFile and
     * increments the parse-failure counter.
     *
     * <p>A message without payload produces a FlowFile with empty content, matching how
     * the other transfer paths treat a null payload.
     *
     * @param session     the session used to create and transfer the FlowFile
     * @param mqttMessage the message that could not be processed
     */
    private void transferFailure(ProcessSession session, ReceivedMqttMessage mqttMessage) {
        FlowFile messageFlowfile = this.createFlowFileAndPopulateAttributes(session, mqttMessage);
        messageFlowfile = session.write(messageFlowfile, out -> {
            final byte[] payload = mqttMessage.getPayload();
            // OutputStream.write(null) throws NullPointerException, so skip the write.
            if (payload != null) {
                out.write(payload);
            }
        });
        session.getProvenanceReporter().receive(messageFlowfile, this.getTransitUri(mqttMessage.getTopic()));
        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
        session.adjustCounter(COUNTER_PARSE_FAILURES, 1L, false);
    }

    /**
     * Creates a FlowFile and stamps it with the broker URIs and the message's topic,
     * QoS, duplicate flag and retained flag.
     *
     * @param session     the session used to create the FlowFile
     * @param mqttMessage the message whose metadata becomes FlowFile attributes
     * @return the FlowFile returned by the session after the attributes were added
     */
    private FlowFile createFlowFileAndPopulateAttributes(ProcessSession session, ReceivedMqttMessage mqttMessage) {
        final FlowFile created = session.create();

        final HashMap<String, String> messageAttributes = new HashMap<>();
        messageAttributes.put(BROKER_ATTRIBUTE_KEY, this.clientProperties.getRawBrokerUris());
        messageAttributes.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
        messageAttributes.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
        messageAttributes.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
        messageAttributes.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));

        return session.putAllAttributes(created, messageAttributes);
    }

    /*
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void transferQueueRecord(ProcessContext context, ProcessSession session) {
        readerFactory = (RecordReaderFactory)context.getProperty(ConsumeMQTT.RECORD_READER).asControllerService(RecordReaderFactory.class);
        writerFactory = (RecordSetWriterFactory)context.getProperty(ConsumeMQTT.RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        flowFile = session.create();
        session.putAttribute(flowFile, "mqtt.broker", this.clientProperties.getRawBrokerUris());
        attributes = new HashMap<String, String>();
        recordCount = new AtomicInteger();
        doneList = new ArrayList<ReceivedMqttMessage>();
        writer = null;
        isWriterInitialized = false;
        i = 0;
        try {
            while (!this.mqttQueue.isEmpty() && i < 10000 && (mqttMessage = this.mqttQueue.poll()) != null) {
                recordBytes = mqttMessage.getPayload() == null ? new byte[]{} : mqttMessage.getPayload();
                try {
                    in = new ByteArrayInputStream(recordBytes);
                    try {
                        reader = readerFactory.createRecordReader(attributes, (InputStream)in, (long)recordBytes.length, this.logger);
                    }
                    catch (Exception e) {
                        this.logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", (Throwable)e);
                        this.transferFailure(session, mqttMessage);
                        in.close();
                        continue;
                    }
                    ** try [egrp 4[TRYBLOCK] [7 : 230->667)] { 
lbl30:
                    // 1 sources

                    try {
                        while ((record = reader.nextRecord()) != null) {
                            if (!isWriterInitialized) {
                                block32: {
                                    recordSchema = record.getSchema();
                                    rawOut = session.write(flowFile);
                                    try {
                                        writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
                                        if (!context.getProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS).asBoolean().booleanValue()) break block32;
                                        fields = new ArrayList<RecordField>(writeSchema.getFields());
                                        fields.add(new RecordField("_topic", RecordFieldType.STRING.getDataType()));
                                        fields.add(new RecordField("_qos", RecordFieldType.INT.getDataType()));
                                        fields.add(new RecordField("_isDuplicate", RecordFieldType.BOOLEAN.getDataType()));
                                        fields.add(new RecordField("_isRetained", RecordFieldType.BOOLEAN.getDataType()));
                                        writeSchema = new SimpleRecordSchema(fields);
                                    }
                                    catch (Exception e) {
                                        this.logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", (Throwable)e);
                                        this.transferFailure(session, mqttMessage);
                                        continue;
                                    }
                                }
                                writer = writerFactory.createWriter(this.logger, writeSchema, rawOut, flowFile);
                                writer.beginRecordSet();
                            }
                            try {
                                if (context.getProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS).asBoolean().booleanValue()) {
                                    record.setValue("_topic", (Object)mqttMessage.getTopic());
                                    record.setValue("_qos", (Object)mqttMessage.getQos());
                                    record.setValue("_isRetained", (Object)mqttMessage.isRetained());
                                    record.setValue("_isDuplicate", (Object)mqttMessage.isDuplicate());
                                }
                                writer.write(record);
                                isWriterInitialized = true;
                                doneList.add(mqttMessage);
                            }
                            catch (RuntimeException re) {
                                this.logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", (Throwable)re);
                                this.transferFailure(session, mqttMessage);
                                continue;
                            }
                            session.adjustCounter("Records Received", 1L, false);
                            ++i;
                        }
                    }
                    catch (IOException | MalformedRecordException | SchemaValidationException e) {
                        this.logger.error("Failed to write message, sending to the parse failure relationship", e);
                        this.transferFailure(session, mqttMessage);
                    }
lbl80:
                    // 1 sources

                    finally {
                        in.close();
                    }
                }
                catch (Exception e) {
                    this.logger.error("Failed to write message, sending to the parse failure relationship", (Throwable)e);
                    this.transferFailure(session, mqttMessage);
                }
            }
            if (writer != null) {
                writeResult = writer.finishRecordSet();
                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                attributes.putAll(writeResult.getAttributes());
                recordCount.set(writeResult.getRecordCount());
            }
        }
        catch (Exception e) {
            context.yield();
            numberOfMessages = 0;
            var14_19 = doneList.iterator();
            while (true) {
                if (!var14_19.hasNext()) {
                    if (numberOfMessages <= 0) throw new ProcessException("Could not process data received from the MQTT broker(s): " + this.clientProperties.getRawBrokerUris(), (Throwable)e);
                    this.logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", new Object[]{numberOfMessages});
                    throw new ProcessException("Could not process data received from the MQTT broker(s): " + this.clientProperties.getRawBrokerUris(), (Throwable)e);
                }
                done = (ReceivedMqttMessage)var14_19.next();
                try {
                    this.mqttQueue.offer(done, 1L, TimeUnit.SECONDS);
                }
                catch (InterruptedException ex) {
                    ++numberOfMessages;
                    if (!this.getLogger().isDebugEnabled()) continue;
                    this.logger.debug("Could not add message back into the internal queue, this could lead to data loss", (Throwable)ex);
                }
            }
        }
        finally {
            this.closeWriter(writer);
        }
        if (recordCount.get() == 0) {
            session.remove(flowFile);
            return;
        }
        session.putAllAttributes(flowFile, attributes);
        session.getProvenanceReporter().receive(flowFile, this.getTransitUri(new String[]{this.topicPrefix, this.topicFilter}));
        session.transfer(flowFile, ConsumeMQTT.REL_MESSAGE);
        count = recordCount.get();
        session.adjustCounter("Records Processed", (long)count, false);
        this.logger.info("Successfully processed {} records for {}", new Object[]{count, flowFile});
    }

    /**
     * Closes the given record writer, logging a warning instead of propagating any failure.
     *
     * @param writer the writer to close; {@code null} is ignored
     */
    private void closeWriter(RecordSetWriter writer) {
        if (writer == null) {
            return;
        }
        try {
            writer.close();
        } catch (Exception closeFailure) {
            this.logger.warn("Failed to close Record Writer", closeFailure);
        }
    }

    /**
     * Builds the provenance transit URI: the provenance-formatted broker URIs, a slash,
     * and then the given parts concatenated without any separator.
     *
     * @param appends the parts to append after the slash, in order
     * @return the assembled transit URI
     */
    private String getTransitUri(String ... appends) {
        final StringBuilder transitUri = new StringBuilder(this.clientProperties.getProvenanceFormattedBrokerUris());
        transitUri.append("/");
        transitUri.append(String.join("", appends));
        return transitUri.toString();
    }

    /**
     * Subscription callback: logs the arriving message at debug level and places it on
     * the internal queue, waiting up to one second for free capacity.
     *
     * @param message the message delivered by the MQTT client
     * @throws IllegalStateException if the queue is still full after the timeout
     * @throws MqttException if the thread is interrupted while waiting for queue capacity
     */
    private void handleReceivedMessage(ReceivedMqttMessage message) {
        if (this.logger.isDebugEnabled()) {
            // A message may carry no payload; treat it as empty so logging cannot fail.
            final byte[] payload = message.getPayload() == null ? new byte[0] : message.getPayload();
            final String text = new String(payload, StandardCharsets.UTF_8);
            if (StringUtils.isAsciiPrintable(text)) {
                this.logger.debug("Message arrived from topic {}. Payload: {}", message.getTopic(), text);
            } else {
                this.logger.debug("Message arrived from topic {}. Binary value of size {}", message.getTopic(), payload.length);
            }
        }

        try {
            if (!this.mqttQueue.offer(message, 1L, TimeUnit.SECONDS)) {
                throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the calling client thread.
            Thread.currentThread().interrupt();
            // NOTE(review): the cause is not attached because only the single-argument
            // MqttException constructor is visible in this file; attach it if a
            // (String, Throwable) constructor exists.
            throw new MqttException("Failed to process message arrived from topic " + message.getTopic());
        }
    }
}

