/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.messaging.listener;

import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageSystemAttributeName;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import io.awspring.cloud.messaging.core.QueueMessageUtils;
import io.awspring.cloud.messaging.listener.AbstractMessageListenerContainer;
import io.awspring.cloud.messaging.listener.DeleteMessageHandler;
import io.awspring.cloud.messaging.listener.QueueMessageAcknowledgment;
import io.awspring.cloud.messaging.listener.QueueMessageVisibility;
import io.awspring.cloud.messaging.listener.SqsMessageDeletionPolicy;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

public class SimpleMessageListenerContainer
extends AbstractMessageListenerContainer {
    private static final int DEFAULT_WORKER_THREADS = 2;
    private static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(SimpleMessageListenerContainer.class) + "-";
    private boolean defaultTaskExecutor;
    private long backOffTime = 10000L;
    private long queueStopTimeout = 20000L;
    private AsyncTaskExecutor taskExecutor;
    private ConcurrentHashMap<String, Future<?>> scheduledFutureByQueue;
    private ConcurrentHashMap<String, Boolean> runningStateByQueue;

    protected AsyncTaskExecutor getTaskExecutor() {
        return this.taskExecutor;
    }

    public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public long getBackOffTime() {
        return this.backOffTime;
    }

    public void setBackOffTime(long backOffTime) {
        this.backOffTime = backOffTime;
    }

    public long getQueueStopTimeout() {
        return this.queueStopTimeout;
    }

    public void setQueueStopTimeout(long queueStopTimeout) {
        this.queueStopTimeout = queueStopTimeout;
    }

    @Override
    protected void initialize() {
        super.initialize();
        if (this.taskExecutor == null) {
            this.defaultTaskExecutor = true;
            this.taskExecutor = this.createDefaultTaskExecutor();
        }
        this.initializeRunningStateByQueue();
        this.scheduledFutureByQueue = new ConcurrentHashMap(this.getRegisteredQueues().size());
    }

    private void initializeRunningStateByQueue() {
        this.runningStateByQueue = new ConcurrentHashMap(this.getRegisteredQueues().size());
        for (String queueName : this.getRegisteredQueues().keySet()) {
            this.runningStateByQueue.put(queueName, false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void doStart() {
        Object object = this.getLifecycleMonitor();
        synchronized (object) {
            this.scheduleMessageListeners();
        }
    }

    @Override
    protected void doStop() {
        this.notifyRunningQueuesToStop();
        this.waitForRunningQueuesToStop();
    }

    private void notifyRunningQueuesToStop() {
        for (Map.Entry<String, Boolean> runningStateByQueue : this.runningStateByQueue.entrySet()) {
            if (!runningStateByQueue.getValue().booleanValue()) continue;
            this.stopQueue(runningStateByQueue.getKey());
        }
    }

    private void waitForRunningQueuesToStop() {
        for (Map.Entry<String, Boolean> queueRunningState : this.runningStateByQueue.entrySet()) {
            String logicalQueueName = queueRunningState.getKey();
            Future<?> queueSpinningThread = this.scheduledFutureByQueue.get(logicalQueueName);
            if (queueSpinningThread == null) continue;
            try {
                queueSpinningThread.get(this.getQueueStopTimeout(), TimeUnit.MILLISECONDS);
            }
            catch (ExecutionException | TimeoutException e) {
                this.getLogger().warn("An exception occurred while stopping queue '" + logicalQueueName + "'", (Throwable)e);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    protected void doDestroy() {
        if (this.defaultTaskExecutor) {
            ((ThreadPoolTaskExecutor)this.taskExecutor).destroy();
        }
    }

    protected AsyncTaskExecutor createDefaultTaskExecutor() {
        String beanName = this.getBeanName();
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix(beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
        int spinningThreads = this.getRegisteredQueues().size();
        if (spinningThreads > 0) {
            threadPoolTaskExecutor.setCorePoolSize(spinningThreads * 2);
            int maxNumberOfMessagePerBatch = this.getMaxNumberOfMessages() != null ? this.getMaxNumberOfMessages() : 10;
            threadPoolTaskExecutor.setMaxPoolSize(spinningThreads * (maxNumberOfMessagePerBatch + 1));
        }
        threadPoolTaskExecutor.setQueueCapacity(0);
        threadPoolTaskExecutor.afterPropertiesSet();
        return threadPoolTaskExecutor;
    }

    private void scheduleMessageListeners() {
        for (Map.Entry<String, AbstractMessageListenerContainer.QueueAttributes> registeredQueue : this.getRegisteredQueues().entrySet()) {
            this.startQueue(registeredQueue.getKey(), registeredQueue.getValue());
        }
    }

    protected void executeMessage(org.springframework.messaging.Message<String> stringMessage) {
        this.getMessageHandler().handleMessage(stringMessage);
    }

    public void stop(String logicalQueueName) {
        this.stopQueue(logicalQueueName);
        try {
            Future<?> future;
            if (this.isRunning(logicalQueueName) && (future = this.scheduledFutureByQueue.remove(logicalQueueName)) != null) {
                future.get(this.queueStopTimeout, TimeUnit.MILLISECONDS);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException | TimeoutException e) {
            this.getLogger().warn("Error stopping queue with name: '" + logicalQueueName + "'", (Throwable)e);
        }
    }

    protected void stopQueue(String logicalQueueName) {
        Assert.isTrue((boolean)this.runningStateByQueue.containsKey(logicalQueueName), (String)("Queue with name '" + logicalQueueName + "' does not exist"));
        this.runningStateByQueue.put(logicalQueueName, false);
    }

    public void start(String logicalQueueName) {
        Assert.isTrue((boolean)this.runningStateByQueue.containsKey(logicalQueueName), (String)("Queue with name '" + logicalQueueName + "' does not exist"));
        AbstractMessageListenerContainer.QueueAttributes queueAttributes = this.getRegisteredQueues().get(logicalQueueName);
        this.startQueue(logicalQueueName, queueAttributes);
    }

    public boolean isRunning(String logicalQueueName) {
        Future<?> future = this.scheduledFutureByQueue.get(logicalQueueName);
        return future != null && !future.isCancelled() && !future.isDone();
    }

    protected void startQueue(String queueName, AbstractMessageListenerContainer.QueueAttributes queueAttributes) {
        if (this.isQueueRunning(queueName)) {
            return;
        }
        this.runningStateByQueue.put(queueName, true);
        Future future = this.getTaskExecutor().submit((Runnable)new AsynchronousMessageListener(queueName, queueAttributes));
        this.scheduledFutureByQueue.put(queueName, future);
    }

    protected boolean isQueueRunning(String logicalQueueName) {
        if (this.runningStateByQueue.containsKey(logicalQueueName)) {
            return this.runningStateByQueue.get(logicalQueueName);
        }
        this.getLogger().warn("Stopped queue '" + logicalQueueName + "' because it was not listed as running queue.");
        return false;
    }

    private final class MessageGroupExecutor
    implements Runnable {
        private final MessageGroup messageGroup;
        private final String logicalQueueName;
        private final String queueUrl;
        private final boolean hasRedrivePolicy;
        private final SqsMessageDeletionPolicy deletionPolicy;

        private MessageGroupExecutor(String logicalQueueName, MessageGroup messageGroup, AbstractMessageListenerContainer.QueueAttributes queueAttributes) {
            this.logicalQueueName = logicalQueueName;
            this.messageGroup = messageGroup;
            this.queueUrl = queueAttributes.getReceiveMessageRequest().getQueueUrl();
            this.hasRedrivePolicy = queueAttributes.hasRedrivePolicy();
            this.deletionPolicy = queueAttributes.getDeletionPolicy();
        }

        @Override
        public void run() {
            for (Message message : this.messageGroup.getMessages()) {
                String receiptHandle = message.getReceiptHandle();
                org.springframework.messaging.Message<String> queueMessage = this.getMessageForExecution(message);
                try {
                    SimpleMessageListenerContainer.this.executeMessage(queueMessage);
                    this.applyDeletionPolicyOnSuccess(receiptHandle);
                }
                catch (MessagingException messagingException) {
                    SimpleMessageListenerContainer.this.getLogger().warn("An exception occurred while handling message with id: {}", (Object)message.getMessageId(), (Object)messagingException);
                    this.applyDeletionPolicyOnError(receiptHandle);
                }
            }
        }

        private void applyDeletionPolicyOnSuccess(String receiptHandle) {
            if (this.deletionPolicy == SqsMessageDeletionPolicy.ON_SUCCESS || this.deletionPolicy == SqsMessageDeletionPolicy.ALWAYS || this.deletionPolicy == SqsMessageDeletionPolicy.NO_REDRIVE) {
                this.deleteMessage(receiptHandle);
            }
        }

        private void applyDeletionPolicyOnError(String receiptHandle) {
            if (this.deletionPolicy == SqsMessageDeletionPolicy.ALWAYS || this.deletionPolicy == SqsMessageDeletionPolicy.NO_REDRIVE && !this.hasRedrivePolicy) {
                this.deleteMessage(receiptHandle);
            }
        }

        private void deleteMessage(String receiptHandle) {
            SimpleMessageListenerContainer.this.getAmazonSqs().deleteMessageAsync(new DeleteMessageRequest(this.queueUrl, receiptHandle), (AsyncHandler)new DeleteMessageHandler(receiptHandle));
        }

        private org.springframework.messaging.Message<String> getMessageForExecution(Message message) {
            HashMap<String, Object> additionalHeaders = new HashMap<String, Object>();
            additionalHeaders.put("LogicalResourceId", this.logicalQueueName);
            if (this.deletionPolicy == SqsMessageDeletionPolicy.NEVER) {
                String receiptHandle = message.getReceiptHandle();
                QueueMessageAcknowledgment acknowledgment = new QueueMessageAcknowledgment(SimpleMessageListenerContainer.this.getAmazonSqs(), this.queueUrl, receiptHandle);
                additionalHeaders.put("Acknowledgment", acknowledgment);
            }
            additionalHeaders.put("Visibility", new QueueMessageVisibility(SimpleMessageListenerContainer.this.getAmazonSqs(), this.queueUrl, message.getReceiptHandle()));
            return QueueMessageUtils.createMessage(message, additionalHeaders);
        }
    }

    private static final class MessageGroup {
        private final List<Message> messages;

        MessageGroup(Message message) {
            this.messages = Collections.singletonList(message);
        }

        MessageGroup(List<Message> messages) {
            this.messages = messages;
        }

        public List<Message> getMessages() {
            return this.messages;
        }
    }

    private final class AsynchronousMessageListener
    implements Runnable {
        private final AbstractMessageListenerContainer.QueueAttributes queueAttributes;
        private final String logicalQueueName;

        private AsynchronousMessageListener(String logicalQueueName, AbstractMessageListenerContainer.QueueAttributes queueAttributes) {
            this.logicalQueueName = logicalQueueName;
            this.queueAttributes = queueAttributes;
        }

        @Override
        public void run() {
            while (SimpleMessageListenerContainer.this.isQueueRunning(this.logicalQueueName)) {
                try {
                    ReceiveMessageResult receiveMessageResult = SimpleMessageListenerContainer.this.getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
                    List<MessageGroup> messageGroups = this.queueAttributes.isFifo() ? this.groupByMessageGroupId(receiveMessageResult) : this.groupByMessage(receiveMessageResult);
                    CountDownLatch messageBatchLatch = new CountDownLatch(messageGroups.size());
                    for (MessageGroup messageGroup : messageGroups) {
                        MessageGroupExecutor messageGroupExecutor = new MessageGroupExecutor(this.logicalQueueName, messageGroup, this.queueAttributes);
                        SimpleMessageListenerContainer.this.getTaskExecutor().execute((Runnable)new SignalExecutingRunnable(messageBatchLatch, messageGroupExecutor));
                    }
                    try {
                        messageBatchLatch.await();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                catch (Exception e) {
                    SimpleMessageListenerContainer.this.getLogger().warn("An Exception occurred while polling queue '{}'. The failing operation will be retried in {} milliseconds", new Object[]{this.logicalQueueName, SimpleMessageListenerContainer.this.getBackOffTime(), e});
                    try {
                        Thread.sleep(SimpleMessageListenerContainer.this.getBackOffTime());
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            SimpleMessageListenerContainer.this.scheduledFutureByQueue.remove(this.logicalQueueName);
        }

        private List<MessageGroup> groupByMessageGroupId(ReceiveMessageResult receiveMessageResult) {
            return receiveMessageResult.getMessages().stream().collect(Collectors.groupingBy(message -> (String)message.getAttributes().get(MessageSystemAttributeName.MessageGroupId.name()))).values().stream().map(MessageGroup::new).collect(Collectors.toList());
        }

        private List<MessageGroup> groupByMessage(ReceiveMessageResult receiveMessageResult) {
            return receiveMessageResult.getMessages().stream().map(MessageGroup::new).collect(Collectors.toList());
        }
    }

    private static final class SignalExecutingRunnable
    implements Runnable {
        private final CountDownLatch countDownLatch;
        private final Runnable runnable;

        private SignalExecutingRunnable(CountDownLatch endSignal, Runnable runnable) {
            this.countDownLatch = endSignal;
            this.runnable = runnable;
        }

        @Override
        public void run() {
            try {
                this.runnable.run();
            }
            finally {
                this.countDownLatch.countDown();
            }
        }
    }
}

