/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.operations;

import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.operations.AsyncMessagingOperations;
import io.awspring.cloud.sqs.operations.MessagingOperationFailedException;
import io.awspring.cloud.sqs.operations.MessagingOperations;
import io.awspring.cloud.sqs.operations.MessagingTemplateOptions;
import io.awspring.cloud.sqs.operations.SendBatchFailureHandlingStrategy;
import io.awspring.cloud.sqs.operations.SendBatchOperationFailedException;
import io.awspring.cloud.sqs.operations.SendResult;
import io.awspring.cloud.sqs.operations.TemplateAcknowledgementMode;
import io.awspring.cloud.sqs.support.converter.ContextAwareMessagingMessageConverter;
import io.awspring.cloud.sqs.support.converter.MessageConversionContext;
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

public abstract class AbstractMessagingTemplate<S>
implements MessagingOperations,
AsyncMessagingOperations {
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessagingTemplate.class);
    private static final TemplateAcknowledgementMode DEFAULT_ACKNOWLEDGEMENT_MODE = TemplateAcknowledgementMode.ACKNOWLEDGE;
    private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(10L);
    private static final SendBatchFailureHandlingStrategy DEFAULT_SEND_BATCH_OPERATION_FAILURE_STRATEGY = SendBatchFailureHandlingStrategy.THROW;
    private static final int DEFAULT_MAX_NUMBER_OF_MESSAGES = 10;
    private static final String DEFAULT_ENDPOINT_NAME = "";
    private static final int MAX_ONE_MESSAGE = 1;
    private final Map<String, Object> defaultAdditionalHeaders;
    private final Duration defaultPollTimeout;
    private final int defaultMaxNumberOfMessages;
    private final String defaultEndpointName;
    private final TemplateAcknowledgementMode acknowledgementMode;
    private final SendBatchFailureHandlingStrategy sendBatchFailureHandlingStrategy;
    @Nullable
    private final Class<?> defaultPayloadClass;
    private final MessagingMessageConverter<S> messageConverter;

    protected AbstractMessagingTemplate(MessagingMessageConverter<S> messageConverter, AbstractMessagingTemplateOptions<?> options) {
        Assert.notNull(messageConverter, (String)"messageConverter must not be null");
        Assert.notNull(options, (String)"options must not be null");
        this.messageConverter = messageConverter;
        this.defaultAdditionalHeaders = options.defaultAdditionalHeaders;
        this.defaultMaxNumberOfMessages = options.defaultMaxNumberOfMessages;
        this.defaultPollTimeout = options.defaultPollTimeout;
        this.defaultPayloadClass = options.defaultPayloadClass;
        this.defaultEndpointName = options.defaultEndpointName;
        this.acknowledgementMode = options.acknowledgementMode;
        this.sendBatchFailureHandlingStrategy = options.sendBatchFailureHandlingStrategy;
    }

    @Override
    public Optional<Message<?>> receive() {
        return this.unwrapCompletionException(this.receiveAsync());
    }

    @Override
    public <T> Optional<Message<T>> receive(String queue, Class<T> payloadClass) {
        return this.unwrapCompletionException(this.receiveAsync(queue, payloadClass));
    }

    @Override
    public Collection<Message<?>> receiveMany() {
        return this.unwrapCompletionException(this.receiveManyAsync());
    }

    @Override
    public <T> Collection<Message<T>> receiveMany(String queue, Class<T> payloadClass) {
        Assert.notNull((Object)queue, (String)"queue cannot be null");
        Assert.notNull(payloadClass, (String)"payloadClass cannot be null");
        return this.unwrapCompletionException(this.receiveManyAsync(queue, payloadClass));
    }

    @Override
    public CompletableFuture<Optional<Message<?>>> receiveAsync() {
        return this.receiveAsync(null, null, null, null);
    }

    @Override
    public <T> CompletableFuture<Optional<Message<T>>> receiveAsync(String queue, Class<T> payloadClass) {
        Assert.notNull((Object)queue, (String)"queue cannot be null");
        Assert.notNull(payloadClass, (String)"payloadClass cannot be null");
        return this.receiveAsync(queue, payloadClass, null, null).thenApply(this::castFromOptional);
    }

    protected <T> Optional<Message<T>> castFromOptional(Optional<Message<?>> optional) {
        return optional.map(msg -> msg);
    }

    protected <T> List<Message<T>> castFromCollection(Collection<Message<?>> msgs) {
        return msgs.stream().map(msg -> msg).toList();
    }

    protected CompletableFuture<Optional<Message<?>>> receiveAsync(@Nullable String endpoint, @Nullable Class<?> payloadClass, @Nullable Duration pollTimeout, @Nullable Map<String, Object> additionalHeaders) {
        return this.receiveManyAsync(endpoint, payloadClass, pollTimeout, 1, additionalHeaders).thenApply(msgs -> msgs.isEmpty() ? Optional.empty() : Optional.of((Message)msgs.iterator().next()));
    }

    @Override
    public CompletableFuture<Collection<Message<?>>> receiveManyAsync() {
        return this.receiveManyAsync(null, null, null, null, null);
    }

    @Override
    public <T> CompletableFuture<Collection<Message<T>>> receiveManyAsync(String queue, Class<T> payloadClass) {
        return this.receiveManyAsync(queue, payloadClass, null, null, null).thenApply(this::castFromCollection);
    }

    protected CompletableFuture<Collection<Message<?>>> receiveManyAsync(@Nullable String endpoint, @Nullable Class<?> payloadClass, @Nullable Duration pollTimeout, @Nullable Integer maxNumberOfMessages, @Nullable Map<String, Object> additionalHeaders) {
        String endpointToUse = this.getEndpointName(endpoint);
        logger.trace("Receiving messages from endpoint {}", (Object)endpointToUse);
        Map<String, Object> headers = this.getAdditionalHeadersToReceive(endpointToUse, additionalHeaders);
        Duration pollTimeoutToUse = this.getOrDefault(pollTimeout, this.defaultPollTimeout, "pollTimeout");
        Integer maxNumberOfMessagesToUse = this.getOrDefault(maxNumberOfMessages, this.defaultMaxNumberOfMessages, "defaultMaxNumberOfMessages");
        return ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.doReceiveAsync(endpointToUse, pollTimeoutToUse, maxNumberOfMessagesToUse, headers).thenApply(messages -> this.convertReceivedMessages(endpointToUse, payloadClass, (Collection<S>)messages, headers))).thenCompose(messages -> this.handleAcknowledgement(endpointToUse, (Collection<Message<?>>)messages))).exceptionallyCompose(t -> CompletableFuture.failedFuture((Throwable)((Object)new MessagingOperationFailedException("Message receive operation failed for endpoint %s".formatted(endpointToUse), endpointToUse, t instanceof CompletionException ? t.getCause() : t))))).whenComplete((v, t) -> this.logReceiveMessageResult(endpointToUse, (Collection<Message<?>>)v, (Throwable)t));
    }

    protected abstract Map<String, Object> preProcessHeadersForReceive(String var1, Map<String, Object> var2);

    private Map<String, Object> getAdditionalHeadersToReceive(String endpointToUse, @Nullable Map<String, Object> additionalHeaders) {
        HashMap<String, Object> headers = new HashMap<String, Object>(this.defaultAdditionalHeaders);
        if (additionalHeaders != null) {
            headers.putAll(additionalHeaders);
        }
        return this.preProcessHeadersForReceive(endpointToUse, headers);
    }

    private Collection<Message<?>> convertReceivedMessages(String endpoint, @Nullable Class<?> payloadClass, Collection<S> messages, Map<String, Object> additionalHeaders) {
        return messages.stream().map(message -> this.convertReceivedMessage(this.getEndpointName(endpoint), message, payloadClass != null ? payloadClass : this.defaultPayloadClass)).map(message -> this.addAdditionalHeaders((Message)message, additionalHeaders)).collect(Collectors.toList());
    }

    protected <T> Message<T> addAdditionalHeaders(Message<T> message, Map<String, Object> additionalHeaders) {
        Map<String, Object> headersToAdd = this.handleAdditionalHeaders(additionalHeaders);
        return headersToAdd.isEmpty() ? message : MessageHeaderUtils.addHeadersIfAbsent(message, headersToAdd);
    }

    protected abstract Map<String, Object> handleAdditionalHeaders(Map<String, Object> var1);

    private CompletableFuture<Collection<Message<?>>> handleAcknowledgement(@Nullable String endpoint, Collection<Message<?>> messages) {
        return TemplateAcknowledgementMode.ACKNOWLEDGE.equals((Object)this.acknowledgementMode) && !messages.isEmpty() ? this.doAcknowledgeMessages(this.getEndpointName(endpoint), messages).thenApply(theVoid -> messages) : CompletableFuture.completedFuture(messages);
    }

    protected abstract CompletableFuture<Void> doAcknowledgeMessages(String var1, Collection<Message<?>> var2);

    private String getEndpointName(@Nullable String endpoint) {
        String endpointName = this.getOrDefault(endpoint, this.defaultEndpointName, "endpointName");
        Assert.hasText((String)endpointName, (String)"No endpoint name informed and no default value available");
        return endpointName;
    }

    private <V> V getOrDefault(@Nullable V value, V defaultValue, String valueName) {
        return Objects.requireNonNull(value != null ? value : defaultValue, valueName + " not set and no default value provided");
    }

    private Message<?> convertReceivedMessage(String endpoint, S message, @Nullable Class<?> payloadClass) {
        Message<?> message2;
        MessagingMessageConverter<S> messagingMessageConverter = this.messageConverter;
        if (messagingMessageConverter instanceof ContextAwareMessagingMessageConverter) {
            ContextAwareMessagingMessageConverter contextConverter = (ContextAwareMessagingMessageConverter)messagingMessageConverter;
            message2 = contextConverter.toMessagingMessage(message, this.getReceiveMessageConversionContext(endpoint, payloadClass));
        } else {
            message2 = this.messageConverter.toMessagingMessage(message);
        }
        return message2;
    }

    private void logReceiveMessageResult(String endpoint, @Nullable Collection<Message<?>> messages, @Nullable Throwable t) {
        if (messages != null) {
            logger.trace("Received messages {} from endpoint {}", (Object)MessageHeaderUtils.getId(this.addTypeToMessages(messages)), (Object)endpoint);
        } else {
            logger.error("Error receiving messages", t);
        }
    }

    protected Collection<Message<Object>> addTypeToMessages(Collection<Message<?>> messages) {
        return messages.stream().map(message -> message).collect(Collectors.toList());
    }

    protected abstract CompletableFuture<Collection<S>> doReceiveAsync(String var1, Duration var2, Integer var3, Map<String, Object> var4);

    @Override
    public <T> SendResult<T> send(T payload) {
        return this.unwrapCompletionException(this.sendAsync(payload));
    }

    @Override
    public <T> SendResult<T> send(@Nullable String endpointName, T payload) {
        return this.unwrapCompletionException(this.sendAsync(endpointName, payload));
    }

    @Override
    public <T> SendResult<T> send(@Nullable String endpointName, Message<T> message) {
        return this.unwrapCompletionException(this.sendAsync(endpointName, message));
    }

    @Override
    public <T> SendResult.Batch<T> sendMany(@Nullable String endpointName, Collection<Message<T>> messages) {
        return this.unwrapCompletionException(this.sendManyAsync(endpointName, messages));
    }

    @Override
    public <T> CompletableFuture<SendResult<T>> sendAsync(T payload) {
        return this.sendAsync(null, payload);
    }

    @Override
    public <T> CompletableFuture<SendResult<T>> sendAsync(@Nullable String endpointName, T payload) {
        return this.sendAsync(endpointName, payload instanceof Message ? (Message)payload : MessageBuilder.withPayload(payload).build());
    }

    @Override
    public <T> CompletableFuture<SendResult<T>> sendAsync(@Nullable String endpointName, Message<T> message) {
        String endpointToUse = this.getEndpointName(endpointName);
        Message<T> messageToUse = this.preProcessMessageForSend(endpointToUse, message);
        logger.trace("Sending message {} to endpoint {}", (Object)MessageHeaderUtils.getId(message), (Object)endpointName);
        return ((CompletableFuture)this.doSendAsync(endpointToUse, this.convertMessageToSend(messageToUse), messageToUse).exceptionallyCompose(t -> CompletableFuture.failedFuture((Throwable)((Object)new MessagingOperationFailedException("Message send operation failed for message %s to endpoint %s".formatted(MessageHeaderUtils.getId(message), endpointToUse), endpointToUse, message, (Throwable)t))))).whenComplete((v, t) -> this.logSendMessageResult(endpointToUse, message, (Throwable)t));
    }

    protected abstract <T> Message<T> preProcessMessageForSend(String var1, Message<T> var2);

    @Override
    public <T> CompletableFuture<SendResult.Batch<T>> sendManyAsync(@Nullable String endpointName, Collection<Message<T>> messages) {
        logger.trace("Sending messages {} to endpoint {}", (Object)MessageHeaderUtils.getId(messages), (Object)endpointName);
        String endpointToUse = this.getEndpointName(endpointName);
        Collection messagesToUse = this.preProcessMessagesForSend(endpointToUse, messages);
        return ((CompletableFuture)((CompletableFuture)this.doSendBatchAsync(endpointToUse, this.convertMessagesToSend(messagesToUse), messagesToUse).exceptionallyCompose(t -> this.wrapSendException(messagesToUse, endpointToUse, (Throwable)t))).thenCompose(result -> this.handleFailedMessages(endpointToUse, (SendResult.Batch)result))).whenComplete((v, t) -> this.logSendMessageBatchResult(endpointToUse, messagesToUse, (Throwable)t));
    }

    protected abstract <T> Collection<Message<T>> preProcessMessagesForSend(String var1, Collection<Message<T>> var2);

    private <T> CompletableFuture<SendResult.Batch<T>> handleFailedMessages(String endpointToUse, SendResult.Batch<T> result) {
        return !result.failed().isEmpty() && SendBatchFailureHandlingStrategy.THROW.equals((Object)this.sendBatchFailureHandlingStrategy) ? this.handleFailedSendBatch(endpointToUse, result) : CompletableFuture.completedFuture(result);
    }

    private <T> CompletableFuture<SendResult.Batch<T>> wrapSendException(Collection<Message<T>> messages, String endpointToUse, Throwable t) {
        return CompletableFuture.failedFuture((Throwable)((Object)new MessagingOperationFailedException("Message send operation failed for messages %s to endpoint %s".formatted(MessageHeaderUtils.getId(messages), endpointToUse), endpointToUse, messages, t)));
    }

    private <T> CompletableFuture<SendResult.Batch<T>> handleFailedSendBatch(String endpoint, SendResult.Batch<T> result) {
        return CompletableFuture.failedFuture((Throwable)((Object)new SendBatchOperationFailedException(DEFAULT_ENDPOINT_NAME, endpoint, result)));
    }

    private <T> Collection<S> convertMessagesToSend(Collection<Message<T>> messages) {
        return messages.stream().map(this::convertMessageToSend).collect(Collectors.toList());
    }

    private <T> S convertMessageToSend(Message<T> message) {
        S s;
        MessagingMessageConverter<S> messagingMessageConverter = this.messageConverter;
        if (messagingMessageConverter instanceof ContextAwareMessagingMessageConverter) {
            ContextAwareMessagingMessageConverter contextConverter = (ContextAwareMessagingMessageConverter)messagingMessageConverter;
            s = contextConverter.fromMessagingMessage(message, this.getSendMessageConversionContext(message));
        } else {
            s = this.messageConverter.fromMessagingMessage(message);
        }
        return s;
    }

    protected abstract <T> CompletableFuture<SendResult<T>> doSendAsync(String var1, S var2, Message<T> var3);

    protected abstract <T> CompletableFuture<SendResult.Batch<T>> doSendBatchAsync(String var1, Collection<S> var2, Collection<Message<T>> var3);

    @Nullable
    protected <T> MessageConversionContext getReceiveMessageConversionContext(String endpointName, @Nullable Class<T> payloadClass) {
        return null;
    }

    @Nullable
    protected <T> MessageConversionContext getSendMessageConversionContext(Message<T> message) {
        return null;
    }

    private <T> void logSendMessageResult(String endpointToUse, Message<T> message, @Nullable Throwable t) {
        if (t == null) {
            logger.trace("Message {} successfully sent to endpoint {} with id {}", new Object[]{message, endpointToUse, MessageHeaderUtils.getId(message)});
        } else {
            logger.error("Error sending message {} to endpoint {}", new Object[]{MessageHeaderUtils.getId(message), endpointToUse, this.unwrapCompletionException(t)});
        }
    }

    private Throwable unwrapCompletionException(Throwable t) {
        return t instanceof CompletionException && t.getCause() != null ? t.getCause() : t;
    }

    private <T> void logSendMessageBatchResult(String endpointToUse, Collection<Message<T>> messages, @Nullable Throwable t) {
        if (t == null) {
            logger.trace("Messages {} successfully sent to endpoint {} with id {}", new Object[]{messages, endpointToUse, MessageHeaderUtils.getId(messages)});
        } else {
            logger.error("Error sending messages {} to endpoint {}", new Object[]{MessageHeaderUtils.getId(messages), endpointToUse, this.unwrapCompletionException(t)});
        }
    }

    protected <V> V unwrapCompletionException(CompletableFuture<V> future) {
        try {
            return future.join();
        }
        catch (CompletionException ex) {
            Throwable throwable = ex.getCause();
            if (throwable instanceof RuntimeException) {
                RuntimeException re = (RuntimeException)throwable;
                throw re;
            }
            throw new RuntimeException("Unexpected exception", ex);
        }
    }

    protected static abstract class AbstractMessagingTemplateOptions<O extends MessagingTemplateOptions<O>>
    implements MessagingTemplateOptions<O> {
        private Duration defaultPollTimeout = DEFAULT_POLL_TIMEOUT;
        private int defaultMaxNumberOfMessages = 10;
        private String defaultEndpointName = "";
        private TemplateAcknowledgementMode acknowledgementMode = DEFAULT_ACKNOWLEDGEMENT_MODE;
        private SendBatchFailureHandlingStrategy sendBatchFailureHandlingStrategy = DEFAULT_SEND_BATCH_OPERATION_FAILURE_STRATEGY;
        private final Map<String, Object> defaultAdditionalHeaders = new HashMap<String, Object>();
        @Nullable
        private Class<?> defaultPayloadClass;

        protected AbstractMessagingTemplateOptions() {
        }

        @Override
        public O acknowledgementMode(TemplateAcknowledgementMode defaultAcknowledgementMode) {
            Assert.notNull((Object)((Object)defaultAcknowledgementMode), (String)"defaultAcknowledgementMode must not be null");
            this.acknowledgementMode = defaultAcknowledgementMode;
            return this.self();
        }

        @Override
        public O sendBatchFailureHandlingStrategy(SendBatchFailureHandlingStrategy sendBatchFailureHandlingStrategy) {
            Assert.notNull((Object)((Object)sendBatchFailureHandlingStrategy), (String)"sendBatchFailureStrategy must not be null");
            this.sendBatchFailureHandlingStrategy = sendBatchFailureHandlingStrategy;
            return this.self();
        }

        @Override
        public O defaultPollTimeout(Duration defaultPollTimeout) {
            Assert.notNull((Object)defaultPollTimeout, (String)"pollTimeout must not be null");
            this.defaultPollTimeout = defaultPollTimeout;
            return this.self();
        }

        @Override
        public O defaultMaxNumberOfMessages(Integer defaultMaxNumberOfMessages) {
            Assert.isTrue((defaultMaxNumberOfMessages > 0 ? 1 : 0) != 0, (String)"defaultMaxNumberOfMessages must be greater than zero");
            this.defaultMaxNumberOfMessages = defaultMaxNumberOfMessages;
            return this.self();
        }

        protected void defaultEndpointName(String defaultEndpointName) {
            Assert.notNull((Object)defaultEndpointName, (String)"defaultEndpointName must not be null");
            this.defaultEndpointName = defaultEndpointName;
        }

        @Override
        public O defaultPayloadClass(Class<?> defaultPayloadClass) {
            Assert.notNull(defaultPayloadClass, (String)"defaultPayloadClass must not be null");
            this.defaultPayloadClass = defaultPayloadClass;
            return this.self();
        }

        @Override
        public O additionalHeaderForReceive(String name, Object value) {
            Assert.notNull((Object)name, (String)"name must not be null");
            Assert.notNull((Object)value, (String)"value must not be null");
            this.defaultAdditionalHeaders.put(name, value);
            return this.self();
        }

        @Override
        public O additionalHeadersForReceive(Map<String, Object> defaultAdditionalHeaders) {
            Assert.notNull(defaultAdditionalHeaders, (String)"defaultAdditionalHeaders must not be null");
            this.defaultAdditionalHeaders.putAll(defaultAdditionalHeaders);
            return this.self();
        }

        public O self() {
            return (O)this;
        }
    }
}

