/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.service;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kafka.service.api.KafkaConnectionService;
import org.apache.nifi.kafka.service.api.common.ServiceConfiguration;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
import org.apache.nifi.kafka.service.api.consumer.PollingContext;
import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
import org.apache.nifi.kafka.service.consumer.pool.Subscription;
import org.apache.nifi.kafka.service.producer.Kafka3ProducerService;
import org.apache.nifi.kafka.shared.property.IsolationLevel;
import org.apache.nifi.kafka.shared.property.KafkaClientProperty;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;

@DynamicProperty(name="The name of a Kafka configuration property.", value="The value of a given Kafka configuration property.", description="These properties will be added on the Kafka configuration after loading any provided configuration properties. In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged. For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.", expressionLanguageScope=ExpressionLanguageScope.ENVIRONMENT)
@CapabilityDescription(value="Provides and manages connections to Kafka Brokers for producer or consumer operations.")
public class Kafka3ConnectionService
extends AbstractControllerService
implements KafkaConnectionService,
VerifiableControllerService {
    public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder().name("bootstrap.servers").displayName("Bootstrap Servers").description("Comma-separated list of Kafka Bootstrap Servers in the format host:port. Corresponds to Kafka bootstrap.servers property").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    public static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder().name("security.protocol").displayName("Security Protocol").description("Security protocol used to communicate with brokers. Corresponds to Kafka Client security.protocol property").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues((Enum[])SecurityProtocol.values()).defaultValue(SecurityProtocol.PLAINTEXT.name()).build();
    public static final PropertyDescriptor SASL_MECHANISM = new PropertyDescriptor.Builder().name("sasl.mechanism").displayName("SASL Mechanism").description("SASL mechanism used for authentication. Corresponds to Kafka Client sasl.mechanism property").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(SaslMechanism.getAvailableSaslMechanisms()).defaultValue(SaslMechanism.GSSAPI.getValue()).build();
    public static final PropertyDescriptor SASL_USERNAME = new PropertyDescriptor.Builder().name("sasl.username").displayName("SASL Username").description("Username provided with configured password when using PLAIN or SCRAM SASL Mechanisms").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).dependsOn(SASL_MECHANISM, SaslMechanism.PLAIN.getValue(), new String[]{SaslMechanism.SCRAM_SHA_256.getValue(), SaslMechanism.SCRAM_SHA_512.getValue()}).build();
    public static final PropertyDescriptor SASL_PASSWORD = new PropertyDescriptor.Builder().name("sasl.password").displayName("SASL Password").description("Password provided with configured username when using PLAIN or SCRAM SASL Mechanisms").required(true).sensitive(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).dependsOn(SASL_MECHANISM, SaslMechanism.PLAIN.getValue(), new String[]{SaslMechanism.SCRAM_SHA_256.getValue(), SaslMechanism.SCRAM_SHA_512.getValue()}).build();
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL Context Service").description("Service supporting SSL communication with Kafka brokers").required(false).identifiesControllerService(SSLContextService.class).build();
    public static final PropertyDescriptor TRANSACTION_ISOLATION_LEVEL = new PropertyDescriptor.Builder().name("isolation.level").displayName("Transaction Isolation Level").description("Specifies how the service should handle transaction isolation levels when communicating with Kafka.\nThe uncommited option means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions.\nThe committed option configures the service to not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the\nconsumer must wait for the producer to finish its entire transaction instead of pulling as the messages become available.\nCorresponds to Kafka isolation.level property.\n").expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(IsolationLevel.class).defaultValue((DescribedValue)IsolationLevel.READ_COMMITTED).required(true).build();
    public static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder().name("max.poll.records").displayName("Max Poll Records").description("Maximum number of records Kafka should return in a single poll.").required(true).defaultValue("10000").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor CLIENT_TIMEOUT = new PropertyDescriptor.Builder().name("default.api.timeout.ms").displayName("Client Timeout").description("Default timeout for Kafka client operations. Mapped to Kafka default.api.timeout.ms. The Kafka request.timeout.ms property is derived from half of the configured timeout").defaultValue("60 sec").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    public static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder().name("max.block.ms").displayName("Max Metadata Wait Time").description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the\nentire 'send' call. Corresponds to Kafka max.block.ms property\n").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT).defaultValue("5 sec").build();
    public static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder().name("ack.wait.time").displayName("Acknowledgment Wait Time").description("After sending a message to Kafka, this indicates the amount of time that the service will wait for a response from Kafka.\nIf Kafka does not acknowledge the message within this time period, the service will throw an exception.\n").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).defaultValue("5 sec").build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(BOOTSTRAP_SERVERS, SECURITY_PROTOCOL, SASL_MECHANISM, SASL_USERNAME, SASL_PASSWORD, SSL_CONTEXT_SERVICE, TRANSACTION_ISOLATION_LEVEL, MAX_POLL_RECORDS, CLIENT_TIMEOUT, METADATA_WAIT_TIME, ACK_WAIT_TIME);
    private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(2L);
    private static final String CONNECTION_STEP = "Kafka Broker Connection";
    private static final String TOPIC_LISTING_STEP = "Kafka Topic Listing";
    private volatile Properties clientProperties;
    private volatile ServiceConfiguration serviceConfiguration;
    private volatile Properties consumerProperties;

    @OnEnabled
    public void onEnabled(ConfigurationContext configurationContext) {
        this.clientProperties = this.getClientProperties((PropertyContext)configurationContext);
        this.serviceConfiguration = this.getServiceConfiguration((PropertyContext)configurationContext);
        this.consumerProperties = this.getConsumerProperties((PropertyContext)configurationContext, this.clientProperties);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public KafkaConsumerService getConsumerService(PollingContext pollingContext) {
        Objects.requireNonNull(pollingContext, "Polling Context required");
        Subscription subscription = this.createSubscription(pollingContext);
        Properties properties = new Properties();
        properties.putAll((Map<?, ?>)this.consumerProperties);
        properties.put("group.id", subscription.getGroupId());
        properties.put("auto.offset.reset", subscription.getAutoOffsetReset().getValue());
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
        KafkaConsumer consumer = new KafkaConsumer(properties, (Deserializer)deserializer, (Deserializer)deserializer);
        Optional<Pattern> topicPatternFound = subscription.getTopicPattern();
        if (topicPatternFound.isPresent()) {
            Pattern topicPattern = topicPatternFound.get();
            consumer.subscribe(topicPattern);
        } else {
            Collection<String> topics = subscription.getTopics();
            consumer.subscribe(topics);
        }
        Kafka3ConsumerService consumerService = new Kafka3ConsumerService(this.getLogger(), (Consumer<byte[], byte[]>)consumer, subscription, pollingContext.getMaxUncommittedTime());
        return consumerService;
    }

    private Subscription createSubscription(PollingContext pollingContext) {
        String groupId = pollingContext.getGroupId();
        Optional topicPatternFound = pollingContext.getTopicPattern();
        AutoOffsetReset autoOffsetReset = pollingContext.getAutoOffsetReset();
        return topicPatternFound.map(pattern -> new Subscription(groupId, (Pattern)pattern, autoOffsetReset)).orElseGet(() -> new Subscription(groupId, pollingContext.getTopics(), autoOffsetReset));
    }

    public KafkaProducerService getProducerService(ProducerConfiguration producerConfiguration) {
        String partitionClass;
        Properties propertiesProducer = new Properties();
        propertiesProducer.putAll((Map<?, ?>)this.clientProperties);
        if (producerConfiguration.getTransactionsEnabled()) {
            propertiesProducer.put("transactional.id", new TransactionIdSupplier(producerConfiguration.getTransactionIdPrefix()).get());
        }
        if (producerConfiguration.getDeliveryGuarantee() != null) {
            propertiesProducer.put("acks", producerConfiguration.getDeliveryGuarantee());
        }
        if (producerConfiguration.getCompressionCodec() != null) {
            propertiesProducer.put("compression.type", producerConfiguration.getCompressionCodec());
        }
        if ((partitionClass = producerConfiguration.getPartitionClass()) != null && partitionClass.startsWith("org.apache.kafka")) {
            propertiesProducer.put("partitioner.class", partitionClass);
        }
        return new Kafka3ProducerService(propertiesProducer, this.serviceConfiguration, producerConfiguration);
    }

    public List<ConfigVerificationResult> verify(ConfigurationContext configurationContext, ComponentLog verificationLogger, Map<String, String> variables) {
        ArrayList<ConfigVerificationResult> results = new ArrayList<ConfigVerificationResult>();
        Properties clientProperties = this.getClientProperties((PropertyContext)configurationContext);
        clientProperties.putAll(variables);
        try (Admin admin = Admin.create((Properties)clientProperties);){
            ListTopicsResult listTopicsResult = admin.listTopics();
            KafkaFuture requestedListings = listTopicsResult.listings();
            Collection topicListings = (Collection)requestedListings.get(VERIFY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            String topicListingExplanation = String.format("Topics Found [%d]", topicListings.size());
            results.add(new ConfigVerificationResult.Builder().verificationStepName(TOPIC_LISTING_STEP).outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(topicListingExplanation).build());
        }
        catch (Exception e) {
            verificationLogger.error("Kafka Broker verification failed", (Throwable)e);
            results.add(new ConfigVerificationResult.Builder().verificationStepName(CONNECTION_STEP).outcome(ConfigVerificationResult.Outcome.FAILED).explanation(e.getMessage()).build());
        }
        return results;
    }

    private Properties getConsumerProperties(PropertyContext propertyContext, Properties defaultProperties) {
        Properties properties = new Properties();
        properties.putAll((Map<?, ?>)defaultProperties);
        IsolationLevel isolationLevel = (IsolationLevel)propertyContext.getProperty(TRANSACTION_ISOLATION_LEVEL).asAllowableValue(IsolationLevel.class);
        properties.put(TRANSACTION_ISOLATION_LEVEL.getName(), isolationLevel.getValue());
        return properties;
    }

    private Properties getClientProperties(PropertyContext propertyContext) {
        Properties properties = new Properties();
        StandardKafkaPropertyProvider propertyProvider = new StandardKafkaPropertyProvider(ConsumerConfig.class);
        Map propertiesProvider = propertyProvider.getProperties(propertyContext);
        propertiesProvider.forEach((key, value) -> properties.setProperty((String)key, value.toString()));
        String configuredBootstrapServers = propertyContext.getProperty(BOOTSTRAP_SERVERS).getValue();
        properties.put("bootstrap.servers", configuredBootstrapServers);
        this.setSslProperties(properties, propertyContext);
        int defaultApiTimeoutMs = this.getDefaultApiTimeoutMs(propertyContext);
        properties.put("default.api.timeout.ms", (Object)defaultApiTimeoutMs);
        int requestTimeoutMs = this.getRequestTimeoutMs(propertyContext);
        properties.put("request.timeout.ms", (Object)requestTimeoutMs);
        long timePeriod = propertyContext.getProperty(METADATA_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
        properties.put("max.block.ms", (Object)timePeriod);
        return properties;
    }

    private void setSslProperties(Properties properties, PropertyContext context) {
        PropertyValue sslContextServiceProperty = context.getProperty(SSL_CONTEXT_SERVICE);
        if (sslContextServiceProperty.isSet()) {
            SSLContextService sslContextService = (SSLContextService)sslContextServiceProperty.asControllerService(SSLContextService.class);
            if (sslContextService.isKeyStoreConfigured()) {
                properties.put(KafkaClientProperty.SSL_KEYSTORE_LOCATION.getProperty(), sslContextService.getKeyStoreFile());
                properties.put(KafkaClientProperty.SSL_KEYSTORE_TYPE.getProperty(), sslContextService.getKeyStoreType());
                String keyStorePassword = sslContextService.getKeyStorePassword();
                properties.put(KafkaClientProperty.SSL_KEYSTORE_PASSWORD.getProperty(), keyStorePassword);
                String keyPassword = sslContextService.getKeyPassword();
                String configuredKeyPassword = keyPassword == null ? keyStorePassword : keyPassword;
                properties.put(KafkaClientProperty.SSL_KEY_PASSWORD.getProperty(), configuredKeyPassword);
            }
            if (sslContextService.isTrustStoreConfigured()) {
                properties.put(KafkaClientProperty.SSL_TRUSTSTORE_LOCATION.getProperty(), sslContextService.getTrustStoreFile());
                properties.put(KafkaClientProperty.SSL_TRUSTSTORE_TYPE.getProperty(), sslContextService.getTrustStoreType());
                properties.put(KafkaClientProperty.SSL_TRUSTSTORE_PASSWORD.getProperty(), sslContextService.getTrustStorePassword());
            }
        }
    }

    private ServiceConfiguration getServiceConfiguration(PropertyContext propertyContext) {
        Duration maxAckWait = propertyContext.getProperty(ACK_WAIT_TIME).asDuration();
        return new ServiceConfiguration(maxAckWait);
    }

    private int getDefaultApiTimeoutMs(PropertyContext propertyContext) {
        return propertyContext.getProperty(CLIENT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
    }

    private int getRequestTimeoutMs(PropertyContext propertyContext) {
        int defaultApiTimeoutMs = this.getDefaultApiTimeoutMs(propertyContext);
        return defaultApiTimeoutMs / 2;
    }
}

