/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.processors;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.kafka.processors.ConsumeKafka;
import org.apache.nifi.kafka.processors.producer.PartitionStrategy;
import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil;
import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee;
import org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter;
import org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter;
import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter;
import org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter;
import org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter;
import org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory;
import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory;
import org.apache.nifi.kafka.processors.producer.key.KeyFactory;
import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory;
import org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy;
import org.apache.nifi.kafka.service.api.KafkaConnectionService;
import org.apache.nifi.kafka.service.api.producer.FlowFileResult;
import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
import org.apache.nifi.kafka.service.api.producer.PublishContext;
import org.apache.nifi.kafka.service.api.producer.RecordSummary;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;
import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
import org.apache.nifi.kafka.shared.property.FailureStrategy;
import org.apache.nifi.kafka.shared.property.KeyEncoding;
import org.apache.nifi.kafka.shared.property.PublishStrategy;
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;

@Tags(value={"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub"})
@CapabilityDescription(value="Sends the contents of a FlowFile as either a message or as individual records to Apache Kafka using the Kafka Producer API. The messages to send may be individual FlowFiles, may be delimited using a user-specified delimiter (such as a new-line), or may be record-oriented data that can be read by the configured Record Reader. The complementary NiFi processor for fetching messages is ConsumeKafka.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@ReadsAttribute(attribute="kafka.tombstone", description="If this attribute is set to 'true', if the processor is not configured with a demarcator and if the FlowFile's content is null, then a tombstone message with zero bytes will be sent to Kafka.")
@WritesAttribute(attribute="msg.count", description="The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to FlowFiles that are routed to success.")
@SeeAlso(value={ConsumeKafka.class})
public class PublishKafka
extends AbstractProcessor
implements KafkaPublishComponent,
VerifiableProcessor {
    // Attribute written to each FlowFile routed by routeResults, holding the number of messages sent for it.
    protected static final String MSG_COUNT = "msg.count";
    // Controller service from which producer services are obtained.
    public static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder().name("Kafka Connection Service").description("Provides connections to Kafka Broker for publishing Kafka Records").identifiesControllerService(KafkaConnectionService.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    // Destination topic; evaluated against each FlowFile's attributes when publishing.
    public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder().name("Topic Name").description("Name of the Kafka Topic to which the Processor publishes Kafka Records").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    // Passed to the producer configuration as the delivery guarantee.
    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder().name("acks").displayName("Delivery Guarantee").description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka Client acks property.").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(DeliveryGuarantee.class).defaultValue((DescribedValue)DeliveryGuarantee.DELIVERY_REPLICATED).build();
    // Passed to the producer configuration as the compression codec.
    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("compression.type").displayName("Compression Type").description("Specifies the compression strategy for records sent to Kafka. Corresponds to Kafka Client compression.type property.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).allowableValues(new String[]{"none", "gzip", "snappy", "lz4"}).defaultValue("none").build();
    // Read in bytes and handed to the record converters as the maximum message size.
    public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder().name("max.request.size").displayName("Max Request Size").description("The maximum size of a request in bytes. Corresponds to Kafka Client max.request.size property.").required(true).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").build();
    // Whether the producer is configured for transactions.
    public static final PropertyDescriptor TRANSACTIONS_ENABLED = new PropertyDescriptor.Builder().name("Transactions Enabled").description("Specifies whether to provide transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true requires that the [Delivery Guarantee] property be set to [Guarantee Replicated Delivery.]").expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new String[]{"true", "false"}).defaultValue("true").required(true).build();
    // Optional prefix for the transactional id; only applicable when Transactions Enabled is "true".
    static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new PropertyDescriptor.Builder().name("Transactional ID Prefix").description("Specifies the KafkaProducer config transactional.id will be a generated UUID and will be prefixed with the configured string.").expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).dependsOn(TRANSACTIONS_ENABLED, "true", new String[0]).required(false).build();
    // Partitioning strategy; passed to the producer configuration and consulted by getPartition.
    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder().name("partitioner.class").displayName("Partitioner Class").description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka Client partitioner.class property.").allowableValues(PartitionStrategy.class).defaultValue(PartitionStrategy.RANDOM_PARTITIONING.getValue()).required(true).build();
    // Evaluated per FlowFile, but only when the expression-language partitioning strategy is selected.
    public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder().name("partition").displayName("Partition").description("Specifies the Kafka Partition destination for Records.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    // Delimiter used to split FlowFile content into messages; only consulted when a Record Reader/Writer pair is not configured.
    static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder().name("Message Demarcator").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.").build();
    // Record-oriented conversion is used only when both the Record Reader and the Record Writer are set.
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("Record Reader").description("The Record Reader to use for incoming FlowFiles").identifiesControllerService(RecordReaderFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("Record Writer").description("The Record Writer to use in order to serialize the data before sending to Kafka").identifiesControllerService(RecordSetWriterFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    // Chooses between the record-stream converter and the record-wrapper converter.
    public static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder().name("Publish Strategy").description("The format used to publish the incoming FlowFile record to Kafka.").required(true).defaultValue((DescribedValue)PublishStrategy.USE_VALUE).dependsOn(RECORD_READER, new AllowableValue[0]).allowableValues(PublishStrategy.class).build();
    // Record field used as the message key under the USE_VALUE strategy.
    public static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder().name("Message Key Field").description("The name of a field in the Input Records that should be used as the Key for the Kafka message.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dependsOn(PUBLISH_STRATEGY, (DescribedValue)PublishStrategy.USE_VALUE, new DescribedValue[0]).required(false).build();
    // Regular expression selecting FlowFile attributes to send as headers; compiled each time a converter is built.
    public static final PropertyDescriptor ATTRIBUTE_HEADER_PATTERN = new PropertyDescriptor.Builder().name("FlowFile Attribute Header Pattern").description("A Regular Expression that is matched against all FlowFile attribute names. Any attribute whose name matches the pattern will be added to the Kafka messages as a Header. If not specified, no FlowFile attributes will be added as headers.").addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).dependsOn(PUBLISH_STRATEGY, (DescribedValue)PublishStrategy.USE_VALUE, new DescribedValue[0]).required(false).build();
    // Character set handed to the attribute headers factory.
    static final PropertyDescriptor HEADER_ENCODING = new PropertyDescriptor.Builder().name("Header Encoding").description("For any attribute that is added as a Kafka Record Header, this property indicates the Character Encoding to use for serializing the headers.").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).defaultValue(StandardCharsets.UTF_8.displayName()).required(true).dependsOn(ATTRIBUTE_HEADER_PATTERN, new AllowableValue[0]).build();
    // Kafka Key and its encoding are the two inputs to the attribute-based key factory.
    static final PropertyDescriptor KAFKA_KEY = new PropertyDescriptor.Builder().name("Kafka Key").description("The Key to use for the Message. If not specified, the FlowFile attribute 'kafka.key' is used as the message key, if it is present.Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key.Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dependsOn(PUBLISH_STRATEGY, (DescribedValue)PublishStrategy.USE_WRAPPER, new DescribedValue[0]).build();
    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder().name("Kafka Key Attribute Encoding").description("FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded.").required(true).defaultValue(KeyEncoding.UTF8.getValue()).allowableValues(KeyEncoding.class).dependsOn(PUBLISH_STRATEGY, (DescribedValue)PublishStrategy.USE_WRAPPER, new DescribedValue[0]).build();
    // Writer for record keys; handed to the message key factory and to the wrapper converter.
    static final PropertyDescriptor RECORD_KEY_WRITER = new PropertyDescriptor.Builder().name("Record Key Writer").description("The Record Key Writer to use for outgoing FlowFiles").required(false).identifiesControllerService(RecordSetWriterFactory.class).dependsOn(PUBLISH_STRATEGY, (DescribedValue)PublishStrategy.USE_WRAPPER, new DescribedValue[0]).build();
    // Metadata strategy handed to the wrapper converter.
    public static final PropertyDescriptor RECORD_METADATA_STRATEGY = new PropertyDescriptor.Builder().name("Record Metadata Strategy").description("Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or if it should come from the configured Topic Name and Partition / Partitioner class properties").required(true).defaultValue((DescribedValue)RecordMetadataStrategy.FROM_PROPERTIES).allowableValues(RecordMetadataStrategy.class).dependsOn(PUBLISH_STRATEGY, (DescribedValue)PublishStrategy.USE_WRAPPER, new DescribedValue[0]).build();
    // Destination for FlowFiles whose result carries no exceptions.
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles for which all content was sent to Kafka.").build();
    // Destination for FlowFiles whose result carries exceptions, and for whole batches when the failure strategy is not ROLLBACK.
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship").build();
    // Properties returned by getSupportedPropertyDescriptors().
    // NOTE(review): FAILURE_STRATEGY is not declared in this class; presumably inherited from KafkaPublishComponent - confirm.
    private static final List<PropertyDescriptor> DESCRIPTORS = List.of(CONNECTION_SERVICE, TOPIC_NAME, FAILURE_STRATEGY, DELIVERY_GUARANTEE, COMPRESSION_CODEC, MAX_REQUEST_SIZE, TRANSACTIONS_ENABLED, TRANSACTIONAL_ID_PREFIX, PARTITION_CLASS, PARTITION, MESSAGE_DEMARCATOR, RECORD_READER, RECORD_WRITER, PUBLISH_STRATEGY, MESSAGE_KEY_FIELD, ATTRIBUTE_HEADER_PATTERN, HEADER_ENCODING, KAFKA_KEY, KEY_ATTRIBUTE_ENCODING, RECORD_KEY_WRITER, RECORD_METADATA_STRATEGY);
    // Relationships returned by getRelationships().
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
    // Pool of idle producer services: onTrigger returns still-open producers here, onStopped closes whatever remains.
    private final Queue<KafkaProducerService> producerServices = new LinkedBlockingQueue<KafkaProducerService>();

    /**
     * Returns the fixed list of property descriptors supported by this processor.
     *
     * @return the immutable {@code DESCRIPTORS} list
     */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    /**
     * Returns the relationships this processor can route FlowFiles to.
     *
     * @return the immutable set containing {@code REL_SUCCESS} and {@code REL_FAILURE}
     */
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Verifies the current configuration by opening a short-lived producer service and looking up
     * the partitions of the configured topic.
     * <p>
     * The producer service is always closed before this method returns. A failed partition lookup
     * is reported as a FAILED verification result rather than thrown.
     *
     * @param context processor context supplying the property values to verify
     * @param verificationLogger logger supplied for verification output; failures are logged here
     * @param attributes FlowFile attributes used to evaluate the Topic Name expression
     * @return a list holding the outcome of the "Verify Topic Partitions" step
     */
    public List<ConfigVerificationResult> verify(ProcessContext context, ComponentLog verificationLogger, Map<String, String> attributes) {
        List<ConfigVerificationResult> verificationResults = new ArrayList<>();
        KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
        boolean transactionsEnabled = context.getProperty(TRANSACTIONS_ENABLED).asBoolean();
        String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
        TransactionIdSupplier transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix);
        String deliveryGuarantee = context.getProperty(DELIVERY_GUARANTEE).getValue();
        String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue();
        String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
        ProducerConfiguration producerConfiguration = new ProducerConfiguration(transactionsEnabled, (String)transactionalIdSupplier.get(), deliveryGuarantee, compressionCodec, partitionClass);

        try (KafkaProducerService producerService = connectionService.getProducerService(producerConfiguration)) {
            ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder().verificationStepName("Verify Topic Partitions");
            String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions(attributes).getValue();
            try {
                // Only the number of partitions is reported, so the element type is irrelevant here.
                List<?> partitionStates = producerService.getPartitionStates(topicName);
                verificationPartitions.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(String.format("Partitions [%d] found for Topic [%s]", partitionStates.size(), topicName));
            } catch (Exception e) {
                // ComponentLog substitutes "{}" placeholders (not "%s"); the trailing exception is logged as the cause.
                // The verification logger is used so the failure is reported through the verification channel.
                verificationLogger.error("Topic [{}] Partition verification failed", topicName, e);
                verificationPartitions.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("Topic [%s] Partition access failed: %s", topicName, e));
            }
            verificationResults.add(verificationPartitions.build());
        }
        return verificationResults;
    }

    /**
     * Drains the pool of idle producer services and closes each one when the processor stops.
     */
    @OnStopped
    public void onStopped() {
        for (KafkaProducerService pooled = this.producerServices.poll(); pooled != null; pooled = this.producerServices.poll()) {
            pooled.close();
        }
    }

    /*
     * WARNING (CFR decompiler): a try block that caught itself was removed while decompiling
     * this method, so its behaviour may differ from the original source.
     */
    /**
     * Polls a batch of FlowFiles and publishes them using a pooled (or newly created) producer service.
     * <p>
     * An exception escaping the publish step is logged together with the UUIDs of the batch and the
     * producer is closed. A producer that is still open afterwards is returned to the pool.
     *
     * @param context processor context used to resolve properties
     * @param session session from which the FlowFiles are polled
     * @throws ProcessException declared by the signature; exceptions from publishing are caught and logged here
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final List<FlowFile> batch = PublishKafkaUtil.pollFlowFiles(session);
        if (batch.isEmpty()) {
            return;
        }

        final KafkaProducerService producer = this.getProducerService(context);
        try {
            this.publishFlowFiles(context, session, batch, producer);
        } catch (Exception e) {
            final List<String> identifiers = new ArrayList<>(batch.size());
            for (FlowFile polled : batch) {
                identifiers.add(polled.getAttribute(CoreAttributes.UUID.key()));
            }
            this.getLogger().error("Failed to publish {} FlowFiles to Kafka: uuids={}", batch.size(), String.join(", ", identifiers), e);
            producer.close();
        } finally {
            // Closed producers are dropped; open ones are kept for the next trigger.
            if (!producer.isClosed()) {
                this.producerServices.offer(producer);
            }
        }
    }

    /**
     * Takes an idle producer service from the pool, creating a new one when the pool is empty.
     *
     * @param context processor context used when a new producer service must be created
     * @return a pooled or freshly created producer service
     */
    private KafkaProducerService getProducerService(ProcessContext context) {
        final KafkaProducerService pooled = this.producerServices.poll();
        return pooled != null ? pooled : this.createProducerService(context);
    }

    /**
     * Builds a producer configuration from the processor properties and asks the connection
     * service for a matching producer service.
     *
     * @param context processor context supplying the property values
     * @return the producer service returned by the connection service
     */
    private KafkaProducerService createProducerService(ProcessContext context) {
        final KafkaConnectionService kafkaConnection = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);

        final boolean useTransactions = context.getProperty(TRANSACTIONS_ENABLED).asBoolean();
        // NOTE(review): the raw prefix is passed here, whereas verify() passes a generated id - confirm which is intended.
        final String idPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
        final String acks = context.getProperty(DELIVERY_GUARANTEE).getValue();
        final String compression = context.getProperty(COMPRESSION_CODEC).getValue();
        final String partitioner = context.getProperty(PARTITION_CLASS).getValue();

        return kafkaConnection.getProducerService(new ProducerConfiguration(useTransactions, idPrefix, acks, compression, partitioner));
    }

    /*
     * WARNING (CFR decompiler): a try block that caught itself was removed while decompiling
     * this method, so its behaviour may differ from the original source.
     */
    /**
     * Publishes every FlowFile of the batch, then completes the producer and routes the batch.
     * <p>
     * Completion and routing run in a {@code finally} block, so they happen even when publishing a
     * FlowFile throws. If completion itself fails, the producer is closed and the batch is handled
     * by the failure strategy.
     *
     * @param context processor context used to resolve properties
     * @param session session owning the FlowFiles
     * @param flowFiles batch to publish
     * @param producerService producer used for sending and completing
     */
    private void publishFlowFiles(ProcessContext context, ProcessSession session, List<FlowFile> flowFiles, KafkaProducerService producerService) {
        try {
            flowFiles.forEach(flowFile -> this.publishFlowFile(context, session, flowFile, producerService));
        } finally {
            RecordSummary summary;
            try {
                summary = producerService.complete();
            } catch (Exception e) {
                this.getLogger().warn("Failed to complete transaction with Kafka", (Throwable)e);
                producerService.close();
                summary = null;
            }

            final boolean completed = summary != null && !summary.isFailure();
            if (completed) {
                this.routeResults(session, summary.getFlowFileResults());
            } else {
                this.routeFailureStrategy(context, session, flowFiles);
            }
        }
    }

    /**
     * Applies the configured failure strategy to a batch that could not be published.
     *
     * @param context processor context supplying the Failure Strategy property; yielded on rollback
     * @param session session that is rolled back, or used to transfer the batch to failure
     * @param flowFiles batch to route when the strategy is not ROLLBACK
     */
    private void routeFailureStrategy(ProcessContext context, ProcessSession session, List<FlowFile> flowFiles) {
        final FailureStrategy configured = context.getProperty(FAILURE_STRATEGY).asAllowableValue(FailureStrategy.class);
        if (FailureStrategy.ROLLBACK != configured) {
            session.transfer(flowFiles, REL_FAILURE);
            return;
        }
        session.rollback();
        context.yield();
    }

    /**
     * Routes each FlowFile according to its publish result.
     * <p>
     * For every result: records the sent count in the {@code msg.count} attribute, adjusts the
     * overall and per-topic counters, transfers the FlowFile to success when the result has no
     * exceptions (failure otherwise), and reports a provenance SEND event.
     *
     * @param session session owning the FlowFiles
     * @param flowFileResults per-FlowFile results reported by the producer
     */
    private void routeResults(ProcessSession session, List<FlowFileResult> flowFileResults) {
        for (FlowFileResult result : flowFileResults) {
            final long sentCount = result.getSentCount();
            final FlowFile updated = session.putAttribute(result.getFlowFile(), MSG_COUNT, String.valueOf(sentCount));

            session.adjustCounter("Messages Sent", sentCount, true);
            for (Map.Entry<String, Long> topicCount : result.getSentPerTopic().entrySet()) {
                session.adjustCounter("Messages Sent to " + topicCount.getKey(), topicCount.getValue().longValue(), true);
            }

            if (result.getExceptions().isEmpty()) {
                session.transfer(updated, REL_SUCCESS);
            } else {
                session.transfer(updated, REL_FAILURE);
            }

            final String transitUri = "kafka://" + String.join(",", result.getSentPerTopic().keySet());
            session.getProvenanceReporter().send(updated, transitUri);
        }
    }

    /**
     * Publishes a single FlowFile: resolves its topic and partition, picks a record converter and
     * streams the content to the producer through a {@code PublishCallback}.
     *
     * @param context processor context used to resolve properties
     * @param session session used to read the FlowFile content
     * @param flowFile FlowFile to publish
     * @param producerService producer that receives the converted records
     */
    private void publishFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile, KafkaProducerService producerService) {
        final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions(flowFile.getAttributes()).getValue();
        final Integer partitionHint = this.getPartition(context, flowFile);
        final PublishContext target = new PublishContext(topicName, partitionHint, null, flowFile);
        final KafkaRecordConverter converter = this.getKafkaRecordConverter(context, flowFile);

        final InputStreamCallback reader = new PublishCallback(producerService, target, converter, flowFile.getAttributes(), flowFile.getSize());
        session.read(flowFile, reader);
    }

    /**
     * Computes the partition value for a FlowFile.
     *
     * @param context processor context supplying the partitioner and partition properties
     * @param flowFile FlowFile whose attributes are used to evaluate the Partition expression
     * @return the hash code of the evaluated Partition value when the expression-language
     *         partitioning strategy is selected (0 for a null value); {@code null} otherwise
     */
    private Integer getPartition(ProcessContext context, FlowFile flowFile) {
        final String configuredStrategy = context.getProperty(PARTITION_CLASS).getValue();
        final boolean byExpression = PartitionStrategy.EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(configuredStrategy);
        if (!byExpression) {
            return null;
        }
        // The hash, not the literal value, is returned; presumably mapped to a real partition downstream - confirm.
        final String evaluated = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
        return Objects.hashCode(evaluated);
    }

    /**
     * Selects the converter that turns a FlowFile's content into Kafka records.
     * <ul>
     *   <li>Record Reader and Record Writer both configured: the record-wrapper converter for the
     *       USE_WRAPPER publish strategy, otherwise the record-stream converter.</li>
     *   <li>Otherwise, Message Demarcator set: the delimited converter.</li>
     *   <li>Otherwise: the whole-FlowFile converter.</li>
     * </ul>
     *
     * @param context processor context supplying the property values
     * @param flowFile FlowFile used to evaluate expression-language properties
     * @return the converter matching the current configuration
     */
    private KafkaRecordConverter getKafkaRecordConverter(ProcessContext context, FlowFile flowFile) {
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

        // Header settings and size limit are resolved up front because every branch except the wrapper converter needs them.
        final String headerRegex = context.getProperty(ATTRIBUTE_HEADER_PATTERN).getValue();
        Pattern headerPattern = null;
        if (headerRegex != null) {
            headerPattern = Pattern.compile(headerRegex);
        }
        final Charset headerCharset = Charset.forName(context.getProperty(HEADER_ENCODING).evaluateAttributeExpressions().getValue());
        final AttributesHeadersFactory headersFactory = new AttributesHeadersFactory(headerPattern, headerCharset);
        final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();

        final boolean recordOriented = readerFactory != null && writerFactory != null;
        if (!recordOriented) {
            final PropertyValue demarcatorProperty = context.getProperty(MESSAGE_DEMARCATOR);
            if (!demarcatorProperty.isSet()) {
                return new FlowFileStreamKafkaRecordConverter(maxMessageSize, headersFactory);
            }
            final String demarcator = demarcatorProperty.evaluateAttributeExpressions(flowFile).getValue();
            return new DelimitedStreamKafkaRecordConverter(demarcator.getBytes(StandardCharsets.UTF_8), maxMessageSize, headersFactory);
        }

        final RecordSetWriterFactory keyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
        final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty(PUBLISH_STRATEGY).getValue());
        final RecordMetadataStrategy metadataStrategy = RecordMetadataStrategy.valueOf(context.getProperty(RECORD_METADATA_STRATEGY).getValue());
        final String kafkaKeyAttribute = context.getProperty(KAFKA_KEY).getValue();
        final String keyAttributeEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
        final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();

        // A record field supplies the key only under USE_VALUE with a configured key field; otherwise the key comes from attributes.
        final KeyFactory keyFactory;
        if (PublishStrategy.USE_VALUE == publishStrategy && messageKeyField != null) {
            keyFactory = new MessageKeyFactory(flowFile, messageKeyField, keyWriterFactory, this.getLogger());
        } else {
            keyFactory = new AttributeKeyFactory(kafkaKeyAttribute, keyAttributeEncoding);
        }

        if (publishStrategy == PublishStrategy.USE_WRAPPER) {
            return new RecordWrapperStreamKafkaRecordConverter(flowFile, metadataStrategy, readerFactory, writerFactory, keyWriterFactory, maxMessageSize, this.getLogger());
        }
        return new RecordStreamKafkaRecordConverter(readerFactory, writerFactory, headersFactory, keyFactory, maxMessageSize, this.getLogger());
    }

    /**
     * Session read callback that converts a FlowFile's content stream into Kafka records and hands
     * them to the producer service.
     */
    private static class PublishCallback
    implements InputStreamCallback {
        private final KafkaProducerService producer;
        private final PublishContext target;
        private final KafkaRecordConverter converter;
        private final Map<String, String> flowFileAttributes;
        private final long contentLength;

        /**
         * @param producerService producer that receives the converted records
         * @param publishContext publish context passed along with every send
         * @param kafkaConverter converter applied to the content stream
         * @param attributes FlowFile attributes handed to the converter
         * @param inputLength content length handed to the converter
         */
        public PublishCallback(KafkaProducerService producerService, PublishContext publishContext, KafkaRecordConverter kafkaConverter, Map<String, String> attributes, long inputLength) {
            this.producer = producerService;
            this.target = publishContext;
            this.converter = kafkaConverter;
            this.flowFileAttributes = attributes;
            this.contentLength = inputLength;
        }

        /**
         * Buffers the content stream, converts it and sends the resulting records.
         * Any exception is stored on the publish context and an empty send is issued instead of rethrowing.
         *
         * @param in raw FlowFile content stream
         */
        public void process(InputStream in) {
            try (BufferedInputStream buffered = new BufferedInputStream(in)) {
                final Iterator<KafkaRecord> converted = this.converter.convert(this.flowFileAttributes, buffered, this.contentLength);
                this.producer.send(converted, this.target);
            } catch (Exception e) {
                this.target.setException(e);
                this.producer.send(Collections.emptyIterator(), this.target);
            }
        }
    }
}

