/*
 * Decompiled with CFR 0.152.
 */
package tdl.participant.queue.connector;

import com.amazonaws.SdkClientException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tdl.participant.queue.connector.EventDeserializationException;
import tdl.participant.queue.connector.EventProcessingException;
import tdl.participant.queue.connector.EventSerializationException;
import tdl.participant.queue.connector.HandleRule;
import tdl.participant.queue.connector.QueueEvent;
import tdl.participant.queue.connector.QueueEventHandlers;
import tdl.participant.queue.connector.QueueSize;

public class SqsEventQueue {
    private static final Logger log = LoggerFactory.getLogger(SqsEventQueue.class);
    private static final String ATTRIBUTE_EVENT_NAME = "name";
    private static final String ATTRIBUTE_EVENT_VERSION = "version";
    private final AmazonSQS client;
    private final String queueUrl;
    private final ObjectMapper mapper;
    private MessageProcessingThread messageProcessingThread;

    public SqsEventQueue(AmazonSQS client, String queueUrl) {
        this.client = client;
        this.queueUrl = queueUrl;
        this.mapper = new ObjectMapper();
    }

    public String getQueueUrl() {
        return this.queueUrl;
    }

    public QueueSize getQueueSize() {
        GetQueueAttributesResult queueAttributes = this.client.getQueueAttributes(this.queueUrl, Collections.singletonList("All"));
        int available = Integer.parseInt((String)queueAttributes.getAttributes().get("ApproximateNumberOfMessages"));
        int notVisible = Integer.parseInt((String)queueAttributes.getAttributes().get("ApproximateNumberOfMessagesNotVisible"));
        int delayed = Integer.parseInt((String)queueAttributes.getAttributes().get("ApproximateNumberOfMessagesDelayed"));
        return new QueueSize(available, notVisible, delayed);
    }

    public void send(Object object) throws EventSerializationException, EventProcessingException {
        this.send(object, 10000, 5000);
    }

    public void send(Object object, int sdkClientExecutionTimeout, int sdkRequestTimeout) throws EventSerializationException, EventProcessingException {
        QueueEvent annotation = object.getClass().getAnnotation(QueueEvent.class);
        if (annotation == null) {
            throw new EventSerializationException(object.getClass() + " not a QueueEvent");
        }
        String eventName = annotation.name();
        String eventVersion = annotation.version();
        try {
            SendMessageRequest sendMessageRequest = new SendMessageRequest();
            sendMessageRequest.setSdkClientExecutionTimeout(sdkClientExecutionTimeout);
            sendMessageRequest.setSdkRequestTimeout(sdkRequestTimeout);
            sendMessageRequest.setQueueUrl(this.queueUrl);
            sendMessageRequest.setMessageBody(this.mapper.writeValueAsString(object));
            sendMessageRequest.addMessageAttributesEntry(ATTRIBUTE_EVENT_NAME, new MessageAttributeValue().withDataType("String").withStringValue(eventName));
            sendMessageRequest.addMessageAttributesEntry(ATTRIBUTE_EVENT_VERSION, new MessageAttributeValue().withDataType("String").withStringValue(eventVersion));
            this.client.sendMessage(sendMessageRequest);
        }
        catch (JsonProcessingException e) {
            throw new EventSerializationException("Failed to serialize event of type " + object.getClass(), (Exception)((Object)e));
        }
        catch (SdkClientException e) {
            throw new EventProcessingException("Failed to send message due to connectivity issues.", e);
        }
    }

    public void subscribeToMessages(QueueEventHandlers eventHandlers) {
        this.messageProcessingThread = new MessageProcessingThread(this.client, this.queueUrl, this.mapper, eventHandlers);
        this.messageProcessingThread.start();
    }

    public void unsubscribeFromMessages() throws InterruptedException {
        if (this.messageProcessingThread == null) {
            throw new IllegalStateException("Cannot unsubscribe without first being subscribed");
        }
        this.messageProcessingThread.signalStop();
        this.messageProcessingThread.join();
    }

    static class MessageProcessingThread
    extends Thread {
        static final int MAX_NUMBER_OF_MESSAGES = 10;
        static final int MAX_AWS_WAIT = 20;
        private final ReceiveMessageRequest receiveMessageRequest;
        private final AmazonSQS client;
        private final ObjectMapper mapper;
        private final int maxProcessingThreads = 10;
        private final ExecutorService executorService;
        private final DeleteMessageBatchRequest deleteMessageBatchRequest;
        private final AtomicBoolean shouldContinue;
        private final QueueEventHandlers eventHandlers;

        MessageProcessingThread(AmazonSQS client, String queueUrl, ObjectMapper mapper, QueueEventHandlers eventHandlers) {
            this.client = client;
            this.mapper = mapper;
            this.eventHandlers = eventHandlers;
            this.receiveMessageRequest = new ReceiveMessageRequest();
            this.receiveMessageRequest.setMaxNumberOfMessages(Integer.valueOf(10));
            this.receiveMessageRequest.setQueueUrl(queueUrl);
            this.receiveMessageRequest.setWaitTimeSeconds(Integer.valueOf(20));
            this.receiveMessageRequest.setMessageAttributeNames(Arrays.asList(SqsEventQueue.ATTRIBUTE_EVENT_NAME, SqsEventQueue.ATTRIBUTE_EVENT_VERSION));
            this.deleteMessageBatchRequest = new DeleteMessageBatchRequest();
            this.deleteMessageBatchRequest.setQueueUrl(queueUrl);
            this.executorService = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
            this.shouldContinue = new AtomicBoolean(true);
        }

        @Override
        public void run() {
            while (this.shouldContinue.get()) {
                this.processBatch();
            }
        }

        void signalStop() throws InterruptedException {
            this.shouldContinue.set(false);
            this.client.shutdown();
            this.executorService.shutdown();
            this.executorService.awaitTermination(30L, TimeUnit.SECONDS);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void processBatch() {
            ExecutorCompletionService completionService = new ExecutorCompletionService(this.executorService);
            List messages = this.client.receiveMessage(this.receiveMessageRequest).getMessages();
            messages.forEach(message -> completionService.submit(() -> this.process((Message)message)));
            int completed = 0;
            ArrayList successfulMessages = new ArrayList();
            while (completed < messages.size()) {
                try {
                    Future resultFuture = completionService.take();
                    successfulMessages.add(resultFuture.get());
                }
                catch (Exception e) {
                    log.error("Failed to process queue message", (Throwable)e);
                }
                finally {
                    ++completed;
                }
            }
            if (successfulMessages.size() > 0) {
                List deleteMessageEntries = successfulMessages.stream().map(message -> new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getReceiptHandle())).collect(Collectors.toList());
                this.deleteMessageBatchRequest.setEntries(deleteMessageEntries);
                this.client.deleteMessageBatch(this.deleteMessageBatchRequest);
            }
        }

        private Message process(Message message) throws EventDeserializationException, EventProcessingException {
            if (!message.getMessageAttributes().containsKey(SqsEventQueue.ATTRIBUTE_EVENT_NAME)) {
                throw new EventDeserializationException("Message does not contain the name attribute");
            }
            if (!message.getMessageAttributes().containsKey(SqsEventQueue.ATTRIBUTE_EVENT_VERSION)) {
                throw new EventDeserializationException("Message does not contain the version attribute");
            }
            log.debug("Attributes for message:" + message.getBody() + " -> " + message.getMessageAttributes().entrySet());
            String eventName = ((MessageAttributeValue)message.getMessageAttributes().get(SqsEventQueue.ATTRIBUTE_EVENT_NAME)).getStringValue();
            String eventVersion = ((MessageAttributeValue)message.getMessageAttributes().get(SqsEventQueue.ATTRIBUTE_EVENT_VERSION)).getStringValue();
            if (!this.eventHandlers.canHandle(eventName, eventVersion)) {
                throw new EventProcessingException("No event handlers for: " + eventName + ", version: " + eventVersion);
            }
            HandleRule handleRule = this.eventHandlers.getHandleRuleFor(eventName, eventVersion);
            try {
                Object object = this.mapper.readerFor(handleRule.getType()).readValue(message.getBody());
                this.eventHandlers.getBeforeEventInspector().inspect(eventName, eventVersion, object);
                handleRule.getConsumer().accept(object);
                this.eventHandlers.getAfterEventInspector().inspect(eventName, eventVersion, object);
                return message;
            }
            catch (IOException e) {
                throw new EventDeserializationException("Cannot deserialize message into an instance of " + handleRule.getType() + "." + message.getBody(), e);
            }
            catch (Exception e) {
                throw new EventProcessingException("Exception while consuming individual message: " + message.getBody(), e);
            }
        }
    }
}

