package com.mulesoft.connectors.kafka.internal.connection.provider;

import com.mulesoft.connectors.commons.template.connection.ConnectorConnection;
import com.mulesoft.connectors.commons.template.connection.provider.ConnectorConnectionProvider;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidConfigurationException;
import com.mulesoft.connectors.kafka.internal.metadata.EndpointIdentificationAlgorithmValueProvider;
import com.mulesoft.connectors.kafka.internal.model.serializer.InputStreamDeserializer;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.meta.ExpressionSupport;
import org.mule.runtime.api.tls.TlsContextFactory;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.extension.api.annotation.Expression;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Example;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.annotation.values.OfValues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/connectors/kafka/internal/connection/provider/KafkaConnectionProvider.class */
/**
 * Base connection provider for the Kafka connector.
 *
 * <p>During {@link #initialise()} it translates the connector parameters (bootstrap servers and the
 * optional TLS context) into a Kafka client {@link Properties} object, hands that object to the
 * subclass through {@link #initialise(Properties)} for further customisation, and later uses it to
 * probe the cluster and to build the actual connection in {@link #connect(Properties)}.
 *
 * @param <T> the connection type produced by the concrete provider
 */
public abstract class KafkaConnectionProvider<T extends ConnectorConnection> implements ConnectorConnectionProvider<T>, Initialisable, Disposable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConnectionProvider.class);

    /** Replacement text written to the trace log instead of secret values. */
    private static final String MASK = "********";

    /**
     * Settings stripped from the property set before the connectivity probe builds its consumer.
     * NOTE(review): these look like producer-side settings that a consumer would not use - confirm.
     */
    private static final List<String> PROPERTIES_EXCLUDED_FROM_CONNECTIVITY_TEST = Arrays.asList(
            "compression.type",
            "enable.idempotence",
            "delivery.timeout.ms",
            "buffer.memory",
            "key.serializer",
            "max.block.ms",
            "max.in.flight.requests.per.connection",
            "acks",
            "batch.size",
            "retries",
            "max.request.size",
            "value.serializer",
            "linger.ms",
            "metadata.max.idle.ms",
            "partitioner.class",
            "transaction.timeout.ms",
            "transactional.id");

    @Parameter
    @Summary("The urls that the consumer can try to use to connect to the kafka cluster.")
    @Example("localhost:9092,1.2.3.4:9092,abc.def.com:9092")
    @Placement(order = 10)
    @DisplayName("Bootstrap Server URLs")
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    private List<String> bootstrapServers;

    @Optional
    @Parameter
    @Placement(tab = "Security", order = 1)
    @DisplayName("TLS Configuration")
    @Expression(ExpressionSupport.NOT_SUPPORTED)
    private TlsContextFactory tlsContext;

    @Optional
    @OfValues(EndpointIdentificationAlgorithmValueProvider.class)
    @Parameter
    @Summary("The endpoint identification algorithm used by clients to validate server host name.")
    @Placement(tab = "Security", order = 100)
    @DisplayName("Endpoint identification algorithm")
    private String endpointIdentificationAlgorithm;

    /** Value written to {@code security.protocol} when no TLS context is configured. */
    private final SecurityProtocol plainProtocol;

    /** Value written to {@code security.protocol} when a TLS context is configured. */
    private final SecurityProtocol sslProtocol;

    /** Kafka client settings; created by {@link #initialise()}. */
    private Properties properties;

    /** Factory used to build the consumer for the connectivity probe. */
    private Function<Properties, Consumer<InputStream, InputStream>> kafkaConsumerFactory = KafkaConsumer::new;

    /**
     * Returns the factory that builds the consumer used by {@link #testConnectivity(Properties)}.
     *
     * <p>Decompiler note: the access modifier was changed from {@code protected}.
     *
     * @return the consumer factory
     */
    public Function<Properties, Consumer<InputStream, InputStream>> getKafkaConsumerFunction() {
        return this.kafkaConsumerFactory;
    }

    /**
     * Probes the cluster by building a temporary consumer from a copy of the given settings and
     * calling {@code listTopics()} on it. The passed-in {@code properties} object is not modified.
     *
     * @param properties the Kafka client settings to test
     * @throws ConnectionException if listing topics or closing the consumer raises a {@link KafkaException}
     */
    protected void testConnectivity(Properties properties) throws ConnectionException {
        Properties consumerProperties = (Properties) properties.clone();
        consumerProperties.setProperty("key.deserializer", InputStreamDeserializer.class.getName());
        consumerProperties.setProperty("value.deserializer", InputStreamDeserializer.class.getName());
        consumerProperties.setProperty("group.id", "connectivity");
        for (String excluded : PROPERTIES_EXCLUDED_FROM_CONNECTIVITY_TEST) {
            consumerProperties.remove(excluded);
        }

        // Built outside the try block so that construction failures keep propagating unchanged.
        Consumer<InputStream, InputStream> consumer = getKafkaConsumerFunction().apply(consumerProperties);
        // try-with-resources guarantees the consumer is closed even when listTopics() fails.
        try (Consumer<InputStream, InputStream> probe = consumer) {
            probe.listTopics();
        } catch (KafkaException e) {
            throw new ConnectionException(e);
        }
    }

    /**
     * Creates the provider.
     *
     * @param securityProtocol  protocol to use when no TLS context is configured
     * @param securityProtocol2 protocol to use when a TLS context is configured
     */
    public KafkaConnectionProvider(SecurityProtocol securityProtocol, SecurityProtocol securityProtocol2) {
        this.plainProtocol = securityProtocol;
        this.sslProtocol = securityProtocol2;
    }

    /**
     * Builds the Kafka client settings from the connector parameters, lets the subclass extend them
     * via {@link #initialise(Properties)} and, when trace logging is enabled, logs the result with
     * secrets masked.
     *
     * @throws InitialisationException if the TLS context or the subclass fails to initialise
     * @throws MuleRuntimeException    if the SSL context cannot be created
     */
    public void initialise() throws InitialisationException {
        this.properties = new Properties();
        setPropertyAsString("bootstrap.servers", String.join(",", this.bootstrapServers));
        if (this.tlsContext != null) {
            applyTlsSettings();
        }
        SecurityProtocol protocol = this.tlsContext == null ? this.plainProtocol : this.sslProtocol;
        setPropertyAsString("security.protocol", protocol.name());
        initialise(this.properties);
        if (logger.isTraceEnabled()) {
            logger.trace("Logging Kafka Consumer connection properties:");
            this.properties.forEach((key, value) -> logger.trace("Key: '{}', Value: '{}'", key, maskSecrets(key, value)));
        }
    }

    /** Copies key store, trust store and SSL context details from the TLS context into the settings. */
    private void applyTlsSettings() throws InitialisationException {
        LifecycleUtils.initialiseIfNeeded(this.tlsContext);
        if (this.tlsContext.isKeyStoreConfigured()) {
            setPropertyAsString("ssl.key.password", this.tlsContext.getKeyStoreConfiguration().getKeyPassword());
            setPropertyAsString("ssl.keystore.location", this.tlsContext.getKeyStoreConfiguration().getPath());
            // The store password, not the private-key password, unlocks the key store itself.
            setPropertyAsString("ssl.keystore.password", this.tlsContext.getKeyStoreConfiguration().getPassword());
            setPropertyAsString("ssl.keystore.type", this.tlsContext.getKeyStoreConfiguration().getType());
            setPropertyAsString("ssl.keymanager.algorithm", this.tlsContext.getKeyStoreConfiguration().getAlgorithm());
        }
        if (this.tlsContext.isTrustStoreConfigured()) {
            setPropertyAsString("ssl.truststore.location", this.tlsContext.getTrustStoreConfiguration().getPath());
            setPropertyAsString("ssl.truststore.password", this.tlsContext.getTrustStoreConfiguration().getPassword());
            setPropertyAsString("ssl.truststore.type", this.tlsContext.getTrustStoreConfiguration().getType());
            setPropertyAsString("ssl.trustmanager.algorithm", this.tlsContext.getTrustStoreConfiguration().getAlgorithm());
        }
        try {
            SSLContext sslContext = this.tlsContext.createSslContext();
            setProperties("ssl.cipher.suites", sslContext.getDefaultSSLParameters().getCipherSuites());
            setProperties("ssl.enabled.protocols", this.tlsContext.getEnabledProtocols());
            setPropertyAsString("ssl.protocol", sslContext.getProtocol());
            // The literal "disabled" is turned into an empty string before being handed to Kafka.
            setPropertyAsString("ssl.endpoint.identification.algorithm",
                    this.endpointIdentificationAlgorithm != null ? this.endpointIdentificationAlgorithm.replace("disabled", "") : null);
            setPropertyAsString("ssl.provider",
                    java.util.Optional.ofNullable(sslContext.getProvider()).map(provider -> provider.getName()).orElse(null));
        } catch (KeyManagementException | NoSuchAlgorithmException e) {
            throw new MuleRuntimeException(e);
        }
    }

    /**
     * Produces the loggable form of a setting: values stored under a key containing
     * {@code "password"} are fully masked, and any embedded {@code password=...} fragment in other
     * values is masked as well.
     */
    private static String maskSecrets(Object key, Object value) {
        if (String.valueOf(key).contains("password")) {
            return MASK;
        }
        return String.valueOf(value).replaceAll("password=.*", "password=" + MASK);
    }

    /**
     * Hook for subclasses to add or adjust Kafka client settings.
     *
     * @param properties the settings assembled so far by {@link #initialise()}
     * @throws InitialisationException if the subclass cannot complete its setup
     */
    protected abstract void initialise(Properties properties) throws InitialisationException;

    /**
     * Runs the connectivity probe and then delegates to {@link #connect(Properties)}.
     *
     * <p>Decompiler note: renamed from {@code connect}; reason: merged with bridge method.
     *
     * @return the connection created by the subclass
     * @throws ConnectionException if the probe or the subclass connection attempt fails
     */
    public T m14connect() throws ConnectionException {
        testConnectivity(this.properties);
        return connect(this.properties);
    }

    /**
     * Creates the concrete connection from the prepared settings.
     *
     * @param properties the Kafka client settings built during initialisation
     * @return the new connection
     * @throws ConnectionException if the connection cannot be established
     */
    protected abstract T connect(Properties properties) throws ConnectionException;

    /**
     * Stores {@code obj.toString()} under {@code str}; does nothing when {@code obj} is {@code null}.
     *
     * <p>Decompiler note: the access modifier was changed from {@code protected}.
     *
     * @param str the property key
     * @param obj the value to store, may be {@code null}
     */
    public void setPropertyAsString(String str, Object obj) {
        if (obj != null) {
            this.properties.put(str, obj.toString());
        }
    }

    /**
     * Stores a non-empty array as a {@link List} value under {@code str}; {@code null} or empty
     * arrays are ignored. Note that the stored value is a list, not a string.
     */
    private void setProperties(String str, String[] strArr) {
        if (strArr == null || strArr.length == 0) {
            return;
        }
        this.properties.put(str, Arrays.asList(strArr));
    }

    /**
     * Converts a Kafka failure into an {@link InvalidConfigurationException}; always throws.
     *
     * <p>Decompiler note: the access modifier was changed from {@code protected}.
     *
     * @param kafkaException the original Kafka failure, kept as the cause
     */
    public void handleConnectionException(KafkaException kafkaException) {
        throw new InvalidConfigurationException("The provided configuration is invalid!", kafkaException);
    }
}
