/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.inbound;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.core.Pausable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringBatchMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/**
 * Message-driven channel adapter that installs a listener on an
 * {@link AbstractMessageListenerContainer} and forwards each consumed record (in
 * {@link ListenerMode#record} mode) or each polled list of records (in
 * {@link ListenerMode#batch} mode) as a {@link Message} via {@code sendMessage}.
 *
 * <p>The constructor disables the container's auto-startup; the container is started and
 * stopped from {@link #doStart()} and {@link #doStop()} instead.
 *
 * <p>Error message attributes are kept in a static {@link ThreadLocal}: when a
 * {@link RetryTemplate} is configured the holder carries the current {@link RetryContext}
 * (set in the retry listener's {@code open} callback and cleared in {@code close});
 * otherwise a fresh accessor is created per delivery when an error channel is present
 * and cleared once the delivery has been handed off.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class KafkaMessageDrivenChannelAdapter<K, V>
        extends MessageProducerSupport
        implements OrderlyShutdownCapable, Pausable {

    /** Message header that receives the delivery attempt counter. */
    private static final String DELIVERY_ATTEMPT_HEADER = "deliveryAttempt";

    /** Kafka record header from which the delivery attempt is read when no retry template is set. */
    private static final String KAFKA_DELIVERY_ATTEMPT_HEADER = "kafka_deliveryAttempt";

    /** Message header that receives the raw record when {@code bindSourceRecord} is enabled. */
    private static final String SOURCE_DATA_HEADER = "sourceData";

    /** Error attribute key under which the converted message is stored. */
    private static final String INPUT_MESSAGE_ATTRIBUTE = "inputMessage";

    /** Error attribute key under which the consumed record (or list of records) is stored. */
    private static final String KAFKA_DATA_ATTRIBUTE = "kafka_data";

    /** Per-thread attributes used when building error messages; see the class documentation. */
    private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();

    private final AbstractMessageListenerContainer<K, V> messageListenerContainer;

    private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener();

    private final IntegrationBatchMessageListener batchListener = new IntegrationBatchMessageListener();

    private final ListenerMode mode;

    private RecordFilterStrategy<K, V> recordFilterStrategy;

    private boolean ackDiscarded;

    private RetryTemplate retryTemplate;

    private RecoveryCallback<? extends Object> recoveryCallback;

    private boolean filterInRetry;

    private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;

    private boolean bindSourceRecord;

    private boolean containerDeliveryAttemptPresent;

    /**
     * Create an adapter in {@link ListenerMode#record} mode.
     *
     * @param messageListenerContainer the container to attach to; must not be null and must
     * not already have a message listener
     */
    public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> messageListenerContainer) {
        this(messageListenerContainer, ListenerMode.record);
    }

    /**
     * Create an adapter with an explicit listener mode. Disables auto-startup on the
     * container and installs a {@link RawRecordHeaderErrorMessageStrategy}.
     *
     * @param messageListenerContainer the container to attach to; must not be null and must
     * not already have a message listener
     * @param mode whether single records or whole batches are delivered
     * @throws IllegalArgumentException if the container is null or already has a listener
     */
    public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> messageListenerContainer,
            ListenerMode mode) {
        Assert.notNull(messageListenerContainer, "messageListenerContainer is required");
        Assert.isNull(messageListenerContainer.getContainerProperties().getMessageListener(),
                "Container must not already have a listener");
        this.messageListenerContainer = messageListenerContainer;
        this.messageListenerContainer.setAutoStartup(false);
        this.mode = mode;
        this.setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
    }

    /**
     * Set the message converter, dispatching on its runtime type: a
     * {@link RecordMessageConverter} is handed to the record listener, a
     * {@link BatchMessageConverter} to the batch listener.
     *
     * @param messageConverter the converter
     * @throws IllegalArgumentException if the converter is neither of the two supported kinds
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        if (messageConverter instanceof RecordMessageConverter) {
            this.recordListener.setMessageConverter((RecordMessageConverter) messageConverter);
        } else if (messageConverter instanceof BatchMessageConverter) {
            this.batchListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
        } else {
            throw new IllegalArgumentException(
                    "Message converter must be a 'RecordMessageConverter' or 'BatchMessageConverter'");
        }
    }

    /**
     * Set the converter used by the record listener.
     *
     * @param messageConverter the record converter
     */
    public void setRecordMessageConverter(RecordMessageConverter messageConverter) {
        this.recordListener.setMessageConverter(messageConverter);
    }

    /**
     * Set the converter used by the batch listener.
     *
     * @param messageConverter the batch converter
     */
    public void setBatchMessageConverter(BatchMessageConverter messageConverter) {
        this.batchListener.setBatchMessageConverter(messageConverter);
    }

    /**
     * Set a filter strategy. When non-null, {@link #onInit()} wraps the listener in a
     * filtering adapter built with this strategy.
     *
     * @param recordFilterStrategy the strategy, or null for no filtering
     */
    public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
        this.recordFilterStrategy = recordFilterStrategy;
    }

    /**
     * Set the {@code ackDiscarded} flag passed to the filtering adapters created in
     * {@link #onInit()}.
     *
     * @param ackDiscarded the flag value
     */
    public void setAckDiscarded(boolean ackDiscarded) {
        this.ackDiscarded = ackDiscarded;
    }

    /**
     * Set a retry template. Only allowed in {@link ListenerMode#record} mode.
     *
     * @param retryTemplate the template, or null for no retry
     * @throws IllegalArgumentException if a non-null template is given in batch mode
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        Assert.isTrue(retryTemplate == null || this.mode.equals(ListenerMode.record),
                "Retry is not supported with mode=batch");
        this.retryTemplate = retryTemplate;
    }

    /**
     * Set the recovery callback passed to the retrying adapter created in {@link #onInit()}.
     *
     * @param recoveryCallback the callback
     */
    public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    /**
     * Choose whether the filtering adapter is nested inside the retrying adapter. Only has an
     * effect when both a retry template and a filter strategy are configured.
     *
     * @param filterInRetry true to place the filter inside the retry adapter
     */
    public void setFilterInRetry(boolean filterInRetry) {
        this.filterInRetry = filterInRetry;
    }

    /**
     * Set the fallback type on both the record and the batch listener.
     *
     * @param payloadType the fallback type
     */
    public void setPayloadType(Class<?> payloadType) {
        this.recordListener.setFallbackType(payloadType);
        this.batchListener.setFallbackType(payloadType);
    }

    /**
     * Set a callback that the internal listeners invoke from {@code onPartitionsAssigned},
     * receiving the assignments and the seek callback.
     *
     * @param onPartitionsAssignedCallback the callback, or null for none
     */
    public void setOnPartitionsAssignedSeekCallback(
            BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
        this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
    }

    /**
     * Choose whether the consumed record is added to record-mode messages under the
     * {@code sourceData} header.
     *
     * @param bindSourceRecord true to add the record as a header
     */
    public void setBindSourceRecord(boolean bindSourceRecord) {
        this.bindSourceRecord = bindSourceRecord;
    }

    @Override
    public String getComponentType() {
        return "kafka:message-driven-channel-adapter";
    }

    /**
     * Build the listener chain for the configured mode, install it on the container and
     * cache the container's delivery-attempt-header setting.
     *
     * @throws IllegalStateException if both a retry template and an error channel are set
     */
    @Override
    protected void onInit() {
        super.onInit();
        if (this.retryTemplate != null) {
            Assert.state(this.getErrorChannel() == null,
                    "Cannot have an 'errorChannel' property when a 'RetryTemplate' is provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to send an error message when retries are exhausted");
        }
        if (this.mode.equals(ListenerMode.record)) {
            this.messageListenerContainer.getContainerProperties().setMessageListener(buildRecordListener());
        } else {
            this.messageListenerContainer.getContainerProperties().setMessageListener(buildBatchListener());
        }
        this.containerDeliveryAttemptPresent =
                this.messageListenerContainer.getContainerProperties().isDeliveryAttemptHeader();
    }

    /**
     * Wrap the record listener with the optional retry and filter adapters. The retry
     * template, when present, also gets the record listener registered as a retry listener.
     */
    private MessageListener<K, V> buildRecordListener() {
        // Typed to the interface: the adapters below are not subclasses of the inner listener.
        MessageListener<K, V> listener = this.recordListener;
        boolean doFilterInRetry =
                this.filterInRetry && this.retryTemplate != null && this.recordFilterStrategy != null;
        if (doFilterInRetry) {
            // Filter innermost, retry outermost.
            listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy, this.ackDiscarded);
            listener = new RetryingMessageListenerAdapter<>(listener, this.retryTemplate, this.recoveryCallback);
            this.retryTemplate.registerListener(this.recordListener);
        } else {
            // Retry innermost (if any), filter outermost (if any).
            if (this.retryTemplate != null) {
                listener = new RetryingMessageListenerAdapter<>(listener, this.retryTemplate, this.recoveryCallback);
                this.retryTemplate.registerListener(this.recordListener);
            }
            if (this.recordFilterStrategy != null) {
                listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy,
                        this.ackDiscarded);
            }
        }
        return listener;
    }

    /** Wrap the batch listener with the optional filter adapter. */
    private BatchMessageListener<K, V> buildBatchListener() {
        BatchMessageListener<K, V> listener = this.batchListener;
        if (this.recordFilterStrategy != null) {
            listener = new FilteringBatchMessageListenerAdapter<>(listener, this.recordFilterStrategy,
                    this.ackDiscarded);
        }
        return listener;
    }

    @Override
    protected void doStart() {
        this.messageListenerContainer.start();
    }

    @Override
    protected void doStop() {
        this.messageListenerContainer.stop();
    }

    @Override
    public void pause() {
        this.messageListenerContainer.pause();
    }

    @Override
    public void resume() {
        this.messageListenerContainer.resume();
    }

    /**
     * Stop the listener container.
     *
     * @return this adapter's phase
     */
    @Override
    public int beforeShutdown() {
        this.messageListenerContainer.stop();
        return this.getPhase();
    }

    /**
     * No-op apart from reporting the phase.
     *
     * @return this adapter's phase
     */
    @Override
    public int afterShutdown() {
        return this.getPhase();
    }

    /**
     * Populate the thread-bound error attributes with the converted message and the consumed
     * Kafka object. Without a retry template a fresh accessor is created first, but only if
     * an error channel is configured; with a retry template the accessor already bound to
     * the thread (if any) is reused.
     *
     * @param record the consumed record or list of records
     * @param message the converted message; may be null
     */
    private void setAttributesIfNecessary(Object record, Message<?> message) {
        boolean needHolder = this.getErrorChannel() != null && this.retryTemplate == null;
        boolean needAttributes = needHolder || this.retryTemplate != null;
        if (needHolder) {
            attributesHolder.set(ErrorMessageUtils.getAttributeAccessor(null, null));
        }
        if (needAttributes) {
            AttributeAccessor attributes = attributesHolder.get();
            if (attributes != null) {
                attributes.setAttribute(INPUT_MESSAGE_ATTRIBUTE, message);
                attributes.setAttribute(KAFKA_DATA_ATTRIBUTE, record);
            }
        }
    }

    /**
     * Return the thread-bound attributes when present, otherwise defer to the superclass.
     */
    @Override
    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor attributes = attributesHolder.get();
        if (attributes == null) {
            return super.getErrorMessageAttributes(message);
        }
        return attributes;
    }

    /**
     * Send the message if there is one, otherwise log at debug level.
     *
     * <p>When no retry template is configured the thread-bound attributes are cleared on
     * every path, including the null-message path, so that attributes created for one
     * delivery never remain bound to the thread afterwards. With a retry template the
     * holder is cleared by the retry listener's {@code close} callback instead.
     *
     * @param message the converted message; may be null
     * @param kafkaConsumedObject the record or list of records, used for logging only
     */
    private void sendMessageIfAny(Message<?> message, Object kafkaConsumedObject) {
        try {
            if (message != null) {
                this.sendMessage(message);
            } else if (this.logger.isDebugEnabled()) {
                // Guarded so that a potentially large batch is only rendered when needed.
                this.logger.debug("Converter returned a null message for: " + kafkaConsumedObject);
            }
        } finally {
            if (this.retryTemplate == null) {
                attributesHolder.remove();
            }
        }
    }

    /**
     * Listener installed in {@link ListenerMode#batch} mode: converts the polled records to
     * a single message and sends it.
     */
    private class IntegrationBatchMessageListener
            extends BatchMessagingMessageListenerAdapter<K, V>
            implements RetryListener {

        IntegrationBatchMessageListener() {
            super(null, null);
        }

        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                ConsumerSeekAware.ConsumerSeekCallback callback) {
            if (KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback != null) {
                KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback.accept(assignments, callback);
            }
        }

        /**
         * Convert the records and send the resulting message. A conversion failure is
         * wrapped in a {@link ConversionException} and sent to the error channel when one
         * is configured; it is not rethrown.
         */
        @Override
        public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer) {
            Message<?> message = null;
            try {
                message = this.toMessagingMessage(records, acknowledgment, consumer);
                KafkaMessageDrivenChannelAdapter.this.setAttributesIfNecessary(records, message);
            } catch (RuntimeException e) {
                ConversionException exception =
                        new ConversionException("Failed to convert to message for: " + records, e);
                if (KafkaMessageDrivenChannelAdapter.this.getErrorChannel() != null) {
                    KafkaMessageDrivenChannelAdapter.this.getMessagingTemplate().send(
                            KafkaMessageDrivenChannelAdapter.this.getErrorChannel(),
                            new ErrorMessage(exception));
                }
            }
            KafkaMessageDrivenChannelAdapter.this.sendMessageIfAny(message, records);
        }

        /** Bind the retry context to the current thread when a retry template is configured. */
        @Override
        public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
            if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
                attributesHolder.set(context);
            }
            return true;
        }

        /** Clear the thread-bound attributes. */
        @Override
        public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
                Throwable throwable) {
            attributesHolder.remove();
        }

        @Override
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
                Throwable throwable) {
            // intentionally empty
        }
    }

    /**
     * Listener installed in {@link ListenerMode#record} mode: converts each record to a
     * message, adds delivery-attempt / source-record headers and sends it. Also serves as
     * the {@link RetryListener} registered on the retry template.
     */
    private class IntegrationRecordMessageListener
            extends RecordMessagingMessageListenerAdapter<K, V>
            implements RetryListener {

        IntegrationRecordMessageListener() {
            super(null, null);
        }

        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                ConsumerSeekAware.ConsumerSeekCallback callback) {
            if (KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback != null) {
                KafkaMessageDrivenChannelAdapter.this.onPartitionsAssignedSeekCallback.accept(assignments, callback);
            }
        }

        /**
         * Convert the record and send the resulting message. A conversion failure is wrapped
         * in a {@link ConversionException} and passed to
         * {@code sendErrorMessageIfNecessary}; it is not rethrown.
         */
        @Override
        public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            Message<?> message = null;
            try {
                message = this.enhanceHeaders(this.toMessagingMessage(record, acknowledgment, consumer), record);
                KafkaMessageDrivenChannelAdapter.this.setAttributesIfNecessary(record, message);
            } catch (RuntimeException e) {
                ConversionException exception =
                        new ConversionException("Failed to convert to message for: " + record, e);
                KafkaMessageDrivenChannelAdapter.this.sendErrorMessageIfNecessary(null, exception);
            }
            KafkaMessageDrivenChannelAdapter.this.sendMessageIfAny(message, record);
        }

        /**
         * Add the {@code deliveryAttempt} and (optionally) {@code sourceData} headers.
         * Messages whose headers are {@link KafkaMessageHeaders} are modified in place via
         * their raw header map; any other message is rebuilt through a
         * {@link MessageBuilder}.
         *
         * @param message the converted message; a null message is returned as-is so the
         * caller can take its "converter returned null" path
         * @param record the record the message was converted from
         * @return the message carrying the additional headers, or null if {@code message} was null
         */
        private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> record) {
            if (message == null) {
                return null;
            }
            AtomicInteger deliveryAttempt = resolveDeliveryAttempt(record);
            boolean bindRecord = KafkaMessageDrivenChannelAdapter.this.bindSourceRecord;

            if (message.getHeaders() instanceof KafkaMessageHeaders) {
                Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
                if (deliveryAttempt != null) {
                    rawHeaders.put(DELIVERY_ATTEMPT_HEADER, deliveryAttempt);
                }
                if (bindRecord) {
                    rawHeaders.put(SOURCE_DATA_HEADER, record);
                }
                return message;
            }

            MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
            if (deliveryAttempt != null) {
                builder.setHeader(DELIVERY_ATTEMPT_HEADER, deliveryAttempt);
            }
            if (bindRecord) {
                builder.setHeader(SOURCE_DATA_HEADER, record);
            }
            return builder.build();
        }

        /**
         * Determine the delivery attempt for the record: the thread-bound retry context's
         * retry count plus one when a retry template is configured; otherwise, when the
         * container's delivery-attempt-header flag was set at init time, the int encoded in
         * the record's {@code kafka_deliveryAttempt} header.
         *
         * @return the counter, or null when neither source applies or the record header is absent
         */
        private AtomicInteger resolveDeliveryAttempt(ConsumerRecord<K, V> record) {
            if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
                return new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
            }
            if (KafkaMessageDrivenChannelAdapter.this.containerDeliveryAttemptPresent) {
                Header header = record.headers().lastHeader(KAFKA_DELIVERY_ATTEMPT_HEADER);
                // The container flag alone does not prove this particular record carries the
                // header; skip it rather than fail the whole conversion with an NPE.
                if (header != null && header.value() != null) {
                    return new AtomicInteger(ByteBuffer.wrap(header.value()).getInt());
                }
            }
            return null;
        }

        /** Bind the retry context to the current thread when a retry template is configured. */
        @Override
        public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
            if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
                attributesHolder.set(context);
            }
            return true;
        }

        /** Clear the thread-bound attributes. */
        @Override
        public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
                Throwable throwable) {
            attributesHolder.remove();
        }

        @Override
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
                Throwable throwable) {
            // intentionally empty
        }
    }

    /**
     * Delivery mode of the adapter.
     */
    public enum ListenerMode {

        /** One message per consumed record. */
        record,

        /** One message per list of consumed records. */
        batch

    }
}

