package org.duracloud.common.queue.aws;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResultEntry;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.duracloud.common.error.DuraCloudRuntimeException;
import org.duracloud.common.queue.TaskException;
import org.duracloud.common.queue.TaskNotFoundException;
import org.duracloud.common.queue.TaskQueue;
import org.duracloud.common.queue.TimeoutException;
import org.duracloud.common.queue.task.Task;
import org.duracloud.common.retry.Retriable;
import org.duracloud.common.retry.Retrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/common-queue-4.0.1.jar:org/duracloud/common/queue/aws/SQSTaskQueue.class */
public class SQSTaskQueue implements TaskQueue {
    private static Logger log = LoggerFactory.getLogger(SQSTaskQueue.class);
    private AmazonSQSClient sqsClient;
    private String queueName;
    private String queueUrl;
    private Integer visibilityTimeout;

    /* loaded from: input_file:WEB-INF/lib/common-queue-4.0.1.jar:org/duracloud/common/queue/aws/SQSTaskQueue$MsgProp.class */
    public enum MsgProp {
        MSG_ID,
        RECEIPT_HANDLE
    }

    public SQSTaskQueue(String str) {
        this(new AmazonSQSClient(), str);
    }

    public SQSTaskQueue(AmazonSQSClient amazonSQSClient, String str) {
        this.sqsClient = amazonSQSClient;
        this.queueName = str;
        this.queueUrl = getQueueUrl();
        this.visibilityTimeout = getVisibilityTimeout();
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public String getName() {
        return this.queueName;
    }

    protected Task marshallTask(Message message) {
        Properties properties = new Properties();
        Task task = null;
        try {
            properties.load(new StringReader(message.getBody()));
            if (properties.containsKey("type")) {
                task = new Task();
                for (String str : properties.stringPropertyNames()) {
                    if (str.equals("type")) {
                        task.setType(Task.Type.valueOf(properties.getProperty(str)));
                    } else {
                        task.addProperty(str, properties.getProperty(str));
                    }
                }
                task.addProperty(MsgProp.MSG_ID.name(), message.getMessageId());
                task.addProperty(MsgProp.RECEIPT_HANDLE.name(), message.getReceiptHandle());
            } else {
                log.error("SQS message from queue: " + this.queueName + ", queueUrl: " + this.queueUrl + " does not contain a 'task type'");
            }
        } catch (IOException e) {
            log.error("Error creating Task", (Throwable) e);
        }
        return task;
    }

    protected String unmarshallTask(Task task) {
        Properties properties = new Properties();
        properties.setProperty("type", task.getType().name());
        for (String str : task.getProperties().keySet()) {
            String property = task.getProperty(str);
            if (null != property) {
                properties.setProperty(str, property);
            }
        }
        StringWriter stringWriter = new StringWriter();
        String str2 = null;
        try {
            properties.store(stringWriter, (String) null);
            str2 = stringWriter.toString();
        } catch (IOException e) {
            log.error("Error unmarshalling Task, queue: " + this.queueName + ", msgBody: " + str2, (Throwable) e);
        }
        return str2;
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void put(Task task) {
        try {
            final String unmarshallTask = unmarshallTask(task);
            new Retrier(4, 10000, 2).execute(new Retriable() { // from class: org.duracloud.common.queue.aws.SQSTaskQueue.1
                @Override // org.duracloud.common.retry.Retriable
                public Object retry() throws Exception {
                    SQSTaskQueue.this.sqsClient.sendMessage(new SendMessageRequest(SQSTaskQueue.this.queueUrl, unmarshallTask));
                    return null;
                }
            });
            log.info("SQS message successfully placed {} on queue - queue: {}", task, this.queueName);
        } catch (Exception e) {
            log.error("failed to place {} on {} due to {}", task, this.queueName, e.getMessage());
            throw new DuraCloudRuntimeException(e);
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void put(Task... taskArr) {
        HashSet hashSet = new HashSet();
        hashSet.addAll(Arrays.asList(taskArr));
        put(hashSet);
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void put(Set<Task> set) {
        HashSet hashSet = new HashSet();
        Iterator<Task> it = set.iterator();
        while (it.hasNext()) {
            hashSet.add(new SendMessageBatchRequestEntry().withMessageBody(unmarshallTask(it.next())).withId(hashSet.size() + ""));
            if (hashSet.size() == 10) {
                sendBatchMessages(hashSet);
                hashSet.clear();
            }
        }
        if (hashSet.isEmpty()) {
            return;
        }
        sendBatchMessages(hashSet);
    }

    private void sendBatchMessages(Set<SendMessageBatchRequestEntry> set) {
        try {
            final SendMessageBatchRequest withEntries = new SendMessageBatchRequest().withQueueUrl(this.queueUrl).withEntries(set);
            new Retrier(4, 5000, 2).execute(new Retriable() { // from class: org.duracloud.common.queue.aws.SQSTaskQueue.2
                @Override // org.duracloud.common.retry.Retriable
                public Object retry() throws Exception {
                    SQSTaskQueue.this.sqsClient.sendMessageBatch(withEntries);
                    return null;
                }
            });
            log.info("{} SQS messages successfully placed on queue: {}", Integer.valueOf(set.size()), this.queueName);
        } catch (Exception e) {
            log.error("failed to place {} on {} due to {}", set, this.queueName, e.getMessage());
            throw new DuraCloudRuntimeException(e);
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public Set<Task> take(int i) throws TimeoutException {
        ReceiveMessageResult receiveMessage = this.sqsClient.receiveMessage(new ReceiveMessageRequest().withQueueUrl(this.queueUrl).withMaxNumberOfMessages(Integer.valueOf(i)).withAttributeNames("SentTimestamp", "ApproximateReceiveCount"));
        if (receiveMessage.getMessages() == null || receiveMessage.getMessages().size() <= 0) {
            throw new TimeoutException("No tasks available from queue: " + this.queueName + ", queueUrl: " + this.queueUrl);
        }
        HashSet hashSet = new HashSet();
        for (Message message : receiveMessage.getMessages()) {
            try {
                log.info("SQS message received - queue: {}, queueUrl: {}, msgId: {}, preworkQueueTime: {}, receiveCount: {}", this.queueName, this.queueUrl, message.getMessageId(), DurationFormatUtils.formatDuration(Long.valueOf(System.currentTimeMillis() - Long.valueOf(Long.parseLong(message.getAttributes().get("SentTimestamp"))).longValue()).longValue(), "HH:mm:ss,SSS"), message.getAttributes().get("ApproximateReceiveCount"));
            } catch (NumberFormatException e) {
                log.error("Error converting 'SentTimestamp' SQS message attribute to Long, messageId: " + message.getMessageId(), (Throwable) e);
            }
            Task marshallTask = marshallTask(message);
            marshallTask.setVisibilityTimeout(this.visibilityTimeout);
            hashSet.add(marshallTask);
        }
        return hashSet;
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public Task take() throws TimeoutException {
        return take(1).iterator().next();
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void extendVisibilityTimeout(Task task) throws TaskNotFoundException {
        try {
            this.sqsClient.changeMessageVisibility(new ChangeMessageVisibilityRequest().withQueueUrl(this.queueUrl).withReceiptHandle(task.getProperty(MsgProp.RECEIPT_HANDLE.name())).withVisibilityTimeout(task.getVisibilityTimeout()));
            log.info("extended visibility timeout {} seconds for {}", task.getVisibilityTimeout(), task);
        } catch (ReceiptHandleIsInvalidException e) {
            log.error("failed to extend visibility timeout on task " + task + ": " + e.getMessage(), (Throwable) e);
            throw new TaskNotFoundException(e);
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void deleteTask(Task task) throws TaskNotFoundException {
        try {
            this.sqsClient.deleteMessage(new DeleteMessageRequest().withQueueUrl(this.queueUrl).withReceiptHandle(task.getProperty(MsgProp.RECEIPT_HANDLE.name())));
            log.info("successfully deleted {}", task);
        } catch (ReceiptHandleIsInvalidException e) {
            log.error("failed to delete task " + task + ": " + e.getMessage(), (Throwable) e);
            throw new TaskNotFoundException(e);
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void deleteTasks(Set<Task> set) throws TaskException {
        if (set.size() > 10) {
            throw new IllegalArgumentException("task set must contain 10 or fewer tasks");
        }
        try {
            ArrayList arrayList = new ArrayList(set.size());
            for (Task task : set) {
                arrayList.add(new DeleteMessageBatchRequestEntry().withId(task.getProperty(MsgProp.MSG_ID.name())).withReceiptHandle(task.getProperty(MsgProp.RECEIPT_HANDLE.name())));
            }
            DeleteMessageBatchResult deleteMessageBatch = this.sqsClient.deleteMessageBatch(new DeleteMessageBatchRequest().withQueueUrl(this.queueUrl).withEntries(arrayList));
            List<BatchResultErrorEntry> failed = deleteMessageBatch.getFailed();
            if (failed != null && failed.size() > 0) {
                Iterator<BatchResultErrorEntry> it = failed.iterator();
                while (it.hasNext()) {
                    log.info("failed to delete message: " + it.next());
                }
            }
            Iterator<DeleteMessageBatchResultEntry> it2 = deleteMessageBatch.getSuccessful().iterator();
            while (it2.hasNext()) {
                log.info("successfully deleted {}", it2.next());
            }
        } catch (AmazonServiceException e) {
            log.error("failed to batch delete tasks " + set + ": " + e.getMessage(), (Throwable) e);
            throw new TaskException(e);
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void requeue(Task task) {
        int attempts = task.getAttempts();
        task.incrementAttempts();
        try {
            deleteTask(task);
        } catch (TaskNotFoundException e) {
            log.error("unable to delete " + task + " ignoring - requeuing anyway");
        }
        put(task);
        log.warn("requeued {} after {} failed attempts.", task, Integer.valueOf(attempts));
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public Integer size() {
        return Integer.valueOf(Integer.parseInt(queryQueueAttributes(QueueAttributeName.ApproximateNumberOfMessages).getAttributes().get(QueueAttributeName.ApproximateNumberOfMessages.name())));
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public Integer sizeIncludingInvisibleAndDelayed() {
        Map<String, String> attributes = queryQueueAttributes(QueueAttributeName.ApproximateNumberOfMessages, QueueAttributeName.ApproximateNumberOfMessagesNotVisible, QueueAttributeName.ApproximateNumberOfMessagesDelayed).getAttributes();
        int i = 0;
        for (String str : attributes.keySet()) {
            String str2 = attributes.get(str);
            log.debug("retrieved attribute: {}={}", str, str2);
            i += Integer.parseInt(str2);
        }
        log.debug("calculated size: {}", Integer.valueOf(i));
        return Integer.valueOf(i);
    }

    private Integer getVisibilityTimeout() {
        return Integer.valueOf(Integer.parseInt(queryQueueAttributes(QueueAttributeName.VisibilityTimeout).getAttributes().get(QueueAttributeName.VisibilityTimeout.name())));
    }

    private String getQueueUrl() {
        return this.sqsClient.getQueueUrl(new GetQueueUrlRequest().withQueueName(this.queueName)).getQueueUrl();
    }

    private GetQueueAttributesResult queryQueueAttributes(QueueAttributeName... queueAttributeNameArr) {
        return this.sqsClient.getQueueAttributes(new GetQueueAttributesRequest().withQueueUrl(this.queueUrl).withAttributeNames(queueAttributeNameArr));
    }
}
