package com.mulesoft.connectors.kafka.internal.connection;

import com.mulesoft.connectors.commons.template.connection.ConnectorConnection;
import com.mulesoft.connectors.kafka.internal.error.exception.InputTooLargeException;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidTopicPartitionException;
import com.mulesoft.connectors.kafka.internal.error.exception.KafkaProducerFencedException;
import com.mulesoft.connectors.kafka.internal.error.exception.OperationTimeoutException;
import com.mulesoft.connectors.kafka.internal.error.exception.UnexpectedException;
import com.mulesoft.connectors.kafka.internal.model.serializer.InputStreamSerializer;
import java.io.InputStream;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.mule.runtime.api.tx.TransactionException;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.extension.api.connectivity.TransactionalConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/connectors/kafka/internal/connection/ProducerConnection.class */
public class ProducerConnection implements TransactionalConnection, ConnectorConnection {
    private static final String PUBLISH = "publish";
    private static final String PRODUCER_FENCED_TEMPLATE = "The producer was fenced when beginning the transaction %s";
    private static final String CLUSTER_VERSION_NOT_SUPPORTING_TRANSACTIONS = "The Kafka version of the cluster does not support transactions";
    private static final Logger logger = LoggerFactory.getLogger(ProducerConnection.class);
    private final Producer<InputStream, InputStream> defaultProducer;
    private final Properties defaultProperties;
    private Producer<InputStream, InputStream> transactionProducer;
    private volatile String transactionId;
    private Function<Properties, Producer<InputStream, InputStream>> producerFactory = properties -> {
        return new KafkaProducer(properties, new InputStreamSerializer(), new InputStreamSerializer());
    };

    public ProducerConnection(Producer<InputStream, InputStream> producer, Properties properties) {
        this.defaultProducer = producer;
        this.defaultProperties = properties;
    }

    public void publish(String str, Integer num, InputStream inputStream, InputStream inputStream2, Map<String, InputStream> map, Consumer<RecordMetadata> consumer, Consumer<Throwable> consumer2) {
        getProducer().send(createProducerRecord(str, num, inputStream, inputStream2, map), (recordMetadata, exc) -> {
            if (exc == null) {
                consumer.accept(recordMetadata);
                return;
            }
            logger.error("There was an error while publishing a message", exc);
            if (exc instanceof AuthenticationException) {
                consumer2.accept(new com.mulesoft.connectors.kafka.internal.error.exception.AuthenticationException(exc, this));
                return;
            }
            if (exc instanceof AuthorizationException) {
                consumer2.accept(new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(exc));
                return;
            }
            if (exc instanceof TimeoutException) {
                consumer2.accept(new OperationTimeoutException(PUBLISH, exc));
            } else if (exc instanceof RecordTooLargeException) {
                consumer2.accept(new InputTooLargeException(exc.getMessage(), exc));
            } else {
                consumer2.accept(new UnexpectedException(PUBLISH, exc));
            }
        });
    }

    private ProducerRecord<InputStream, InputStream> createProducerRecord(String str, Integer num, InputStream inputStream, InputStream inputStream2, Map<String, InputStream> map) {
        try {
            return new ProducerRecord<>(str, num, Long.valueOf(System.currentTimeMillis()), inputStream, inputStream2, map != null ? (Iterable) map.entrySet().stream().map(entry -> {
                return new RecordHeader((String) entry.getKey(), IOUtils.toByteArray((InputStream) entry.getValue()));
            }).collect(Collectors.toList()) : null);
        } catch (IllegalArgumentException e) {
            throw new InvalidTopicPartitionException("Failed to construct message. Partition or topic is invalid.", e);
        }
    }

    public void begin() throws TransactionException {
        this.transactionId = UUID.randomUUID().toString();
        try {
            createTransactionalProducer();
        } catch (RuntimeException e) {
            wrapThrowTx(e);
        }
    }

    private void createTransactionalProducer() throws TransactionException {
        try {
            Properties properties = new Properties();
            properties.putAll(this.defaultProperties);
            properties.put("transactional.id", this.transactionId);
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(KafkaProducer.class.getClassLoader());
            Producer<InputStream, InputStream> apply = this.producerFactory.apply(properties);
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            apply.initTransactions();
            apply.beginTransaction();
            this.transactionProducer = apply;
        } catch (AuthorizationException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e);
        } catch (IllegalStateException | KafkaException e2) {
            throw new TransactionException(e2);
        } catch (AuthenticationException e3) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthenticationException(e3, this);
        } catch (UnsupportedVersionException e4) {
            logger.error(CLUSTER_VERSION_NOT_SUPPORTING_TRANSACTIONS);
            throw new UnexpectedException(e4);
        } catch (ProducerFencedException e5) {
            throw new KafkaProducerFencedException(String.format(PRODUCER_FENCED_TEMPLATE, this.transactionId), e5);
        }
    }

    public void commit() throws TransactionException {
        try {
            handleTransactionalException((v0) -> {
                v0.commitTransaction();
            });
        } catch (RuntimeException e) {
            wrapThrowTx(e);
        }
    }

    public void rollback() throws TransactionException {
        try {
            handleTransactionalException((v0) -> {
                v0.abortTransaction();
            });
        } catch (RuntimeException e) {
            wrapThrowTx(e);
        }
    }

    private void handleTransactionalException(Consumer<Producer<InputStream, InputStream>> consumer) throws TransactionException {
        try {
            try {
                try {
                    try {
                        try {
                            try {
                                consumer.accept(this.transactionProducer);
                                this.transactionProducer.close();
                                this.transactionProducer = null;
                            } catch (UnsupportedVersionException e) {
                                logger.error(CLUSTER_VERSION_NOT_SUPPORTING_TRANSACTIONS);
                                throw new UnexpectedException(e);
                            }
                        } catch (ProducerFencedException e2) {
                            throw new KafkaProducerFencedException(String.format(PRODUCER_FENCED_TEMPLATE, this.transactionId), e2);
                        }
                    } catch (IllegalStateException | KafkaException e3) {
                        throw new TransactionException(e3);
                    }
                } catch (AuthenticationException e4) {
                    throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthenticationException(e4, this);
                }
            } catch (TimeoutException e5) {
                throw new OperationTimeoutException(PUBLISH, (Throwable) e5);
            } catch (AuthorizationException e6) {
                throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e6);
            }
        } catch (Throwable th) {
            this.transactionProducer.close();
            this.transactionProducer = null;
            throw th;
        }
    }

    public void disconnect() {
        Optional.ofNullable(this.transactionProducer).ifPresent((v0) -> {
            v0.close();
        });
    }

    public void validate() {
        getProducer().metrics();
    }

    protected void wrapThrowTx(Throwable th) throws TransactionException {
        throw new TransactionException(th);
    }

    private Producer<InputStream, InputStream> getProducer() {
        return (Producer) Optional.ofNullable(this.transactionProducer).orElse(this.defaultProducer);
    }
}
