/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.inbound;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.jspecify.annotations.Nullable;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.core.Pausable;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
import org.springframework.integration.kafka.inbound.KafkaInboundEndpoint;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.integration.support.json.JacksonMessagingUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.JsonKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

public class KafkaInboundGateway<K, V, R>
extends MessagingGatewaySupport
implements KafkaInboundEndpoint,
Pausable,
OrderlyShutdownCapable {
    private final IntegrationRecordMessageListener listener = new IntegrationRecordMessageListener();
    private final AbstractMessageListenerContainer<K, V> messageListenerContainer;
    private final KafkaTemplate<K, R> kafkaTemplate;
    private @Nullable RetryTemplate retryTemplate;
    private @Nullable RecoveryCallback<?> recoveryCallback;
    private @Nullable BiConsumer<Map<TopicPartition, Long>, // Could not load outer class - annotation placement on inner may be incorrect
    ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;
    private boolean bindSourceRecord;
    private boolean containerDeliveryAttemptPresent;

    public KafkaInboundGateway(AbstractMessageListenerContainer<K, V> messageListenerContainer, KafkaTemplate<K, R> kafkaTemplate) {
        Assert.notNull(messageListenerContainer, (String)"messageListenerContainer is required");
        Assert.notNull(kafkaTemplate, (String)"kafkaTemplate is required");
        Assert.isNull((Object)messageListenerContainer.getContainerProperties().getMessageListener(), (String)"Container must not already have a listener");
        this.messageListenerContainer = messageListenerContainer;
        this.messageListenerContainer.setAutoStartup(false);
        this.kafkaTemplate = kafkaTemplate;
        this.setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
        if (JacksonPresent.isJackson3Present()) {
            MessagingMessageConverter messageConverter = new MessagingMessageConverter();
            JsonKafkaHeaderMapper headerMapper = new JsonKafkaHeaderMapper();
            headerMapper.addTrustedPackages(JacksonMessagingUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
            messageConverter.setHeaderMapper((KafkaHeaderMapper)headerMapper);
            this.listener.setMessageConverter((RecordMessageConverter)messageConverter);
        } else if (JacksonPresent.isJackson2Present()) {
            MessagingMessageConverter messageConverter = new MessagingMessageConverter();
            DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
            headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
            messageConverter.setHeaderMapper((KafkaHeaderMapper)headerMapper);
            this.listener.setMessageConverter((RecordMessageConverter)messageConverter);
        }
    }

    public void setMessageConverter(RecordMessageConverter messageConverter) {
        this.listener.setMessageConverter(messageConverter);
    }

    public void setPayloadType(Class<?> payloadType) {
        this.listener.setFallbackType(payloadType);
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
        this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
    }

    public void setBindSourceRecord(boolean bindSourceRecord) {
        this.bindSourceRecord = bindSourceRecord;
    }

    protected void onInit() {
        super.onInit();
        if (this.retryTemplate != null) {
            MessageChannel errorChannel = this.getErrorChannel();
            if (this.recoveryCallback != null && errorChannel != null) {
                this.recoveryCallback = new ErrorMessageSendingRecoverer(errorChannel, this.getErrorMessageStrategy());
            }
        }
        ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties();
        containerProperties.setMessageListener((Object)this.listener);
        this.containerDeliveryAttemptPresent = containerProperties.isDeliveryAttemptHeader();
    }

    protected void doStart() {
        super.doStart();
        this.messageListenerContainer.start();
    }

    protected void doStop() {
        super.doStop();
        this.messageListenerContainer.stop();
    }

    public void pause() {
        this.messageListenerContainer.pause();
    }

    public void resume() {
        this.messageListenerContainer.resume();
    }

    public boolean isPaused() {
        return this.messageListenerContainer.isContainerPaused();
    }

    public String getComponentType() {
        return "kafka:inbound-gateway";
    }

    public int beforeShutdown() {
        this.messageListenerContainer.stop();
        return this.getPhase();
    }

    public int afterShutdown() {
        return this.getPhase();
    }

    private void setAttributesIfNecessary(Object record, @Nullable Message<?> message, boolean conversionError) {
        AttributeAccessor attributes;
        boolean needAttributes;
        boolean needHolder = ATTRIBUTES_HOLDER.get() == null && this.getErrorChannel() != null && (this.retryTemplate == null || conversionError);
        boolean bl = needAttributes = needHolder || this.retryTemplate != null;
        if (needHolder) {
            ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
        }
        if (needAttributes && (attributes = (AttributeAccessor)ATTRIBUTES_HOLDER.get()) != null) {
            attributes.setAttribute("inputMessage", message);
            attributes.setAttribute("kafka_data", record);
        }
    }

    protected AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> message) {
        AttributeAccessor attributes = (AttributeAccessor)ATTRIBUTES_HOLDER.get();
        if (attributes == null) {
            return super.getErrorMessageAttributes(message);
        }
        return attributes;
    }

    private class IntegrationRecordMessageListener
    extends RecordMessagingMessageListenerAdapter<K, V> {
        IntegrationRecordMessageListener() {
            super(null, null);
        }

        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) {
            if (KafkaInboundGateway.this.onPartitionsAssignedSeekCallback != null) {
                KafkaInboundGateway.this.onPartitionsAssignedSeekCallback.accept(assignments, callback);
            }
        }

        public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer) {
            Message message = null;
            try {
                message = this.toMessagingMessage(record, acknowledgment, consumer);
            }
            catch (RuntimeException ex) {
                MessageChannel errorChannel;
                if (KafkaInboundGateway.this.retryTemplate == null) {
                    KafkaInboundGateway.this.setAttributesIfNecessary(record, null, true);
                }
                if ((errorChannel = KafkaInboundGateway.this.getErrorChannel()) != null) {
                    KafkaInboundGateway.this.messagingTemplate.send((Object)errorChannel, (Message)KafkaInboundGateway.this.buildErrorMessage(null, (Throwable)new ConversionException("Failed to convert to message", record, (Throwable)ex)));
                }
                throw ex;
            }
            if (message != null) {
                this.sendAndReceive(record, message, acknowledgment, consumer);
            } else {
                KafkaInboundGateway.this.logger.debug(() -> "Converter returned a null message for: " + String.valueOf(record));
            }
        }

        private void sendAndReceive(ConsumerRecord<K, V> record, Message<?> message, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer) {
            RetryTemplate template = KafkaInboundGateway.this.retryTemplate;
            if (template != null) {
                KafkaInboundGateway.this.doWithRetry(template, KafkaInboundGateway.this.recoveryCallback, record, acknowledgment, consumer, () -> this.doSendAndReceive(this.enhanceHeadersAndSaveAttributes(message, record)));
            } else {
                this.doSendAndReceive(this.enhanceHeadersAndSaveAttributes(message, record));
            }
        }

        private void doSendAndReceive(Message<?> message) {
            try {
                Message<?> reply = KafkaInboundGateway.this.sendAndReceiveMessage(message);
                if (reply != null) {
                    reply = this.enhanceReply(message, reply);
                    KafkaInboundGateway.this.kafkaTemplate.send(reply);
                } else {
                    this.logger.debug(() -> "No reply received for " + String.valueOf(message));
                }
            }
            finally {
                if (KafkaInboundGateway.this.retryTemplate == null) {
                    KafkaInboundEndpoint.ATTRIBUTES_HOLDER.remove();
                }
            }
        }

        private Message<?> enhanceHeadersAndSaveAttributes(Message<?> message, ConsumerRecord<K, V> record) {
            Message messageToReturn = message;
            if (message.getHeaders() instanceof KafkaMessageHeaders) {
                Map rawHeaders = ((KafkaMessageHeaders)message.getHeaders()).getRawHeaders();
                if (KafkaInboundGateway.this.retryTemplate != null) {
                    AtomicInteger deliveryAttempt = new AtomicInteger(((RetryContext)KafkaInboundEndpoint.ATTRIBUTES_HOLDER.get()).getRetryCount() + 1);
                    rawHeaders.put("deliveryAttempt", deliveryAttempt);
                } else if (KafkaInboundGateway.this.containerDeliveryAttemptPresent) {
                    Header header = record.headers().lastHeader("kafka_deliveryAttempt");
                    rawHeaders.put("deliveryAttempt", new AtomicInteger(ByteBuffer.wrap(header.value()).getInt()));
                }
                if (KafkaInboundGateway.this.bindSourceRecord) {
                    rawHeaders.put("sourceData", record);
                }
            } else {
                MessageBuilder builder = MessageBuilder.fromMessage(message);
                if (KafkaInboundGateway.this.retryTemplate != null) {
                    AtomicInteger deliveryAttempt = new AtomicInteger(((RetryContext)KafkaInboundEndpoint.ATTRIBUTES_HOLDER.get()).getRetryCount() + 1);
                    builder.setHeader("deliveryAttempt", (Object)deliveryAttempt);
                } else if (KafkaInboundGateway.this.containerDeliveryAttemptPresent) {
                    Header header = record.headers().lastHeader("kafka_deliveryAttempt");
                    builder.setHeader("deliveryAttempt", (Object)new AtomicInteger(ByteBuffer.wrap(header.value()).getInt()));
                }
                if (KafkaInboundGateway.this.bindSourceRecord) {
                    builder.setHeader("sourceData", record);
                }
                messageToReturn = builder.build();
            }
            KafkaInboundGateway.this.setAttributesIfNecessary(record, messageToReturn, false);
            return messageToReturn;
        }

        private Message<?> enhanceReply(Message<?> message, Message<?> reply) {
            AbstractIntegrationMessageBuilder builder = null;
            MessageHeaders replyHeaders = reply.getHeaders();
            MessageHeaders requestHeaders = message.getHeaders();
            if (replyHeaders.get((Object)"kafka_correlationId") == null && requestHeaders.get((Object)"kafka_correlationId") != null) {
                builder = KafkaInboundGateway.this.getMessageBuilderFactory().fromMessage(reply).setHeader("kafka_correlationId", requestHeaders.get((Object)"kafka_correlationId"));
            }
            if (replyHeaders.get((Object)"kafka_topic") == null && requestHeaders.get((Object)"kafka_replyTopic") != null) {
                if (builder == null) {
                    builder = KafkaInboundGateway.this.getMessageBuilderFactory().fromMessage(reply);
                }
                builder.setHeader("kafka_topic", requestHeaders.get((Object)"kafka_replyTopic"));
            }
            if (replyHeaders.get((Object)"kafka_partitionId") == null && requestHeaders.get((Object)"kafka_replyPartition") != null) {
                if (builder == null) {
                    builder = KafkaInboundGateway.this.getMessageBuilderFactory().fromMessage(reply);
                }
                builder.setHeader("kafka_partitionId", requestHeaders.get((Object)"kafka_replyPartition"));
            }
            if (builder != null) {
                return builder.build();
            }
            return reply;
        }
    }
}

