package org.springframework.kafka.listener;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaResourceHolder;
import org.springframework.kafka.core.ProducerFactoryUtils;
import org.springframework.kafka.event.ConsumerPausedEvent;
import org.springframework.kafka.event.ConsumerResumedEvent;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.LogIfLevelEnabled;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer.class */
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    // NOTE(review): not referenced in the visible part of this file — confirm its role against the constructors.
    private final AbstractMessageListenerContainer<K, V> container;
    // Explicitly configured partitions. When non-null, ListenerConsumer assigns these to the
    // consumer instead of subscribing by topic list or topic pattern.
    private final TopicPartitionInitialOffset[] topicPartitions;
    // NOTE(review): presumably the currently active polling worker; it is assigned outside the visible code.
    private volatile KafkaMessageListenerContainer<K, V>.ListenerConsumer listenerConsumer;
    // NOTE(review): presumably the future of the task running listenerConsumer; assigned outside the visible code.
    private volatile ListenableFuture<?> listenerConsumerFuture;
    // NOTE(review): presumably the user listener handed to ListenerConsumer; assigned outside the visible code.
    private GenericMessageListener<?> listener;
    // Passed to ConsumerFactory.createConsumer(...) as the client id suffix for the created consumer.
    private String clientIdSuffix;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$ListenerConsumer.class */
    public final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekAware.ConsumerSeekCallback {
        private final Log logger = LogFactory.getLog(ListenerConsumer.class);
        // Snapshot of the enclosing container's properties taken in the constructor.
        private final ContainerProperties containerProperties;
        // The user supplied commit callback, or a LoggingCommitCallback when none is configured.
        private final OffsetCommitCallback commitCallback;
        // The Kafka consumer created by the container's consumer factory; closed at the end of run().
        private final Consumer<K, V> consumer;
        // NOTE(review): presumably pending offsets keyed by topic, then partition — confirm against addOffset().
        private final Map<String, Map<Integer, Long>> offsets;
        // The listener exactly as supplied; also inspected for ConsumerSeekAware support.
        private final GenericMessageListener<?> genericListener;
        // Exactly one of listener / batchListener is non-null, depending on the supplied listener's type.
        private final MessageListener<K, V> listener;
        private final BatchMessageListener<K, V> batchListener;
        private final ListenerType listenerType;
        // True for the CONSUMER_AWARE and ACKNOWLEDGING_CONSUMER_AWARE listener types.
        private final boolean isConsumerAwareListener;
        private final boolean isBatchListener;
        // Mirrors ConsumerFactory.isAutoCommit().
        private final boolean autoCommit;
        // Flags derived from the configured AckMode.
        private final boolean isManualAck;
        private final boolean isManualImmediateAck;
        private final boolean isAnyManualAck;
        private final boolean isRecordAck;
        private final boolean isBatchAck;
        // Records acknowledged from a thread other than the consumer thread; drained by handleAcks().
        private final BlockingQueue<ConsumerRecord<K, V>> acks;
        // NOTE(review): presumably seek requests queued for the consumer thread — confirm against processSeeks().
        private final BlockingQueue<TopicPartitionInitialOffset> seeks;
        private final ErrorHandler errorHandler;
        private final BatchErrorHandler batchErrorHandler;
        private final PlatformTransactionManager transactionManager;
        // The transaction manager again, but only when it is a KafkaAwareTransactionManager; otherwise null.
        private final KafkaAwareTransactionManager kafkaTxManager;
        // Non-null only when a transaction manager is configured.
        private final TransactionTemplate transactionTemplate;
        // Group id from the container properties, falling back to the consumer factory's "group.id".
        private final String consumerGroupId;
        private final TaskScheduler taskScheduler;
        // Periodic task that calls checkConsumer(); cancelled at the end of run().
        private final ScheduledFuture<?> monitorTask;
        private final LogIfLevelEnabled commitLogger;
        // Populated in the constructor only when explicit topic partitions are configured.
        private volatile Map<TopicPartition, OffsetMetadata> definedPartitions;
        // Updated by the rebalance listener on each partition assignment.
        private volatile Collection<TopicPartition> assignedPartitions;
        // Set at the start of run(); processAck() compares against it to decide whether to queue an ack.
        private volatile Thread consumerThread;
        // NOTE(review): count/last are reset in run(); presumably used for count/time based acks — confirm.
        private int count;
        private long last;
        // Set when a NoOffsetForPartitionException is seen; causes run() to stop the container.
        private boolean fatalError;
        // True when the scheduler came from the container properties, in which case run() does not destroy it.
        private boolean taskSchedulerExplicitlySet;
        // Tracks whether this worker has paused the consumer in response to the container being paused.
        private boolean consumerPaused;
        // Timestamp of the most recent poll; read by checkConsumer() on the scheduler thread.
        private volatile long lastPoll;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$ListenerConsumer$ConsumerAcknowledgment.class */
        /**
         * {@link Acknowledgment} handed to record listeners; acknowledging it forwards the
         * wrapped record to {@code ListenerConsumer.processAck}.
         */
        public final class ConsumerAcknowledgment implements Acknowledgment {
            /** The single record this acknowledgment refers to. */
            private final ConsumerRecord<K, V> record;

            ConsumerAcknowledgment(ConsumerRecord<K, V> consumerRecord) {
                this.record = consumerRecord;
            }

            /**
             * Acknowledges the wrapped record.
             *
             * @throws IllegalStateException if the container is not using a manual ack mode
             */
            @Override // org.springframework.kafka.support.Acknowledgment
            public void acknowledge() {
                final ListenerConsumer owner = ListenerConsumer.this;
                Assert.state(owner.isAnyManualAck, "A manual ackmode is required for an acknowledging listener");
                owner.processAck(this.record);
            }

            @Override
            public String toString() {
                return new StringBuilder("Acknowledgment for ").append(this.record).toString();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$ListenerConsumer$ConsumerBatchAcknowledgment.class */
        /**
         * {@link Acknowledgment} handed to batch listeners; acknowledging it forwards the
         * records returned by {@code getHighestOffsetRecords} to {@code processAck}.
         */
        public final class ConsumerBatchAcknowledgment implements Acknowledgment {
            /** Private copy of the batch, taken when the acknowledgment is created. */
            private final List<ConsumerRecord<K, V>> records;

            ConsumerBatchAcknowledgment(List<ConsumerRecord<K, V>> list) {
                this.records = new LinkedList<>(list);
            }

            /**
             * Acknowledges the batch.
             *
             * @throws IllegalStateException if the container is not using a manual ack mode
             */
            @Override // org.springframework.kafka.support.Acknowledgment
            @SuppressWarnings("unchecked")
            public void acknowledge() {
                final ListenerConsumer owner = ListenerConsumer.this;
                Assert.state(owner.isAnyManualAck, "A manual ackmode is required for an acknowledging listener");
                for (Object highest : owner.getHighestOffsetRecords(this.records)) {
                    owner.processAck((ConsumerRecord<K, V>) highest);
                }
            }

            @Override
            public String toString() {
                return new StringBuilder("Acknowledgment for ").append(this.records).toString();
            }
        }

        /**
         * Builds the polling worker: derives the ack-mode flags, validates the listener and error
         * handler, creates the Kafka consumer, assigns or subscribes it, and schedules the
         * non-responsive-consumer monitor.
         *
         * <p>All configuration validation happens <em>before</em> the consumer is created so that an
         * invalid configuration does not leave an open consumer behind.
         *
         * @param genericMessageListener the listener to invoke; must be a {@code MessageListener} or a
         *        {@code BatchMessageListener} (or one of their variants)
         * @param listenerType the listener flavour, used to pick the {@code onMessage} overload
         * @throws IllegalStateException if auto commit is combined with a manual ack mode, if
         *         {@code AckMode.RECORD} is combined with a batch listener, or if the configured error
         *         handler does not match the listener kind
         * @throws IllegalArgumentException if the listener is of an unsupported type
         */
        @SuppressWarnings({ "unchecked", "rawtypes" })
        ListenerConsumer(GenericMessageListener<?> genericMessageListener, ListenerType listenerType) {
            // --- plain configuration, no external resources yet ---
            this.containerProperties = KafkaMessageListenerContainer.this.getContainerProperties();
            this.commitCallback = this.containerProperties.getCommitCallback() != null ? this.containerProperties.getCommitCallback() : new LoggingCommitCallback();
            this.offsets = new HashMap<>();
            this.autoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
            this.isManualAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.MANUAL);
            this.isManualImmediateAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
            this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
            this.isRecordAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.RECORD);
            this.isBatchAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.BATCH);
            this.acks = new LinkedBlockingQueue<>();
            this.seeks = new LinkedBlockingQueue<>();
            this.transactionManager = this.containerProperties.getTransactionManager();
            this.kafkaTxManager = this.transactionManager instanceof KafkaAwareTransactionManager ? (KafkaAwareTransactionManager) this.transactionManager : null;
            this.consumerGroupId = this.containerProperties.getGroupId() == null ? (String) KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties().get("group.id") : this.containerProperties.getGroupId();
            this.commitLogger = new LogIfLevelEnabled(this.logger, this.containerProperties.getCommitLogLevel());
            this.last = System.currentTimeMillis();
            this.lastPoll = System.currentTimeMillis();
            Assert.state(!(this.isAnyManualAck && this.autoCommit), "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());

            // --- listener and error handler validation, still before any consumer exists ---
            this.genericListener = genericMessageListener;
            if (genericMessageListener instanceof BatchMessageListener) {
                this.listener = null;
                this.batchListener = (BatchMessageListener) genericMessageListener;
                this.isBatchListener = true;
            } else if (genericMessageListener instanceof MessageListener) {
                this.listener = (MessageListener) genericMessageListener;
                this.batchListener = null;
                this.isBatchListener = false;
            } else {
                throw new IllegalArgumentException("Listener must be one of 'MessageListener', 'BatchMessageListener', or the variants that are consumer aware and/or Acknowledging not " + genericMessageListener.getClass().getName());
            }
            this.listenerType = listenerType;
            this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || listenerType.equals(ListenerType.CONSUMER_AWARE);
            Assert.state(!(this.isBatchListener && this.isRecordAck), "Cannot use AckMode.RECORD with a batch listener");
            GenericErrorHandler<?> genericErrorHandler = this.containerProperties.getGenericErrorHandler();
            if (this.isBatchListener) {
                validateErrorHandler(true);
                this.errorHandler = new LoggingErrorHandler();
                this.batchErrorHandler = determineBatchErrorHandler(genericErrorHandler);
            } else {
                validateErrorHandler(false);
                this.errorHandler = determineErrorHandler(genericErrorHandler);
                this.batchErrorHandler = new BatchLoggingErrorHandler();
            }
            this.transactionTemplate = this.transactionManager != null ? new TransactionTemplate(this.transactionManager) : null;

            // --- external resources: consumer first, then the monitor scheduler ---
            Consumer<K, V> createConsumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(this.consumerGroupId, this.containerProperties.getClientId(), KafkaMessageListenerContainer.this.clientIdSuffix);
            this.consumer = createConsumer;
            ConsumerRebalanceListener createRebalanceListener = createRebalanceListener(createConsumer);
            if (KafkaMessageListenerContainer.this.topicPartitions != null) {
                // Explicit partitions: manual assignment, no group rebalancing.
                List<TopicPartitionInitialOffset> asList = Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
                this.definedPartitions = new HashMap<>(asList.size());
                for (TopicPartitionInitialOffset topicPartitionInitialOffset : asList) {
                    this.definedPartitions.put(topicPartitionInitialOffset.topicPartition(), new OffsetMetadata(topicPartitionInitialOffset.initialOffset(), topicPartitionInitialOffset.isRelativeToCurrent(), topicPartitionInitialOffset.getPosition()));
                }
                createConsumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
            } else if (this.containerProperties.getTopicPattern() != null) {
                createConsumer.subscribe(this.containerProperties.getTopicPattern(), createRebalanceListener);
            } else {
                createConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), createRebalanceListener);
            }
            if (this.containerProperties.getScheduler() != null) {
                this.taskScheduler = this.containerProperties.getScheduler();
                this.taskSchedulerExplicitlySet = true;
            } else {
                // Privately owned scheduler; run() destroys it on exit.
                ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
                threadPoolTaskScheduler.initialize();
                this.taskScheduler = threadPoolTaskScheduler;
            }
            // 1000L keeps the seconds-to-milliseconds conversion in long arithmetic.
            this.monitorTask = this.taskScheduler.scheduleAtFixedRate(() -> {
                checkConsumer();
            }, this.containerProperties.getMonitorInterval() * 1000L);
            if (this.containerProperties.isLogContainerConfig()) {
                this.logger.info(this);
            }
        }

        /**
         * Monitor callback: publishes a non-responsive consumer event when the time since the
         * last poll, expressed as a multiple of the poll timeout, exceeds the configured
         * no-poll threshold.
         */
        protected void checkConsumer() {
            final long sinceLastPoll = System.currentTimeMillis() - this.lastPoll;
            final float elapsedPollTimeouts = ((float) sinceLastPoll) / ((float) this.containerProperties.getPollTimeout());
            if (elapsedPollTimeouts > this.containerProperties.getNoPollThreshold()) {
                KafkaMessageListenerContainer.this.publishNonResponsiveConsumerEvent(sinceLastPoll, this.consumer);
            }
        }

        /**
         * Picks the batch error handler to use.
         *
         * @param genericErrorHandler the configured handler, may be {@code null}
         * @return the configured handler cast to {@code BatchErrorHandler}; otherwise {@code null}
         *         when a transaction manager is present, or a {@code BatchLoggingErrorHandler}
         */
        protected BatchErrorHandler determineBatchErrorHandler(GenericErrorHandler<?> genericErrorHandler) {
            if (genericErrorHandler == null) {
                // With a transaction manager no default handler is installed.
                return this.transactionManager == null ? new BatchLoggingErrorHandler() : null;
            }
            return (BatchErrorHandler) genericErrorHandler;
        }

        /**
         * Picks the record error handler to use.
         *
         * @param genericErrorHandler the configured handler, may be {@code null}
         * @return the configured handler cast to {@code ErrorHandler}; otherwise {@code null}
         *         when a transaction manager is present, or a {@code LoggingErrorHandler}
         */
        protected ErrorHandler determineErrorHandler(GenericErrorHandler<?> genericErrorHandler) {
            if (genericErrorHandler == null) {
                // With a transaction manager no default handler is installed.
                return this.transactionManager == null ? new LoggingErrorHandler() : null;
            }
            return (ErrorHandler) genericErrorHandler;
        }

        /**
         * Creates the rebalance listener registered with the consumer on subscription.
         *
         * <p>On revocation it notifies the user listener, commits pending acks, and (for consumer
         * aware listeners) notifies again after the commit. On assignment it records the new
         * partitions, commits the current positions unless auto commit is on, gives a
         * {@code ConsumerSeekAware} listener the chance to seek, and finally notifies the user
         * listener.
         *
         * @param consumer the consumer passed to consumer-aware user listeners and used to read positions
         * @return the wrapping rebalance listener
         */
        public ConsumerRebalanceListener createRebalanceListener(final Consumer<K, V> consumer) {
            return new ConsumerRebalanceListener() { // from class: org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer.1
                final ConsumerRebalanceListener userListener;
                final ConsumerAwareRebalanceListener consumerAwareListener;

                {
                    this.userListener = KafkaMessageListenerContainer.this.getContainerProperties().getConsumerRebalanceListener();
                    this.consumerAwareListener = this.userListener instanceof ConsumerAwareRebalanceListener ? (ConsumerAwareRebalanceListener) this.userListener : null;
                }

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    if (this.consumerAwareListener != null) {
                        this.consumerAwareListener.onPartitionsRevokedBeforeCommit(consumer, collection);
                    } else {
                        this.userListener.onPartitionsRevoked(collection);
                    }
                    ListenerConsumer.this.commitPendingAcks();
                    if (this.consumerAwareListener != null) {
                        this.consumerAwareListener.onPartitionsRevokedAfterCommit(consumer, collection);
                    }
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    ListenerConsumer.this.assignedPartitions = collection;
                    if (!ListenerConsumer.this.autoCommit) {
                        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
                        for (TopicPartition topicPartition : collection) {
                            try {
                                offsetsToCommit.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition)));
                            } catch (NoOffsetForPartitionException e) {
                                // No committed offset and no reset policy: flag it so run() stops the container.
                                ListenerConsumer.this.fatalError = true;
                                ListenerConsumer.this.logger.error("No offset and no reset policy", e);
                                return;
                            }
                        }
                        ListenerConsumer.this.commitLogger.log(() -> {
                            return "Committing on assignment: " + offsetsToCommit;
                        });
                        if (ListenerConsumer.this.transactionTemplate != null && ListenerConsumer.this.kafkaTxManager != null) {
                            ListenerConsumer.this.transactionTemplate.execute(new TransactionCallbackWithoutResult() { // from class: org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer.1.1
                                @Override
                                protected void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
                                    ((KafkaResourceHolder) TransactionSynchronizationManager.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory())).getProducer().sendOffsetsToTransaction(offsetsToCommit, ListenerConsumer.this.consumerGroupId);
                                }
                            });
                        } else if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) {
                            ListenerConsumer.this.consumer.commitSync(offsetsToCommit);
                        } else {
                            // Use the defaulted callback (user supplied or logging) so that a failed
                            // async commit is never silently dropped, consistent with ackImmediate().
                            ListenerConsumer.this.consumer.commitAsync(offsetsToCommit, ListenerConsumer.this.commitCallback);
                        }
                    }
                    if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
                        ListenerConsumer.this.seekPartitions(collection, false);
                    }
                    if (this.consumerAwareListener != null) {
                        this.consumerAwareListener.onPartitionsAssigned(consumer, collection);
                    } else {
                        this.userListener.onPartitionsAssigned(collection);
                    }
                }
            };
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Hands the current position of each given partition, together with a callback that
         * seeks directly on the consumer, to the {@code ConsumerSeekAware} listener.
         *
         * @param collection the partitions whose positions are reported
         * @param z {@code true} to invoke {@code onIdleContainer}, {@code false} to invoke
         *        {@code onPartitionsAssigned}
         */
        public void seekPartitions(Collection<TopicPartition> collection, boolean z) {
            final Map<TopicPartition, Long> positions = new HashMap<>();
            collection.forEach(partition -> positions.put(partition, Long.valueOf(this.consumer.position(partition))));

            final ConsumerSeekAware.ConsumerSeekCallback directSeeker = new ConsumerSeekAware.ConsumerSeekCallback() { // from class: org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer.2
                @Override // org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
                public void seek(String topic, int partition, long offset) {
                    ListenerConsumer.this.consumer.seek(new TopicPartition(topic, partition), offset);
                }

                @Override // org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
                public void seekToBeginning(String topic, int partition) {
                    ListenerConsumer.this.consumer.seekToBeginning(Collections.singletonList(new TopicPartition(topic, partition)));
                }

                @Override // org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
                public void seekToEnd(String topic, int partition) {
                    ListenerConsumer.this.consumer.seekToEnd(Collections.singletonList(new TopicPartition(topic, partition)));
                }
            };

            final ConsumerSeekAware seekAware = (ConsumerSeekAware) this.genericListener;
            if (!z) {
                seekAware.onPartitionsAssigned(positions, directSeeker);
                return;
            }
            seekAware.onIdleContainer(positions, directSeeker);
        }

        /**
         * Verifies that the error handler configured in the container properties matches the
         * kind of listener in use. Does nothing when no error handler is configured.
         *
         * <p>The check is made on the handler taken from the container properties, not on the
         * {@code errorHandler} field: this method is called from the constructor before that field
         * is assigned. {@code instanceof} is used so that handlers implementing a sub-interface, or
         * inheriting the interface from a superclass, are accepted.
         *
         * @param z {@code true} when a batch listener is in use (a {@code BatchErrorHandler} is
         *        required), {@code false} for a record listener (an {@code ErrorHandler} is required)
         * @throws IllegalStateException if the configured handler is of the wrong kind
         */
        private void validateErrorHandler(boolean z) {
            GenericErrorHandler<?> configuredHandler = this.containerProperties.getGenericErrorHandler();
            if (configuredHandler == null) {
                return;
            }
            boolean compatible = z ? configuredHandler instanceof BatchErrorHandler : configuredHandler instanceof ErrorHandler;
            Assert.state(compatible, "Error handler is not compatible with the message listener, expecting an instance of " + (z ? "BatchErrorHandler" : "ErrorHandler") + " not " + configuredHandler.getClass().getName());
        }

        /**
         * Always reports this runnable as long-lived.
         *
         * @return {@code true}
         */
        @Override
        public boolean isLongLived() {
            final boolean longLived = true;
            return longLived;
        }

        /**
         * Main polling loop, executed on the consumer thread.
         *
         * <p>Each iteration processes pending commits (unless auto commit or RECORD ack mode is in
         * use) and seeks, pauses or resumes the consumer to follow the container's paused state,
         * polls, and then either dispatches the records to the listener or publishes an idle event
         * when the idle interval has elapsed.
         *
         * <p>A {@code WakeupException} is ignored, a {@code NoOffsetForPartitionException} is treated
         * as fatal and leaves the loop, and any other exception goes to
         * {@link #handleConsumerException(Exception)}. When the loop ends, the monitor task,
         * a privately owned scheduler, and the consumer are always released.
         */
        @Override
        public void run() {
            this.consumerThread = Thread.currentThread();
            if (this.genericListener instanceof ConsumerSeekAware) {
                ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
            }
            if (this.transactionManager != null) {
                ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
            }
            this.count = 0;
            this.last = System.currentTimeMillis();
            if (KafkaMessageListenerContainer.this.isRunning() && this.definedPartitions != null) {
                try {
                    initPartitionsIfNeeded();
                } catch (Exception e) {
                    this.logger.error("Failed to set initial offsets", e);
                }
            }
            // lastReceive: last time records arrived (only tracked when idle events are enabled).
            // lastIdleEvent: last time an idle event was published, to space events by the interval.
            long lastReceive = System.currentTimeMillis();
            long lastIdleEvent = lastReceive;
            while (KafkaMessageListenerContainer.this.isRunning()) {
                try {
                    if (!this.autoCommit && !this.isRecordAck) {
                        processCommits();
                    }
                    processSeeks();
                    if (!this.consumerPaused && KafkaMessageListenerContainer.this.isPaused()) {
                        this.consumer.pause(this.consumer.assignment());
                        this.consumerPaused = true;
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Paused consumption from: " + this.consumer.paused());
                        }
                        KafkaMessageListenerContainer.this.publishConsumerPausedEvent(this.consumer.assignment());
                    }
                    final ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
                    this.lastPoll = System.currentTimeMillis();
                    if (this.consumerPaused && !KafkaMessageListenerContainer.this.isPaused()) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Resuming consumption from: " + this.consumer.paused());
                        }
                        Set<TopicPartition> paused = this.consumer.paused();
                        this.consumer.resume(paused);
                        this.consumerPaused = false;
                        KafkaMessageListenerContainer.this.publishConsumerResumedEvent(paused);
                    }
                    if (records != null && this.logger.isDebugEnabled()) {
                        this.logger.debug("Received: " + records.count() + " records");
                        if (records.count() > 0 && this.logger.isTraceEnabled()) {
                            this.logger.trace(records.partitions().stream()
                                    .flatMap(tp -> records.records(tp).stream())
                                    .map(rec -> rec.topic() + "-" + rec.partition() + "@" + rec.offset())
                                    .collect(Collectors.toList()));
                        }
                    }
                    final Long idleEventInterval = this.containerProperties.getIdleEventInterval();
                    if (records != null && records.count() > 0) {
                        if (idleEventInterval != null) {
                            lastReceive = System.currentTimeMillis();
                        }
                        invokeListener(records);
                    } else if (idleEventInterval != null) {
                        long now = System.currentTimeMillis();
                        long interval = idleEventInterval.longValue();
                        if (now > lastReceive + interval && now > lastIdleEvent + interval) {
                            KafkaMessageListenerContainer.this.publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener ? this.consumer : null, this.consumerPaused);
                            lastIdleEvent = now;
                            if (this.genericListener instanceof ConsumerSeekAware) {
                                seekPartitions(KafkaMessageListenerContainer.this.getAssignedPartitions(), true);
                            }
                        }
                    }
                } catch (WakeupException e) {
                    // Ignored: the loop condition decides whether to keep polling.
                } catch (NoOffsetForPartitionException e) {
                    // Fatal: leave the loop so the container is stopped below.
                    this.fatalError = true;
                    this.logger.error("No offset and no reset policy", e);
                    break;
                } catch (Exception e) {
                    // Specific handlers must precede this clause or they would be unreachable.
                    handleConsumerException(e);
                }
            }
            ProducerFactoryUtils.clearConsumerGroupId();
            try {
                if (this.fatalError) {
                    this.logger.error("No offset and no reset policy; stopping container");
                    KafkaMessageListenerContainer.this.stop();
                } else if (this.kafkaTxManager == null) {
                    commitPendingAcks();
                    try {
                        this.consumer.unsubscribe();
                    } catch (WakeupException e) {
                        // Ignored: already shutting down.
                    }
                }
            } finally {
                // Release resources even if the final commit or stop() fails.
                this.monitorTask.cancel(true);
                if (!this.taskSchedulerExplicitlySet && this.taskScheduler instanceof ThreadPoolTaskScheduler) {
                    // Only a scheduler created by this consumer is destroyed here.
                    ((ThreadPoolTaskScheduler) this.taskScheduler).destroy();
                }
                this.consumer.close();
                this.logger.info("Consumer stopped");
            }
        }

        /**
         * Routes an exception raised in the polling loop to the error handler matching the
         * listener kind, passing an empty record list / empty {@code ConsumerRecords}. When no
         * matching handler is set the exception is logged. Anything thrown by the handler itself
         * is logged and not propagated.
         *
         * @param exc the exception caught in the polling loop
         */
        protected void handleConsumerException(Exception exc) {
            try {
                if (this.isBatchListener) {
                    if (this.batchErrorHandler != null) {
                        this.batchErrorHandler.handle(exc, new ConsumerRecords<>(Collections.emptyMap()), this.consumer, KafkaMessageListenerContainer.this);
                        return;
                    }
                } else if (this.errorHandler != null) {
                    this.errorHandler.handle(exc, Collections.emptyList(), this.consumer, KafkaMessageListenerContainer.this);
                    return;
                }
                // No handler applicable to this listener kind.
                this.logger.error("Consumer exception", exc);
            } catch (Exception handlerFailure) {
                this.logger.error("Consumer exception", handlerFailure);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Processes outstanding commits and then, if any offsets remain recorded, commits them.
         */
        public void commitPendingAcks() {
            processCommits();
            if (this.offsets.size() <= 0) {
                return;
            }
            commitIfNecessary();
        }

        /**
         * Drains the queue of acknowledgments and processes each one, without blocking when the
         * queue is empty.
         */
        private void handleAcks() {
            for (ConsumerRecord<K, V> acked = this.acks.poll(); acked != null; acked = this.acks.poll()) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Ack: " + acked);
                }
                processAck(acked);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Processes an acknowledgment for a record. On the consumer thread the offset is either
         * committed immediately (MANUAL_IMMEDIATE) or recorded via {@code addOffset}; on any other
         * thread the record is queued in {@code acks}.
         *
         * @param consumerRecord the acknowledged record
         * @throws KafkaException if the calling thread is interrupted while queueing the record
         */
        public void processAck(ConsumerRecord<K, V> consumerRecord) {
            final boolean onConsumerThread = Thread.currentThread().equals(this.consumerThread);
            if (onConsumerThread) {
                if (this.isManualImmediateAck) {
                    try {
                        ackImmediate(consumerRecord);
                    } catch (WakeupException ignored) {
                        // Deliberately ignored, as in the polling loop.
                    }
                } else {
                    addOffset(consumerRecord);
                }
                return;
            }
            try {
                this.acks.put(consumerRecord);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted while storing ack", e);
            }
        }

        /**
         * Commits the offset following the given record right away, synchronously or
         * asynchronously according to the container's {@code syncCommits} setting.
         *
         * @param consumerRecord the record that has been acknowledged
         */
        private void ackImmediate(ConsumerRecord<K, V> consumerRecord) {
            final TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
            // The committed offset is that of the next record to consume.
            final OffsetAndMetadata nextOffset = new OffsetAndMetadata(consumerRecord.offset() + 1);
            final Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(partition, nextOffset);
            this.commitLogger.log(() -> "Committing: " + commits);
            if (!this.containerProperties.isSyncCommits()) {
                this.consumer.commitAsync(commits, this.commitCallback);
                return;
            }
            this.consumer.commitSync(commits);
        }

        /**
         * Dispatches the polled records to the batch or the record listener path.
         *
         * @param consumerRecords the records returned by the last poll
         */
        private void invokeListener(ConsumerRecords<K, V> consumerRecords) {
            if (!this.isBatchListener) {
                invokeRecordListener(consumerRecords);
                return;
            }
            invokeBatchListener(consumerRecords);
        }

        /**
         * Flattens the polled records into a list and, if it is not empty, invokes the batch
         * listener — inside a transaction when a transaction template is configured.
         *
         * @param consumerRecords the records returned by the last poll
         */
        private void invokeBatchListener(ConsumerRecords<K, V> consumerRecords) {
            final List<ConsumerRecord<K, V>> batch = new LinkedList<>();
            for (ConsumerRecord<K, V> polled : consumerRecords) {
                batch.add(polled);
            }
            if (batch.isEmpty()) {
                return;
            }
            if (this.transactionTemplate == null) {
                doInvokeBatchListener(consumerRecords, batch, null);
                return;
            }
            invokeBatchListenerInTx(consumerRecords, batch);
        }

        /**
         * Invokes the batch listener inside a transaction. An exception returned by
         * {@code doInvokeBatchListener} is rethrown within the callback; any
         * {@code RuntimeException} escaping the transaction template is logged and the batch is
         * handed to the container's after-rollback processor.
         *
         * @param consumerRecords the records returned by the last poll
         * @param list the same records flattened into a list
         */
        private void invokeBatchListenerInTx(final ConsumerRecords<K, V> consumerRecords, final List<ConsumerRecord<K, V>> list) {
            final TransactionCallbackWithoutResult txWork = new TransactionCallbackWithoutResult() { // from class: org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer.3
                @Override
                public void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
                    // The transactional producer is only looked up for Kafka-aware transaction managers.
                    final Producer txProducer = ListenerConsumer.this.kafkaTxManager == null
                            ? null
                            : ((KafkaResourceHolder) TransactionSynchronizationManager.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory())).getProducer();
                    final RuntimeException listenerFailure = ListenerConsumer.this.doInvokeBatchListener(consumerRecords, list, txProducer);
                    if (listenerFailure != null) {
                        throw listenerFailure;
                    }
                }
            };
            try {
                this.transactionTemplate.execute(txWork);
            } catch (RuntimeException rollbackCause) {
                this.logger.error("Transaction rolled back", rollbackCause);
                KafkaMessageListenerContainer.this.getAfterRollbackProcessor().process(list, this.consumer);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Invokes the batch listener with the variant of {@code onMessage} selected by
         * the listener type, then queues the highest offset per partition for commit
         * unless manual acks or auto-commit are in effect.
         *
         * <p>On a {@link RuntimeException} from the listener the batch error handler is
         * invoked if one is configured; without one the exception is rethrown.
         *
         * @param consumerRecords the records returned by the last poll (passed to the error handler)
         * @param list the same records as a list (passed to the listener)
         * @param producer the transactional producer, or {@code null} outside a transaction
         * @return {@code null} when the listener succeeded, the thread was interrupted
         *         while queueing acks, or the error handler dealt with the failure;
         *         otherwise the exception thrown by the error handler
         * @throws Error if the error handler throws an {@link Error}
         */
        public RuntimeException doInvokeBatchListener(ConsumerRecords<K, V> consumerRecords, List<ConsumerRecord<K, V>> list, Producer producer) throws Error {
            try {
                // The record list itself is the payload; it must not be cast to the listener type.
                switch (this.listenerType) {
                    case ACKNOWLEDGING_CONSUMER_AWARE:
                        this.batchListener.onMessage(list, this.isAnyManualAck ? new ConsumerBatchAcknowledgment(list) : null, this.consumer);
                        break;
                    case ACKNOWLEDGING:
                        this.batchListener.onMessage(list, (Acknowledgment) (this.isAnyManualAck ? new ConsumerBatchAcknowledgment(list) : null));
                        break;
                    case CONSUMER_AWARE:
                        this.batchListener.onMessage(list, (Consumer<?, ?>) this.consumer);
                        break;
                    case SIMPLE:
                        this.batchListener.onMessage(list);
                        break;
                }
                if (!this.isAnyManualAck && !this.autoCommit) {
                    for (ConsumerRecord<K, V> highest : getHighestOffsetRecords(list)) {
                        // put() may block, hence the InterruptedException handler below.
                        this.acks.put(highest);
                    }
                    if (producer != null) {
                        sendOffsetsToTransaction(producer);
                    }
                }
                return null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            } catch (RuntimeException e2) {
                // Outside a transaction, optionally ack the failed batch before error handling.
                if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
                    for (ConsumerRecord<K, V> highest : getHighestOffsetRecords(list)) {
                        this.acks.add(highest);
                    }
                }
                if (this.batchErrorHandler == null) {
                    throw e2;
                }
                try {
                    if (this.batchErrorHandler instanceof ContainerAwareBatchErrorHandler) {
                        ((ContainerAwareBatchErrorHandler) this.batchErrorHandler).handle(e2, consumerRecords, this.consumer, KafkaMessageListenerContainer.this.container);
                    } else {
                        this.batchErrorHandler.handle(e2, consumerRecords, this.consumer);
                    }
                    // The handler returned normally: within a transaction, send the offsets with it.
                    if (producer != null) {
                        for (ConsumerRecord<K, V> highest : getHighestOffsetRecords(list)) {
                            this.acks.add(highest);
                        }
                        sendOffsetsToTransaction(producer);
                    }
                    return null;
                } catch (Error e3) {
                    this.logger.error("Error handler threw an error", e3);
                    throw e3;
                } catch (RuntimeException e4) {
                    this.logger.error("Error handler threw an exception", e4);
                    return e4;
                }
            }
        }

        /**
         * Delivers the polled records one at a time, transactionally when a
         * transaction template is configured.
         *
         * @param consumerRecords the records returned by the last poll
         */
        private void invokeRecordListener(ConsumerRecords<K, V> consumerRecords) {
            if (this.transactionTemplate == null) {
                doInvokeWithRecords(consumerRecords);
                return;
            }
            invokeRecordListenerInTx(consumerRecords);
        }

        /**
         * Invokes the record listener once per record, each call in its own
         * transaction. When a transaction fails with a {@link RuntimeException}, the
         * failed record and every record not yet delivered are passed to the
         * after-rollback processor, which also exhausts the iteration.
         *
         * @param consumerRecords the records returned by the last poll
         */
        private void invokeRecordListenerInTx(ConsumerRecords<K, V> consumerRecords) {
            final Iterator<ConsumerRecord<K, V>> remaining = consumerRecords.iterator();
            while (remaining.hasNext()) {
                final ConsumerRecord<K, V> current = remaining.next();
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Processing " + current);
                }
                try {
                    this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                        @Override
                        public void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
                            Producer<K, V> txProducer = null;
                            if (ListenerConsumer.this.kafkaTxManager != null) {
                                // Look up the resource holder bound to this transaction for the producer factory.
                                KafkaResourceHolder holder = (KafkaResourceHolder) TransactionSynchronizationManager
                                        .getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory());
                                txProducer = holder.getProducer();
                            }
                            RuntimeException failure = ListenerConsumer.this.doInvokeRecordListener(current, txProducer, remaining);
                            if (failure != null) {
                                // Rethrow so the exception propagates out of the transaction callback.
                                throw failure;
                            }
                        }
                    });
                } catch (RuntimeException e) {
                    this.logger.error("Transaction rolled back", e);
                    List<ConsumerRecord<K, V>> unprocessed = new ArrayList<>();
                    unprocessed.add(current);
                    // Draining the iterator here also terminates the outer loop.
                    remaining.forEachRemaining(unprocessed::add);
                    KafkaMessageListenerContainer.this.getAfterRollbackProcessor().process(unprocessed, this.consumer);
                }
            }
        }

        /**
         * Invokes the record listener for each polled record outside a transaction.
         * The live iterator is passed along with every record; the value returned by
         * {@link #doInvokeRecordListener} is ignored here.
         *
         * @param consumerRecords the records returned by the last poll
         */
        private void doInvokeWithRecords(ConsumerRecords<K, V> consumerRecords) throws Error {
            for (Iterator<ConsumerRecord<K, V>> remaining = consumerRecords.iterator(); remaining.hasNext();) {
                ConsumerRecord<K, V> current = remaining.next();
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Processing " + current);
                }
                doInvokeRecordListener(current, null, remaining);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Invokes the record listener with the variant of {@code onMessage} selected by
         * the listener type and acknowledges the record on success.
         *
         * <p>On a {@link RuntimeException} from the listener the error handler is invoked
         * if one is configured; without one the exception is rethrown. A
         * {@code ContainerAwareErrorHandler} receives the failed record plus every record
         * still left in {@code it}, which drains the iterator.
         *
         * @param consumerRecord the record to deliver
         * @param producer the transactional producer, or {@code null} outside a transaction
         * @param it iterator positioned after {@code consumerRecord}
         * @return {@code null} when the listener succeeded or the error handler dealt with
         *         the failure; otherwise the exception thrown by the error handler
         * @throws Error if the error handler throws an {@link Error}
         */
        public RuntimeException doInvokeRecordListener(ConsumerRecord<K, V> consumerRecord, Producer producer, Iterator<ConsumerRecord<K, V>> it) throws Error {
            try {
                // The record itself is the payload; it must not be cast to the listener type.
                switch (this.listenerType) {
                    case ACKNOWLEDGING_CONSUMER_AWARE:
                        this.listener.onMessage(consumerRecord, this.isAnyManualAck ? new ConsumerAcknowledgment(consumerRecord) : null, this.consumer);
                        break;
                    case ACKNOWLEDGING:
                        this.listener.onMessage(consumerRecord, (Acknowledgment) (this.isAnyManualAck ? new ConsumerAcknowledgment(consumerRecord) : null));
                        break;
                    case CONSUMER_AWARE:
                        this.listener.onMessage(consumerRecord, (Consumer<?, ?>) this.consumer);
                        break;
                    case SIMPLE:
                        this.listener.onMessage(consumerRecord);
                        break;
                }
                ackCurrent(consumerRecord, producer);
                return null;
            } catch (RuntimeException e) {
                // Outside a transaction, optionally ack the failed record before error handling.
                if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
                    ackCurrent(consumerRecord, producer);
                }
                if (this.errorHandler == null) {
                    throw e;
                }
                try {
                    if (this.errorHandler instanceof ContainerAwareErrorHandler) {
                        if (producer == null) {
                            // Process pending commits before the handler runs.
                            processCommits();
                        }
                        // Raw list: the element type the handler expects is declared outside this block.
                        List remaining = new ArrayList();
                        remaining.add(consumerRecord);
                        while (it.hasNext()) {
                            remaining.add(it.next());
                        }
                        ((ContainerAwareErrorHandler) this.errorHandler).handle(e, remaining, this.consumer, KafkaMessageListenerContainer.this.container);
                    } else {
                        this.errorHandler.handle(e, consumerRecord, this.consumer);
                    }
                    // The handler returned normally: within a transaction, ack the record with it.
                    if (producer != null) {
                        ackCurrent(consumerRecord, producer);
                    }
                    return null;
                } catch (Error e2) {
                    this.logger.error("Error handler threw an error", e2);
                    throw e2;
                } catch (RuntimeException e3) {
                    this.logger.error("Error handler threw an exception", e3);
                    return e3;
                }
            }
        }

        /**
         * Acknowledges a record after the listener (or error handler) has dealt with it.
         *
         * <ul>
         * <li>Record-ack mode without a producer: the offset is committed at once via
         * {@link #ackImmediate}.</li>
         * <li>Record-ack mode with a producer, or any non-manual mode without
         * auto-commit: the record is queued in {@code acks}.</li>
         * <li>With a producer, the pending offsets are then sent to the transaction;
         * a failure there is logged and not propagated.</li>
         * </ul>
         *
         * @param consumerRecord the record to acknowledge
         * @param producer the transactional producer, or {@code null} outside a transaction
         */
        public void ackCurrent(ConsumerRecord<K, V> consumerRecord, Producer producer) {
            if (this.isRecordAck) {
                if (producer == null) {
                    // Same commit logic as an immediate manual ack; reuse it instead of duplicating it.
                    ackImmediate(consumerRecord);
                } else {
                    this.acks.add(consumerRecord);
                }
            } else if (!this.isAnyManualAck && !this.autoCommit) {
                this.acks.add(consumerRecord);
            }
            if (producer != null) {
                try {
                    sendOffsetsToTransaction(producer);
                } catch (Exception e) {
                    this.logger.error("Send offsets to transaction failed", e);
                }
            }
        }

        /**
         * Processes the queued acks, builds the pending commits and sends them to the
         * producer's transaction under this consumer's group id.
         *
         * @param producer the transactional producer
         */
        private void sendOffsetsToTransaction(Producer producer) {
            handleAcks();
            Map<TopicPartition, OffsetAndMetadata> txOffsets = buildCommits();
            this.commitLogger.log(() -> "Sending offsets to transaction: " + txOffsets);
            producer.sendOffsetsToTransaction(txOffsets, this.consumerGroupId);
        }

        /**
         * Processes queued acks and commits the pending offsets when the configured ack
         * mode calls for it.
         *
         * <ul>
         * <li>Manual-immediate mode: nothing further is done after handling the acks.</li>
         * <li>Manual, batch and record modes, or COUNT mode once the count is reached:
         * commit and reset the count.</li>
         * <li>TIME mode once the ack time has elapsed: commit and reset the timestamp.</li>
         * <li>COUNT_TIME mode once either limit is reached: commit and reset both.</li>
         * </ul>
         */
        private void processCommits() {
            this.count += this.acks.size();
            handleAcks();
            AbstractMessageListenerContainer.AckMode ackMode = this.containerProperties.getAckMode();
            if (this.isManualImmediateAck) {
                return;
            }
            if (!this.isManualAck) {
                updatePendingOffsets();
            }
            boolean countExceeded = this.count >= this.containerProperties.getAckCount();
            if (this.isManualAck || this.isBatchAck || this.isRecordAck || (ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT) && countExceeded)) {
                if (this.logger.isDebugEnabled() && ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT)) {
                    this.logger.debug("Committing in AckMode.COUNT because count " + this.count + " exceeds configured limit of " + this.containerProperties.getAckCount());
                }
                commitIfNecessary();
                this.count = 0;
                return;
            }
            long now = System.currentTimeMillis();
            boolean elapsed = now - this.last > this.containerProperties.getAckTime();
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.TIME) && elapsed) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Committing in AckMode.TIME because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime());
                }
                commitIfNecessary();
                this.last = now;
                return;
            }
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT_TIME) && (elapsed || countExceeded)) {
                if (this.logger.isDebugEnabled()) {
                    if (elapsed) {
                        this.logger.debug("Committing in AckMode.COUNT_TIME because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime());
                    } else {
                        // Trailing space after "of" keeps the limit from running into the text.
                        this.logger.debug("Committing in AckMode.COUNT_TIME because count " + this.count + " exceeds configured limit of " + this.containerProperties.getAckCount());
                    }
                }
                commitIfNecessary();
                this.last = now;
                this.count = 0;
            }
        }

        /**
         * Drains the queue of requested seeks and applies each to the consumer: an
         * absolute seek when the request has no position, otherwise a seek to the
         * beginning or to the end. A failing seek is logged and the remaining requests
         * are still processed.
         */
        private void processSeeks() {
            TopicPartitionInitialOffset request;
            while ((request = this.seeks.poll()) != null) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Seek: " + request);
                }
                try {
                    TopicPartitionInitialOffset.SeekPosition position = request.getPosition();
                    if (position != null) {
                        if (position.equals(TopicPartitionInitialOffset.SeekPosition.BEGINNING)) {
                            this.consumer.seekToBeginning(Collections.singletonList(request.topicPartition()));
                        } else {
                            this.consumer.seekToEnd(Collections.singletonList(request.topicPartition()));
                        }
                    } else {
                        this.consumer.seek(request.topicPartition(), request.initialOffset().longValue());
                    }
                } catch (Exception e) {
                    this.logger.error("Exception while seeking " + request, e);
                }
            }
        }

        /**
         * Positions the consumer on the explicitly defined partitions.
         *
         * <p>Partitions whose metadata requests {@code BEGINNING} or {@code END} are
         * sought in bulk and removed from the working copy, so the explicit-offset loop
         * below cannot seek them a second time. For the remaining partitions with a
         * non-null offset:
         * <ul>
         * <li>a negative offset is applied relative to the end of the partition, or to
         * the current position when {@code relativeToCurrent} is set, and clamped at 0;</li>
         * <li>a non-negative offset is absolute unless {@code relativeToCurrent} is set,
         * in which case it is added to the current position.</li>
         * </ul>
         * A failing seek is logged and the remaining partitions are still processed.
         */
        private void initPartitionsIfNeeded() {
            // Work on a copy so the defined partitions themselves are left untouched.
            Map<TopicPartition, OffsetMetadata> pending = new HashMap<>(this.definedPartitions);

            Set<TopicPartition> beginnings = pending.entrySet().stream()
                    .filter(entry -> TopicPartitionInitialOffset.SeekPosition.BEGINNING.equals(entry.getValue().seekPosition))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toSet());
            pending.keySet().removeAll(beginnings);

            Set<TopicPartition> ends = pending.entrySet().stream()
                    .filter(entry -> TopicPartitionInitialOffset.SeekPosition.END.equals(entry.getValue().seekPosition))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toSet());
            pending.keySet().removeAll(ends);

            if (!beginnings.isEmpty()) {
                this.consumer.seekToBeginning(beginnings);
            }
            if (!ends.isEmpty()) {
                this.consumer.seekToEnd(ends);
            }

            for (Map.Entry<TopicPartition, OffsetMetadata> entry : pending.entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                OffsetMetadata metadata = entry.getValue();
                Long offset = metadata.offset;
                if (offset == null) {
                    continue;
                }
                long newOffset = offset.longValue();
                if (offset.longValue() < 0) {
                    if (!metadata.relativeToCurrent) {
                        // Negative and not relative to the current position: count back from the end.
                        this.consumer.seekToEnd(Arrays.asList(topicPartition));
                    }
                    newOffset = Math.max(0L, this.consumer.position(topicPartition) + offset.longValue());
                } else if (metadata.relativeToCurrent) {
                    newOffset = this.consumer.position(topicPartition) + offset.longValue();
                }
                try {
                    this.consumer.seek(topicPartition, newOffset);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Reset " + topicPartition + " to offset " + newOffset);
                    }
                } catch (Exception e) {
                    this.logger.error("Failed to set initial offset for " + topicPartition + " at " + newOffset + ". Position is " + this.consumer.position(topicPartition), e);
                }
            }
        }

        /**
         * Drains the ack queue, recording each record's offset in the pending
         * offsets map.
         */
        private void updatePendingOffsets() {
            ConsumerRecord<K, V> acked;
            while ((acked = this.acks.poll()) != null) {
                addOffset(acked);
            }
        }

        /**
         * Records the record's offset for its topic and partition, keeping the larger
         * value when an offset is already stored for that partition.
         *
         * @param consumerRecord the acknowledged record
         */
        private void addOffset(ConsumerRecord<K, V> consumerRecord) {
            Map<Integer, Long> partitionOffsets =
                    this.offsets.computeIfAbsent(consumerRecord.topic(), topic -> new ConcurrentHashMap<>());
            partitionOffsets.merge(Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long::max);
        }

        /**
         * Builds the pending commits and, when there are any, commits them
         * synchronously or asynchronously according to the container's
         * {@code syncCommits} property. A {@link WakeupException} raised by the commit
         * is logged at debug level and not propagated.
         */
        private void commitIfNecessary() {
            Map<TopicPartition, OffsetAndMetadata> commits = buildCommits();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Commit list: " + commits);
            }
            if (!commits.isEmpty()) {
                this.commitLogger.log(() -> "Committing: " + commits);
                try {
                    if (!this.containerProperties.isSyncCommits()) {
                        this.consumer.commitAsync(commits, this.commitCallback);
                    } else {
                        this.consumer.commitSync(commits);
                    }
                } catch (WakeupException e) {
                    this.logger.debug("Woken up during commit");
                }
            }
        }

        /**
         * Converts the pending offsets into a commit map and clears them. Each
         * committed position is the stored offset plus one.
         *
         * @return the offsets to commit, keyed by topic partition; empty when nothing is pending
         */
        private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
            Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
            this.offsets.forEach((topic, partitionOffsets) ->
                    partitionOffsets.forEach((partition, offset) ->
                            commits.put(new TopicPartition(topic, partition.intValue()),
                                    new OffsetAndMetadata(offset.longValue() + 1))));
            this.offsets.clear();
            return commits;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Selects, for every topic partition present in the list, the record with the
         * highest offset. When two records of a partition share an offset, the one
         * seen first is kept.
         *
         * @param list the records to scan
         * @return one record per topic partition; empty when {@code list} is empty
         */
        public Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(List<ConsumerRecord<K, V>> list) {
            Map<TopicPartition, ConsumerRecord<K, V>> highestByPartition = new HashMap<>();
            for (ConsumerRecord<K, V> candidate : list) {
                // Distinct names for the stored and the incoming record so the comparison
                // is between the two, not a record with itself.
                highestByPartition.merge(new TopicPartition(candidate.topic(), candidate.partition()), candidate,
                        (current, incoming) -> incoming.offset() > current.offset() ? incoming : current);
            }
            return highestByPartition.values();
        }

        /**
         * Queues a seek to an absolute offset; the request is added to {@code seeks}
         * rather than applied here.
         *
         * @param str the topic
         * @param i the partition
         * @param j the offset to seek to
         */
        @Override // org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
        public void seek(String str, int i, long j) {
            TopicPartitionInitialOffset request = new TopicPartitionInitialOffset(str, i, Long.valueOf(j));
            this.seeks.add(request);
        }

        /**
         * Queues a seek to the beginning of the partition; the request is added to
         * {@code seeks} rather than applied here.
         *
         * @param str the topic
         * @param i the partition
         */
        @Override // org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
        public void seekToBeginning(String str, int i) {
            TopicPartitionInitialOffset request =
                    new TopicPartitionInitialOffset(str, i, TopicPartitionInitialOffset.SeekPosition.BEGINNING);
            this.seeks.add(request);
        }

        /**
         * Queues a seek to the end of the partition; the request is added to
         * {@code seeks} rather than applied here.
         *
         * @param str the topic
         * @param i the partition
         */
        @Override // org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
        public void seekToEnd(String str, int i) {
            TopicPartitionInitialOffset request =
                    new TopicPartitionInitialOffset(str, i, TopicPartitionInitialOffset.SeekPosition.END);
            this.seeks.add(request);
        }

        /**
         * Describes this listener consumer by its container properties, listener type
         * and flags, group id and the container's client id suffix.
         */
        @Override
        public String toString() {
            StringBuilder description = new StringBuilder();
            description.append("KafkaMessageListenerContainer.ListenerConsumer [containerProperties=").append(this.containerProperties);
            description.append(", listenerType=").append(this.listenerType);
            description.append(", isConsumerAwareListener=").append(this.isConsumerAwareListener);
            description.append(", isBatchListener=").append(this.isBatchListener);
            description.append(", autoCommit=").append(this.autoCommit);
            description.append(", consumerGroupId=").append(this.consumerGroupId);
            description.append(", clientIdSuffix=").append(KafkaMessageListenerContainer.this.clientIdSuffix);
            description.append("]");
            return description.toString();
        }
    }

    /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$LoggingCommitCallback.class */
    private static final class LoggingCommitCallback implements OffsetCommitCallback {
        private static final Log logger = LogFactory.getLog(LoggingCommitCallback.class);

        LoggingCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            if (exc != null) {
                logger.error("Commit failed for " + map, exc);
            } else if (logger.isDebugEnabled()) {
                logger.debug("Commits for " + map + " completed");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$OffsetMetadata.class */
    /**
     * Immutable holder for the initial positioning of one partition: an optional
     * offset, whether that offset is relative to the current position, and an
     * optional symbolic seek position.
     */
    public static final class OffsetMetadata {
        /** Offset to seek to; may be {@code null}. */
        private final Long offset;

        /** Whether {@link #offset} is relative to the consumer's current position. */
        private final boolean relativeToCurrent;

        /** Symbolic position to seek to; may be {@code null}. */
        private final TopicPartitionInitialOffset.SeekPosition seekPosition;

        OffsetMetadata(Long offset, boolean relativeToCurrent, TopicPartitionInitialOffset.SeekPosition position) {
            this.seekPosition = position;
            this.relativeToCurrent = relativeToCurrent;
            this.offset = offset;
        }
    }

    /**
     * Creates a container with no parent container and no explicit topic
     * partitions; delegates to the four-argument constructor with {@code null} for both.
     *
     * @param consumerFactory the consumer factory
     * @param containerProperties the container properties
     */
    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) {
        this(null, consumerFactory, containerProperties, (TopicPartitionInitialOffset[]) null);
    }

    /**
     * Creates a container with no parent container and the given topic
     * partitions; delegates to the four-argument constructor.
     *
     * @param consumerFactory the consumer factory
     * @param containerProperties the container properties
     * @param topicPartitionInitialOffsetArr the topic partitions with their initial offsets
     */
    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionInitialOffset... topicPartitionInitialOffsetArr) {
        this(null, consumerFactory, containerProperties, topicPartitionInitialOffsetArr);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a container on behalf of another container, with no explicit topic
     * partitions; delegates to the four-argument constructor.
     *
     * @param abstractMessageListenerContainer the parent container, or {@code null}
     * @param consumerFactory the consumer factory
     * @param containerProperties the container properties
     */
    public KafkaMessageListenerContainer(AbstractMessageListenerContainer<K, V> abstractMessageListenerContainer, ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) {
        this(abstractMessageListenerContainer, consumerFactory, containerProperties, (TopicPartitionInitialOffset[]) null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a container, optionally on behalf of another container.
     *
     * @param abstractMessageListenerContainer the parent container; when {@code null}
     *        this instance is used as its own {@code container}
     * @param consumerFactory the consumer factory; must not be {@code null}
     * @param containerProperties the container properties
     * @param topicPartitionInitialOffsetArr the topic partitions with their initial
     *        offsets; copied when given, otherwise taken from the container properties
     */
    public KafkaMessageListenerContainer(AbstractMessageListenerContainer<K, V> abstractMessageListenerContainer, ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionInitialOffset... topicPartitionInitialOffsetArr) {
        super(consumerFactory, containerProperties);
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
        this.container = abstractMessageListenerContainer != null ? abstractMessageListenerContainer : this;
        // Copy the caller's array so later changes to it do not affect this container.
        this.topicPartitions = topicPartitionInitialOffsetArr == null
                ? containerProperties.getTopicPartitions()
                : Arrays.copyOf(topicPartitionInitialOffsetArr, topicPartitionInitialOffsetArr.length);
    }

    /**
     * Sets the client id suffix, which is reported by {@code toString()} of this
     * container and of its listener consumer.
     *
     * @param str the suffix
     */
    public void setClientIdSuffix(String str) {
        this.clientIdSuffix = str;
    }

    /**
     * Returns a read-only view of the partitions this container consumes: the
     * explicitly defined partitions when present, otherwise the partitions currently
     * assigned to the consumer.
     *
     * @return the partitions, or {@code null} when there is no listener consumer or
     *         neither set of partitions is available
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public Collection<TopicPartition> getAssignedPartitions() {
        // Read the field once into a local and work on that reference.
        KafkaMessageListenerContainer<K, V>.ListenerConsumer consumerTask = this.listenerConsumer;
        Collection<TopicPartition> partitions = null;
        if (consumerTask != null) {
            if (consumerTask.definedPartitions != null) {
                partitions = Collections.unmodifiableCollection(consumerTask.definedPartitions.keySet());
            } else if (consumerTask.assignedPartitions != null) {
                partitions = Collections.unmodifiableCollection(consumerTask.assignedPartitions);
            }
        }
        return partitions;
    }

    /**
     * Reports whether a pause has been requested and the consumer has actually
     * been paused.
     *
     * @return {@code true} only when {@code isPaused()} is true and the listener
     *         consumer exists and reports itself paused; {@code false} when there is
     *         no listener consumer
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public boolean isContainerPaused() {
        // Null-check the listener consumer, as getAssignedPartitions() and metrics() do,
        // so that a missing consumer yields false instead of a NullPointerException.
        KafkaMessageListenerContainer<K, V>.ListenerConsumer consumerTask = this.listenerConsumer;
        return isPaused() && consumerTask != null && consumerTask.consumerPaused;
    }

    /**
     * Returns the consumer's metrics keyed by client id. The client id is read
     * from the {@code client-id} tag of the first metric name.
     *
     * @return a single-entry map, or an empty map when there is no listener
     *         consumer or the consumer reports no metrics
     */
    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public Map<String, Map<MetricName, ? extends Metric>> metrics() {
        KafkaMessageListenerContainer<K, V>.ListenerConsumer consumerTask = this.listenerConsumer;
        if (consumerTask == null) {
            return Collections.emptyMap();
        }
        Map<MetricName, ? extends Metric> consumerMetrics = consumerTask.consumer.metrics();
        Iterator<MetricName> metricNames = consumerMetrics.keySet().iterator();
        if (!metricNames.hasNext()) {
            return Collections.emptyMap();
        }
        String clientId = metricNames.next().tags().get("client-id");
        return Collections.singletonMap(clientId, consumerMetrics);
    }

    /**
     * Starts the container unless it is already running: validates the ack
     * settings, resolves the listener and its type, then submits a new listener
     * consumer to the consumer task executor.
     */
    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();
        if (!this.consumerFactory.isAutoCommit()) {
            AbstractMessageListenerContainer.AckMode ackMode = containerProperties.getAckMode();
            boolean countTime = ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT_TIME);
            if (countTime || ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            boolean timeBased = countTime || ackMode.equals(AbstractMessageListenerContainer.AckMode.TIME);
            if (timeBased && containerProperties.getAckTime() == 0) {
                // Fall back to a 5000 ms ack time when none was configured.
                containerProperties.setAckTime(5000L);
            }
        }
        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (containerProperties.getConsumerTaskExecutor() == null) {
            String beanName = getBeanName();
            String threadPrefix = (beanName == null ? "" : beanName) + "-C-";
            containerProperties.setConsumerTaskExecutor(new SimpleAsyncTaskExecutor(threadPrefix));
        }
        Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
        this.listener = (GenericMessageListener) messageListener;
        ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);
        if (this.listener instanceof DelegatingMessageListener) {
            // Unwrap nested delegating listeners; the innermost delegate decides the type.
            Object delegate = this.listener;
            while (delegate instanceof DelegatingMessageListener) {
                delegate = ((DelegatingMessageListener) delegate).getDelegate();
            }
            listenerType = ListenerUtils.determineListenerType(delegate);
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);
        setRunning(true);
        this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);
    }

    /**
     * Stops a running container: registers a completion callback on the listener
     * consumer's future, marks the container as not running and wakes the consumer
     * up. Does nothing when the container is not running.
     *
     * @param runnable invoked once the listener consumer's future completes,
     *        successfully or not; may be {@code null}
     */
    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStop(final Runnable runnable) {
        if (!isRunning()) {
            return;
        }
        this.listenerConsumerFuture.addCallback(new ListenableFutureCallback<Object>() {
            @Override
            public void onFailure(Throwable th) {
                KafkaMessageListenerContainer.this.logger.error("Error while stopping the container: ", th);
                if (runnable != null) {
                    runnable.run();
                }
            }

            @Override
            public void onSuccess(Object obj) {
                if (KafkaMessageListenerContainer.this.logger.isDebugEnabled()) {
                    KafkaMessageListenerContainer.this.logger.debug(KafkaMessageListenerContainer.this + " stopped normally");
                }
                if (runnable != null) {
                    runnable.run();
                }
            }
        });
        setRunning(false);
        this.listenerConsumer.consumer.wakeup();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Publishes a {@link ListenerContainerIdleEvent} when an application event
     * publisher is available.
     *
     * @param j value forwarded to the event (presumably the idle time; confirm against the event class)
     * @param consumer the consumer forwarded to the event
     * @param z flag forwarded to the event (presumably the paused state; confirm against the event class)
     */
    public void publishIdleContainerEvent(long j, Consumer<?, ?> consumer, boolean z) {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        ListenerContainerIdleEvent idleEvent =
                new ListenerContainerIdleEvent(this, j, getBeanName(), getAssignedPartitions(), consumer, z);
        getApplicationEventPublisher().publishEvent(idleEvent);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Publishes a {@link NonResponsiveConsumerEvent} when an application event
     * publisher is available.
     *
     * @param j value forwarded to the event (presumably the time since the last poll; confirm against the event class)
     * @param consumer the consumer forwarded to the event
     */
    public void publishNonResponsiveConsumerEvent(long j, Consumer<?, ?> consumer) {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        NonResponsiveConsumerEvent nonResponsiveEvent =
                new NonResponsiveConsumerEvent(this, j, getBeanName(), getAssignedPartitions(), consumer);
        getApplicationEventPublisher().publishEvent(nonResponsiveEvent);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Publishes a {@link ConsumerPausedEvent} carrying a read-only view of the
     * given partitions, when an application event publisher is available.
     *
     * @param collection the partitions reported in the event
     */
    public void publishConsumerPausedEvent(Collection<TopicPartition> collection) {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        ConsumerPausedEvent pausedEvent = new ConsumerPausedEvent(this, Collections.unmodifiableCollection(collection));
        getApplicationEventPublisher().publishEvent(pausedEvent);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Publishes a {@link ConsumerResumedEvent} carrying a read-only view of the
     * given partitions, when an application event publisher is available.
     *
     * @param collection the partitions reported in the event
     */
    public void publishConsumerResumedEvent(Collection<TopicPartition> collection) {
        if (getApplicationEventPublisher() == null) {
            return;
        }
        ConsumerResumedEvent resumedEvent = new ConsumerResumedEvent(this, Collections.unmodifiableCollection(collection));
        getApplicationEventPublisher().publishEvent(resumedEvent);
    }

    /**
     * Describes this container by bean name, optional client id suffix and the
     * partitions it consumes ({@code none assigned} when there are none).
     */
    @Override
    public String toString() {
        String suffix = this.clientIdSuffix != null ? ", clientIndex=" + this.clientIdSuffix : "";
        Collection<TopicPartition> assigned = getAssignedPartitions();
        Object partitions = assigned == null ? "none assigned" : assigned;
        return "KafkaMessageListenerContainer [id=" + getBeanName() + suffix + ", topicPartitions=" + partitions + "]";
    }
}
