/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.core;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaResourceHolder;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ProducerFactoryUtils;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.micrometer.KafkaRecordSenderContext;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention;
import org.springframework.kafka.support.micrometer.MicrometerHolder;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
 * Template for publishing records to Kafka through producers obtained from a
 * {@link ProducerFactory}.
 *
 * <p>Besides the {@code send}/{@code sendDefault} family, the template can run a callback
 * against the raw {@link Producer}, run a callback inside a producer-local transaction,
 * and (when a {@link ConsumerFactory} is supplied) fetch individual records by offset.
 * Sends are wrapped in a Micrometer {@link Observation}; a Micrometer timer holder is used
 * instead when observations are not active (see {@link #afterSingletonsInstantiated()}).
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
        ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {

    /** Configuration key holding the broker address list. */
    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /** Header name used to carry the correlation id of a message. */
    private static final String CORRELATION_ID_HEADER = "kafka_correlationId";

    private static final String DEFAULT_TOPIC_REQUIRED = "'defaultTopic' must not be null";

    private static final String CONSUMER_FACTORY_REQUIRED = "A consumerFactory is required";

    private static final String CALLBACK_REQUIRED = "'callback' cannot be null";

    protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));

    private final ProducerFactory<K, V> producerFactory;

    /** True when the factory was copied with configuration overrides and is therefore owned by this template. */
    private final boolean customProducerFactory;

    private final boolean autoFlush;

    private final boolean transactional;

    /** Producers bound by {@link #executeInTransaction} to the thread that is running the callback. */
    private final Map<Thread, Producer<K, V>> producers = new ConcurrentHashMap<>();

    private final Map<String, String> micrometerTags = new HashMap<>();

    /** Guards the lazy lookup of {@link #clusterId}. */
    private final Lock clusterIdLock = new ReentrantLock();

    private String beanName = "kafkaTemplate";

    private @Nullable ApplicationContext applicationContext;

    private RecordMessageConverter messageConverter = new MessagingMessageConverter();

    private @Nullable String defaultTopic;

    private @Nullable ProducerListener<K, V> producerListener = new LoggingProducerListener<>();

    private @Nullable String transactionIdPrefix;

    private Duration closeTimeout = ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT;

    private boolean allowNonTransactional;

    /** Set once a converter has been supplied through {@link #setMessageConverter}. */
    private boolean converterSet;

    private @Nullable ConsumerFactory<K, V> consumerFactory;

    private @Nullable ProducerInterceptor<K, V> producerInterceptor;

    private boolean micrometerEnabled = true;

    private @Nullable MicrometerHolder micrometerHolder;

    private boolean observationEnabled;

    private @Nullable KafkaTemplateObservationConvention observationConvention;

    private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

    private @Nullable Function<ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider;

    private @Nullable KafkaAdmin kafkaAdmin;

    private @Nullable String clusterId;

    /**
     * Create a template without auto-flush and without configuration overrides.
     * @param producerFactory the factory that supplies producers; must not be null
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        this(producerFactory, false);
    }

    /**
     * Create a template without auto-flush.
     * @param producerFactory the factory that supplies producers; must not be null
     * @param configOverrides producer properties to override; when non-empty the factory is
     * copied via {@link ProducerFactory#copyWithConfigurationOverride(Map)}
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> configOverrides) {
        this(producerFactory, false, configOverrides);
    }

    /**
     * Create a template without configuration overrides.
     * @param producerFactory the factory that supplies producers; must not be null
     * @param autoFlush true to call {@link #flush()} after every send
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        this(producerFactory, autoFlush, null);
    }

    /**
     * Create a template.
     * @param producerFactory the factory that supplies producers; must not be null
     * @param autoFlush true to call {@link #flush()} after every send
     * @param configOverrides producer properties to override; when non-empty the factory is
     * copied via {@link ProducerFactory#copyWithConfigurationOverride(Map)} and the copy is
     * used (and later destroyed) by this template
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
            @Nullable Map<String, Object> configOverrides) {

        Assert.notNull(producerFactory, "'producerFactory' cannot be null");
        this.autoFlush = autoFlush;
        this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;
        boolean hasOverrides = !CollectionUtils.isEmpty(configOverrides);
        this.customProducerFactory = hasOverrides;
        if (hasOverrides) {
            this.producerFactory = producerFactory.copyWithConfigurationOverride(configOverrides);
        } else {
            this.producerFactory = producerFactory;
        }
        this.transactional = this.producerFactory.transactionCapable();
    }

    @Override
    public void setBeanName(String name) {
        this.beanName = name;
    }

    /**
     * Store the application context and, when this template owns a copied producer
     * factory, hand the context on to that factory as well.
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
        if (!this.customProducerFactory) {
            return;
        }
        // The copied factory is not a managed bean, so it has to be wired by hand.
        ((DefaultKafkaProducerFactory<K, V>) this.producerFactory).setApplicationContext(applicationContext);
    }

    /**
     * @return the topic used by the {@code sendDefault} methods, or null if none was set
     */
    public @Nullable String getDefaultTopic() {
        return this.defaultTopic;
    }

    /**
     * @param defaultTopic the topic used by the {@code sendDefault} methods and as the
     * fallback topic when converting a {@link Message}
     */
    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    /**
     * @param producerListener listener notified of send success and failure from the send
     * callback; null disables the notifications
     */
    public void setProducerListener(@Nullable ProducerListener<K, V> producerListener) {
        this.producerListener = producerListener;
    }

    /**
     * @return the converter used by {@link #send(Message)}
     */
    public RecordMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    /**
     * Replace the converter used by {@link #send(Message)}. After this call
     * {@link #setMessagingConverter(SmartMessageConverter)} is rejected.
     * @param messageConverter the converter; must not be null
     */
    public void setMessageConverter(RecordMessageConverter messageConverter) {
        Assert.notNull(messageConverter, "'messageConverter' cannot be null");
        this.messageConverter = messageConverter;
        this.converterSet = true;
    }

    /**
     * Install a {@link SmartMessageConverter} on the default record converter.
     * @param messageConverter the messaging converter to delegate to
     * @throws IllegalArgumentException if a record converter was already supplied through
     * {@link #setMessageConverter(RecordMessageConverter)}
     */
    public void setMessagingConverter(SmartMessageConverter messageConverter) {
        Assert.isTrue(!this.converterSet, "Cannot set the SmartMessageConverter when setting the messageConverter, add the SmartConverter to the message converter instead");
        // Safe while converterSet is false: the field still holds the default MessagingMessageConverter.
        ((MessagingMessageConverter) this.messageConverter).setMessagingConverter(messageConverter);
    }

    @Override
    public boolean isTransactional() {
        return this.transactional;
    }

    /**
     * @return the transaction id prefix passed on when transactional producers are requested
     */
    public @Nullable String getTransactionIdPrefix() {
        return this.transactionIdPrefix;
    }

    /**
     * @param transactionIdPrefix prefix handed to the producer factory when a transactional
     * producer is created for this template
     */
    public void setTransactionIdPrefix(String transactionIdPrefix) {
        this.transactionIdPrefix = transactionIdPrefix;
    }

    /**
     * @param closeTimeout the timeout passed to {@link Producer#close(Duration)} whenever
     * this template closes a producer; must not be null
     */
    public void setCloseTimeout(Duration closeTimeout) {
        Assert.notNull(closeTimeout, "'closeTimeout' cannot be null");
        this.closeTimeout = closeTimeout;
    }

    /**
     * @param allowNonTransactional true to allow a transactional template to be used while no
     * transaction is in process; see {@link #getTheProducer(String)}
     */
    public void setAllowNonTransactional(boolean allowNonTransactional) {
        this.allowNonTransactional = allowNonTransactional;
    }

    @Override
    public boolean isAllowNonTransactional() {
        return this.allowNonTransactional;
    }

    /**
     * @param micrometerEnabled false to skip creating the Micrometer timer holder in
     * {@link #afterSingletonsInstantiated()}
     */
    public void setMicrometerEnabled(boolean micrometerEnabled) {
        this.micrometerEnabled = micrometerEnabled;
    }

    /**
     * Add static tags for the Micrometer timers; entries are merged into any tags added
     * earlier.
     * @param tags the tags to add; null is ignored
     */
    public void setMicrometerTags(@Nullable Map<String, String> tags) {
        if (tags == null) {
            return;
        }
        this.micrometerTags.putAll(tags);
    }

    /**
     * @param micrometerTagsProvider function producing additional timer tags per record;
     * its result is layered over the static tags
     */
    public void setMicrometerTagsProvider(
            @Nullable Function<ProducerRecord<?, ?>, Map<String, String>> micrometerTagsProvider) {

        this.micrometerTagsProvider = micrometerTagsProvider;
    }

    /**
     * @return the per-record tag function, or null if none was set
     */
    public @Nullable Function<ProducerRecord<?, ?>, Map<String, String>> getMicrometerTagsProvider() {
        return this.micrometerTagsProvider;
    }

    @Override
    public ProducerFactory<K, V> getProducerFactory() {
        return this.producerFactory;
    }

    /**
     * Return the factory to use for the given topic. This implementation ignores the topic
     * and always returns the template's own factory; subclasses may override it.
     * @param topic the destination topic
     * @return the producer factory
     */
    protected ProducerFactory<K, V> getProducerFactory(String topic) {
        return this.producerFactory;
    }

    /**
     * @param consumerFactory the factory used by the {@code receive} methods
     */
    public void setConsumerFactory(ConsumerFactory<K, V> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    /**
     * @param producerInterceptor interceptor applied to each record before it is sent and
     * notified when the send is acknowledged; closed in {@link #destroy()}
     */
    public void setProducerInterceptor(ProducerInterceptor<K, V> producerInterceptor) {
        this.producerInterceptor = producerInterceptor;
    }

    /**
     * @param observationEnabled true to look up an {@link ObservationRegistry} (and a
     * {@link KafkaAdmin}) from the application context in {@link #afterSingletonsInstantiated()}
     */
    public void setObservationEnabled(boolean observationEnabled) {
        this.observationEnabled = observationEnabled;
    }

    /**
     * @param observationConvention custom convention passed to the send observation
     */
    public void setObservationConvention(KafkaTemplateObservationConvention observationConvention) {
        this.observationConvention = observationConvention;
    }

    /**
     * @param observationRegistry the registry used for send observations; must not be null
     */
    public void setObservationRegistry(ObservationRegistry observationRegistry) {
        Assert.notNull(observationRegistry, "'observationRegistry' must not be null");
        this.observationRegistry = observationRegistry;
    }

    /**
     * @return the admin used to resolve the cluster id, or null if none is available
     */
    public @Nullable KafkaAdmin getKafkaAdmin() {
        return this.kafkaAdmin;
    }

    /**
     * @param kafkaAdmin the admin used to resolve the cluster id for observations
     */
    public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
        this.kafkaAdmin = kafkaAdmin;
    }

    /**
     * Finish the metrics set-up once all singletons exist.
     *
     * <p>When observations are enabled and an application context is available, a unique
     * {@link ObservationRegistry} bean replaces a no-op registry and, if no admin was set, a
     * unique {@link KafkaAdmin} bean is adopted (re-created against the producer's bootstrap
     * servers if the two differ). In every other case the Micrometer timer holder is created,
     * provided Micrometer timers are enabled.
     */
    @Override
    public void afterSingletonsInstantiated() {
        ApplicationContext context = this.applicationContext;
        if (!this.observationEnabled || context == null) {
            if (this.micrometerEnabled) {
                this.micrometerHolder = obtainMicrometerHolder();
            }
            return;
        }
        if (this.observationRegistry.isNoop()) {
            this.observationRegistry = context.getBeanProvider(ObservationRegistry.class)
                    .getIfUnique(() -> this.observationRegistry);
        }
        if (this.kafkaAdmin != null) {
            return;
        }
        KafkaAdmin discovered = context.getBeanProvider(KafkaAdmin.class).getIfUnique();
        this.kafkaAdmin = discovered;
        if (discovered != null) {
            alignAdminWithProducerServers(discovered);
        }
    }

    /**
     * Replace the discovered admin with one pointing at the producer's bootstrap servers when
     * the two address lists differ, carrying over the operation timeout and any known
     * cluster id.
     */
    private void alignAdminWithProducerServers(KafkaAdmin discovered) {
        Object bootstrapServerConfig = this.producerFactory.getConfigurationProperties().get(BOOTSTRAP_SERVERS);
        Assert.state(bootstrapServerConfig != null, "'bootstrapServers' must not be null");
        String producerServers = removeLeadingAndTrailingBrackets(bootstrapServerConfig.toString());
        if (producerServers.equals(getAdminBootstrapAddress())) {
            return;
        }
        Map<String, Object> adminProps = new HashMap<>(discovered.getConfigurationProperties());
        adminProps.put(BOOTSTRAP_SERVERS, producerServers);
        int operationTimeout = discovered.getOperationTimeout();
        String knownClusterId = discovered.getClusterId();
        this.kafkaAdmin = new KafkaAdmin(adminProps);
        this.kafkaAdmin.setOperationTimeout(operationTimeout);
        if (knownClusterId != null && !knownClusterId.isEmpty()) {
            this.kafkaAdmin.setClusterId(knownClusterId);
        }
    }

    /**
     * Resolve the admin's broker address list, preferring {@link KafkaAdmin#getBootstrapServers()}
     * and falling back to the {@code bootstrap.servers} configuration entry.
     * @return the address list without enclosing brackets; empty when nothing is configured
     */
    private String getAdminBootstrapAddress() {
        KafkaAdmin admin = this.kafkaAdmin;
        String adminServers = null;
        if (admin != null) {
            adminServers = admin.getBootstrapServers();
            if (adminServers == null) {
                adminServers = admin.getConfigurationProperties().getOrDefault(BOOTSTRAP_SERVERS, "").toString();
            }
        }
        return removeLeadingAndTrailingBrackets(adminServers != null ? adminServers : "");
    }

    /**
     * Return the cluster id, asking the admin for it the first time it is needed.
     * @return the cluster id, or null when no admin is available or it reported none
     */
    private @Nullable String clusterId() {
        if (this.kafkaAdmin == null || this.clusterId != null) {
            return this.clusterId;
        }
        this.clusterIdLock.lock();
        try {
            // Re-check under the lock: another thread may have resolved it in the meantime.
            if (this.clusterId == null) {
                this.clusterId = this.kafkaAdmin.clusterId();
            }
        } finally {
            this.clusterIdLock.unlock();
        }
        return this.clusterId;
    }

    /**
     * Forward the context-stopped event to a producer factory owned by this template.
     */
    @Override
    public void onApplicationEvent(ContextStoppedEvent event) {
        if (!this.customProducerFactory) {
            return;
        }
        ((DefaultKafkaProducerFactory<K, V>) this.producerFactory).onApplicationEvent(event);
    }

    @Override
    public CompletableFuture<SendResult<K, V>> sendDefault(@Nullable V data) {
        return send(requireDefaultTopic(), data);
    }

    @Override
    public CompletableFuture<SendResult<K, V>> sendDefault(K key, @Nullable V data) {
        return send(requireDefaultTopic(), key, data);
    }

    @Override
    public CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data) {
        return send(requireDefaultTopic(), partition, key, data);
    }

    @Override
    public CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key,
            @Nullable V data) {

        return send(requireDefaultTopic(), partition, timestamp, key, data);
    }

    /**
     * @return the configured default topic
     * @throws IllegalStateException if no default topic has been set
     */
    private String requireDefaultTopic() {
        String topic = this.defaultTopic;
        Assert.state(topic != null, DEFAULT_TOPIC_REQUIRED);
        return topic;
    }

    @Override
    public CompletableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
        return observeSend(new ProducerRecord<>(topic, data));
    }

    @Override
    public CompletableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
        return observeSend(new ProducerRecord<>(topic, key, data));
    }

    @Override
    public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
        return observeSend(new ProducerRecord<>(topic, partition, key, data));
    }

    @Override
    public CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
            @Nullable V data) {

        return observeSend(new ProducerRecord<>(topic, partition, timestamp, key, data));
    }

    @Override
    public CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
        Assert.notNull(record, "'record' cannot be null");
        return observeSend(record);
    }

    /**
     * Convert the message with the configured converter (falling back to the default topic)
     * and send the resulting record. If the converted record carries no headers at all, the
     * message's correlation id header, when present as a byte array, is copied onto it.
     */
    @Override
    @SuppressWarnings("unchecked")
    public CompletableFuture<SendResult<K, V>> send(Message<?> message) {
        ProducerRecord<?, ?> converted = this.messageConverter.fromMessage(message, this.defaultTopic);
        boolean noHeadersMapped = !converted.headers().iterator().hasNext();
        if (noHeadersMapped) {
            byte[] correlationId = message.getHeaders().get(CORRELATION_ID_HEADER, byte[].class);
            if (correlationId != null) {
                converted.headers().add(CORRELATION_ID_HEADER, correlationId);
            }
        }
        return observeSend((ProducerRecord<K, V>) converted);
    }

    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        Producer<K, V> producer = getTheProducer();
        try {
            return producer.partitionsFor(topic);
        } finally {
            closeProducer(producer, inTransaction());
        }
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        Producer<K, V> producer = getTheProducer();
        try {
            return producer.metrics();
        } finally {
            closeProducer(producer, inTransaction());
        }
    }

    /**
     * Run the callback against a producer obtained for the current context; the producer is
     * closed afterwards unless a transaction is in process.
     */
    @Override
    public <T> T execute(KafkaOperations.ProducerCallback<K, V, T> callback) {
        Assert.notNull(callback, CALLBACK_REQUIRED);
        Producer<K, V> producer = getTheProducer();
        try {
            return callback.doInKafka(producer);
        } finally {
            closeProducer(producer, inTransaction());
        }
    }

    /**
     * Run the callback inside a transaction on a producer dedicated to the calling thread.
     *
     * <p>The transaction is committed when the callback returns. If the callback throws, the
     * transaction is aborted (an abort failure is attached as a suppressed exception) and the
     * callback's exception is rethrown. If the commit itself fails, that failure is rethrown
     * without attempting an abort. The producer is always unbound from the thread and closed.
     *
     * @throws IllegalStateException if the producer factory is not transactional or the
     * calling thread is already inside {@code executeInTransaction}
     */
    @Override
    public <T> @Nullable T executeInTransaction(KafkaOperations.OperationsCallback<K, V, T> callback) {
        Assert.notNull(callback, CALLBACK_REQUIRED);
        Assert.state(this.transactional, "Producer factory does not support transactions");
        final Thread owner = Thread.currentThread();
        Assert.state(this.producers.get(owner) == null, "Nested calls to 'executeInTransaction' are not allowed");

        final Producer<K, V> txProducer = this.producerFactory.createProducer(this.transactionIdPrefix);
        try {
            txProducer.beginTransaction();
        } catch (Exception beginFailure) {
            // Nothing was bound yet, so just release the producer before propagating.
            closeProducer(txProducer, false);
            throw beginFailure;
        }

        this.producers.put(owner, txProducer);
        try {
            T outcome = callback.doInOperations(this);
            try {
                txProducer.commitTransaction();
            } catch (Exception commitFailure) {
                // Wrapped so the generic handler below does not try to abort after a failed commit.
                throw new SkipAbortException(commitFailure);
            }
            return outcome;
        } catch (SkipAbortException skipAbort) {
            throw (RuntimeException) skipAbort.getCause();
        } catch (Exception operationFailure) {
            try {
                txProducer.abortTransaction();
            } catch (Exception abortFailure) {
                operationFailure.addSuppressed(abortFailure);
            }
            throw operationFailure;
        } finally {
            this.producers.remove(owner);
            closeProducer(txProducer, false);
        }
    }

    @Override
    public void flush() {
        Producer<K, V> producer = getTheProducer();
        try {
            producer.flush();
        } finally {
            closeProducer(producer, inTransaction());
        }
    }

    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
            ConsumerGroupMetadata groupMetadata) {

        producerForOffsets().sendOffsetsToTransaction(offsets, groupMetadata);
    }

    /**
     * Fetch the record at the given offset using a short-lived consumer.
     * @return the record, or null if the poll did not return exactly one record
     * @throws IllegalArgumentException if no consumer factory has been set
     */
    @Override
    public @Nullable ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout) {
        Properties singleRecordProps = oneOnly();
        Assert.notNull(this.consumerFactory, CONSUMER_FACTORY_REQUIRED);
        try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, singleRecordProps)) {
            return receiveOne(new TopicPartition(topic, partition), offset, pollTimeout, consumer);
        }
    }

    /**
     * Fetch one record per requested topic/partition/offset using a single short-lived
     * consumer. Every requested partition appears in the result, with an empty list when no
     * record was retrieved for it.
     * @throws KafkaException if a requested offset is null or negative
     * @throws IllegalArgumentException if no consumer factory has been set
     */
    @Override
    public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) {
        Properties singleRecordProps = oneOnly();
        Map<TopicPartition, List<ConsumerRecord<K, V>>> byPartition = new LinkedHashMap<>();
        Assert.notNull(this.consumerFactory, CONSUMER_FACTORY_REQUIRED);
        try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, singleRecordProps)) {
            for (TopicPartitionOffset tpo : requested) {
                Long wantedOffset = tpo.getOffset();
                if (wantedOffset == null || wantedOffset < 0L) {
                    throw new KafkaException("Offset supplied in TopicPartitionOffset is invalid: " + tpo);
                }
                ConsumerRecord<K, V> fetched =
                        receiveOne(tpo.getTopicPartition(), wantedOffset, pollTimeout, consumer);
                List<ConsumerRecord<K, V>> bucket =
                        byPartition.computeIfAbsent(tpo.getTopicPartition(), tp -> new ArrayList<>());
                if (fetched != null) {
                    bucket.add(fetched);
                }
            }
            return new ConsumerRecords<>(byPartition);
        }
    }

    /**
     * @return consumer property overrides limiting each poll to a single record
     * @throws IllegalArgumentException if no consumer factory has been set
     */
    private Properties oneOnly() {
        Assert.notNull(this.consumerFactory, CONSUMER_FACTORY_REQUIRED);
        Properties overrides = new Properties();
        overrides.setProperty("max.poll.records", "1");
        return overrides;
    }

    /**
     * Assign the partition, seek to the offset and poll once.
     * @return the polled record, or null unless exactly one record came back
     */
    private @Nullable ConsumerRecord<K, V> receiveOne(TopicPartition topicPartition, long offset,
            Duration pollTimeout, Consumer<K, V> consumer) {

        consumer.assign(Collections.singletonList(topicPartition));
        consumer.seek(topicPartition, offset);
        ConsumerRecords<K, V> polled = consumer.poll(pollTimeout);
        return polled.count() == 1 ? polled.iterator().next() : null;
    }

    /**
     * Locate the producer of the transaction in process: the one bound to this thread by
     * {@link #executeInTransaction}, otherwise the one held by the transaction
     * synchronization resource registered for the producer factory.
     * @throws IllegalArgumentException if neither exists
     */
    @SuppressWarnings("unchecked")
    private Producer<K, V> producerForOffsets() {
        Producer<K, V> threadBound = this.producers.get(Thread.currentThread());
        if (threadBound != null) {
            return threadBound;
        }
        KafkaResourceHolder<K, V> resourceHolder =
                (KafkaResourceHolder<K, V>) TransactionSynchronizationManager.getResource(this.producerFactory);
        Assert.isTrue(resourceHolder != null, "No transaction in process");
        return resourceHolder.getProducer();
    }

    /**
     * Close the producer with the configured close timeout, unless it belongs to a
     * transaction.
     * @param producer the producer to release
     * @param inTx true to leave the producer open
     */
    protected void closeProducer(Producer<K, V> producer, boolean inTx) {
        if (inTx) {
            return;
        }
        producer.close(this.closeTimeout);
    }

    /**
     * Start an observation for the record, run {@link #doSend} inside its scope and close the
     * scope again.
     *
     * <p>On the normal path the observation is left running; it is stopped by the send
     * callback. It is stopped here only when {@code doSend} throws a
     * {@link RuntimeException} and no error has been recorded on the observation context yet.
     */
    private CompletableFuture<SendResult<K, V>> observeSend(final ProducerRecord<K, V> producerRecord) {
        Observation observation = KafkaTemplateObservation.TEMPLATE_OBSERVATION.observation(
                this.observationConvention,
                KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention.INSTANCE,
                () -> new KafkaRecordSenderContext(producerRecord, this.beanName, this::clusterId),
                this.observationRegistry);
        observation.start();
        Observation.Scope scope = observation.openScope();
        CompletableFuture<SendResult<K, V>> sendResult;
        try {
            sendResult = doSend(producerRecord, observation);
        } catch (Throwable sendFailure) {
            // Close the scope first; a failure while closing must not mask the send failure.
            if (scope != null) {
                try {
                    scope.close();
                } catch (Throwable closeFailure) {
                    sendFailure.addSuppressed(closeFailure);
                }
            }
            // An error already on the context means the send callback has taken care of it.
            if (sendFailure instanceof RuntimeException && observation.getContext().getError() == null) {
                observation.error(sendFailure);
                observation.stop();
            }
            throw sendFailure;
        }
        if (scope != null) {
            scope.close();
        }
        return sendResult;
    }

    /**
     * Send the record with a producer chosen for its topic.
     *
     * <p>The record first passes through the producer interceptor, if any. When the
     * producer's future is already done on return, its outcome is checked immediately so a
     * failure surfaces as an exception. If auto-flush is on, the template is flushed before
     * returning.
     *
     * @param producerRecord the record to send
     * @param observation the observation covering this send; completed by the send callback
     * @return a future completed by the send callback
     * @throws KafkaException if the send failed immediately or the thread was interrupted
     * while checking the outcome
     */
    protected CompletableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord,
            Observation observation) {

        final Producer<K, V> producer = getTheProducer(producerRecord.topic());
        this.logger.trace(() -> "Sending: " + KafkaUtils.format(producerRecord));
        final CompletableFuture<SendResult<K, V>> future = new CompletableFuture<>();
        Object sample = this.micrometerHolder != null ? this.micrometerHolder.start() : null;

        final ProducerRecord<K, V> interceptedRecord = interceptorProducerRecord(producerRecord);
        Callback onCompletion = buildCallback(interceptedRecord, producer, future, sample, observation);
        Future<?> sendFuture = producer.send(interceptedRecord, onCompletion);
        if (sendFuture.isDone()) {
            try {
                sendFuture.get();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted", interrupted);
            } catch (ExecutionException failed) {
                throw new KafkaException("Send failed", failed.getCause());
            }
        }
        if (this.autoFlush) {
            flush();
        }
        this.logger.trace(() -> "Sent: " + KafkaUtils.format(interceptedRecord));
        return future;
    }

    /**
     * @return the record returned by the producer interceptor, or the record itself when no
     * interceptor is configured
     */
    private ProducerRecord<K, V> interceptorProducerRecord(ProducerRecord<K, V> producerRecord) {
        return this.producerInterceptor != null
                ? this.producerInterceptor.onSend(producerRecord)
                : producerRecord;
    }

    /**
     * Build the producer callback for one send. The callback notifies the interceptor
     * (failures there are only logged), then, inside the observation scope, records the
     * timer, completes the future and notifies the producer listener. Finally it stops the
     * observation and closes the producer unless the template is transactional.
     */
    private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final Producer<K, V> producer,
            final CompletableFuture<SendResult<K, V>> future, @Nullable Object sample, Observation observation) {

        return (metadata, exception) -> {
            try {
                if (this.producerInterceptor != null) {
                    this.producerInterceptor.onAcknowledgement(metadata, exception);
                }
            } catch (Exception interceptorFailure) {
                this.logger.warn(interceptorFailure, () -> "Error executing interceptor onAcknowledgement callback");
            }
            try (Observation.Scope ignored = observation.openScope()) {
                if (exception != null) {
                    failureTimer(sample, exception, producerRecord);
                    observation.error(exception);
                    future.completeExceptionally(
                            new KafkaProducerException(producerRecord, "Failed to send", exception));
                    if (this.producerListener != null) {
                        this.producerListener.onError(producerRecord, metadata, exception);
                    }
                    this.logger.debug(exception, () -> "Failed to send: " + KafkaUtils.format(producerRecord));
                } else {
                    successTimer(sample, producerRecord);
                    future.complete(new SendResult<>(producerRecord, metadata));
                    if (this.producerListener != null) {
                        this.producerListener.onSuccess(producerRecord, metadata);
                    }
                    this.logger.trace(() -> "Sent ok: " + KafkaUtils.format(producerRecord)
                            + ", metadata: " + metadata);
                }
            } finally {
                observation.stop();
                closeProducer(producer, this.transactional);
            }
        };
    }

    /**
     * Record a successful send on the timer sample; the record is passed along only when a
     * per-record tag provider is configured.
     */
    private void successTimer(@Nullable Object sample, ProducerRecord<?, ?> record) {
        MicrometerHolder holder = this.micrometerHolder;
        if (sample == null || holder == null) {
            return;
        }
        if (this.micrometerTagsProvider == null) {
            holder.success(sample);
        } else {
            holder.success(sample, record);
        }
    }

    /**
     * Record a failed send on the timer sample, tagged with the exception's simple class
     * name; the record is passed along only when a per-record tag provider is configured.
     */
    private void failureTimer(@Nullable Object sample, Exception exception, ProducerRecord<?, ?> record) {
        MicrometerHolder holder = this.micrometerHolder;
        if (sample == null || holder == null) {
            return;
        }
        if (this.micrometerTagsProvider == null) {
            holder.failure(sample, exception.getClass().getSimpleName());
        } else {
            holder.failure(sample, exception.getClass().getSimpleName(), record);
        }
    }

    /**
     * A transaction is considered in process when the template is transactional and either a
     * producer is bound to this thread, a resource is registered for the producer factory, or
     * an actual transaction is active.
     */
    @Override
    public boolean inTransaction() {
        if (!this.transactional) {
            return false;
        }
        return this.producers.get(Thread.currentThread()) != null
                || TransactionSynchronizationManager.getResource(this.producerFactory) != null
                || TransactionSynchronizationManager.isActualTransactionActive();
    }

    private Producer<K, V> getTheProducer() {
        return getTheProducer(null);
    }

    /**
     * Obtain the producer for the current context.
     *
     * <p>Inside a transaction: the producer bound to this thread, else the transactional
     * resource holder's producer. Otherwise: a non-transactional producer when
     * {@code allowNonTransactional} is set, else a producer from the template's factory
     * (no topic) or from {@link #getProducerFactory(String)} (topic given).
     *
     * @param topic the destination topic, or null when the operation is not topic-specific
     * @return the producer
     * @throws IllegalStateException if the template is transactional, no transaction is in
     * process and non-transactional use is not allowed
     */
    protected Producer<K, V> getTheProducer(@Nullable String topic) {
        boolean useTransactionalProducer = false;
        if (this.transactional) {
            boolean inTransaction = inTransaction();
            Assert.state(this.allowNonTransactional || inTransaction, "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
            useTransactionalProducer = inTransaction;
        }
        if (useTransactionalProducer) {
            Producer<K, V> threadBound = this.producers.get(Thread.currentThread());
            if (threadBound != null) {
                return threadBound;
            }
            KafkaResourceHolder<K, V> holder = ProducerFactoryUtils.getTransactionalResourceHolder(
                    this.producerFactory, this.transactionIdPrefix, this.closeTimeout);
            return holder.getProducer();
        }
        if (this.allowNonTransactional) {
            return this.producerFactory.createNonTransactionalProducer();
        }
        ProducerFactory<K, V> factory = topic == null ? this.producerFactory : getProducerFactory(topic);
        return factory.createProducer();
    }

    /**
     * Create the Micrometer timer holder. The tags are the static tags, overlaid with the
     * per-record provider's tags when one is configured.
     * @return the holder, or null when Micrometer is absent or the holder could not be
     * created (in which case Micrometer timers are switched off)
     */
    private @Nullable MicrometerHolder obtainMicrometerHolder() {
        if (!KafkaUtils.MICROMETER_PRESENT) {
            return null;
        }
        try {
            Function<@Nullable Object, Map<String, String>> mergedProvider = cr -> this.micrometerTags;
            if (this.micrometerTagsProvider != null) {
                mergedProvider = cr -> {
                    Map<String, String> tags = new HashMap<>(this.micrometerTags);
                    if (cr != null && this.micrometerTagsProvider != null) {
                        tags.putAll(this.micrometerTagsProvider.apply((ProducerRecord<?, ?>) cr));
                    }
                    return tags;
                };
            }
            return new MicrometerHolder(this.applicationContext, this.beanName, "spring.kafka.template",
                    "KafkaTemplate Timer", mergedProvider);
        } catch (IllegalStateException ex) {
            this.micrometerEnabled = false;
            return null;
        }
    }

    /**
     * Release what this template owns: the Micrometer holder, a producer factory created
     * from configuration overrides, and the producer interceptor.
     */
    @Override
    public void destroy() {
        if (this.micrometerHolder != null) {
            this.micrometerHolder.destroy();
        }
        if (this.customProducerFactory) {
            ((DefaultKafkaProducerFactory<K, V>) this.producerFactory).destroy();
        }
        if (this.producerInterceptor != null) {
            this.producerInterceptor.close();
        }
    }

    /**
     * Strip leading {@code '['} and trailing {@code ']'} characters, as produced when a list
     * of servers is rendered with {@code toString()}.
     */
    private static String removeLeadingAndTrailingBrackets(String str) {
        String withoutLeading = StringUtils.trimLeadingCharacter(str, '[');
        return StringUtils.trimTrailingCharacter(withoutLeading, ']');
    }

    /**
     * Marker used inside {@link #executeInTransaction} to carry a commit failure past the
     * handler that would otherwise abort the transaction.
     */
    private static final class SkipAbortException extends RuntimeException {

        SkipAbortException(Throwable cause) {
            super(cause);
        }

    }

}

