/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.aws2.sqs;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.clock.Clock;
import org.apache.camel.component.aws2.sqs.Sqs2Configuration;
import org.apache.camel.component.aws2.sqs.Sqs2Endpoint;
import org.apache.camel.component.aws2.sqs.Sqs2MessageHelper;
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.ScheduledPollConsumerScheduler;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.DefaultScheduledPollConsumerScheduler;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.apache.commons.io.function.IOConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

public class Sqs2Consumer
extends ScheduledBatchPollingConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(Sqs2Consumer.class);
    private TimeoutExtender timeoutExtender;
    private ScheduledFuture<?> scheduledFuture;
    private ScheduledExecutorService scheduledExecutor;
    private PollingTask pollingTask;
    private final String sqsConsumerToString;

    public Sqs2Consumer(Sqs2Endpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.sqsConsumerToString = "SqsConsumer[%s]".formatted(URISupport.sanitizeUri((String)endpoint.getEndpointUri()));
    }

    /**
     * Runs one poll: fetches messages through the polling task, wraps them in exchanges
     * and processes them as a batch.
     *
     * @return the value returned by {@link #processBatch(Queue)}
     * @throws Exception if fetching the messages or processing the batch fails
     */
    protected int poll() throws Exception {
        // Reset the per-poll bookkeeping before anything is fetched.
        this.shutdownRunningTask = null;
        this.pendingExchanges = 0;

        List<software.amazon.awssdk.services.sqs.model.Message> received = this.pollingTask.call();
        this.forceConsumerAsReady();
        return this.processBatch(CastUtils.cast(this.createExchanges(received)));
    }

    /**
     * Converts the received SQS messages into exchanges, keeping the order of the list.
     *
     * @param messages the messages returned by the current poll
     * @return a queue holding one exchange per message
     */
    protected Queue<Exchange> createExchanges(List<software.amazon.awssdk.services.sqs.model.Message> messages) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Received {} messages in this poll", messages.size());
        }
        Queue<Exchange> exchanges = new LinkedList<>();
        messages.forEach(received -> exchanges.add(this.createExchange(received)));
        return exchanges;
    }

    /**
     * Processes the polled exchanges one after another for as long as the batch is allowed to run.
     * <p>
     * For every exchange the batch properties are set, the exchange is registered with the
     * timeout extender (when one is configured), a completion callback is attached that calls
     * {@link #processCommit(Exchange)} or {@link #processRollback(Exchange)}, and the exchange is
     * handed to the async processor.
     *
     * @param exchanges the exchanges of the current poll
     * @return the number of exchanges that were in the queue when the method was called
     * @throws Exception declared by the overridden contract
     */
    public int processBatch(Queue<Object> exchanges) throws Exception {
        int total = exchanges.size();
        for (int index = 0; index < total && this.isBatchAllowed(); index++) {
            Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index);
            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total);
            // Store a real Boolean; an int flag (0/1) would be boxed to Integer here.
            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == total - 1);
            this.pendingExchanges = total - index - 1;
            if (this.timeoutExtender != null) {
                this.timeoutExtender.add(exchange);
            }
            exchange.getExchangeExtension().addOnCompletion(new Synchronization() {

                @Override
                public void onComplete(Exchange exchange) {
                    Sqs2Consumer.this.processCommit(exchange);
                }

                @Override
                public void onFailure(Exchange exchange) {
                    Sqs2Consumer.this.processRollback(exchange);
                }

                @Override
                public String toString() {
                    return "SqsConsumerOnCompletion";
                }
            });
            AsyncCallback cb = this.defaultConsumerCallback(exchange, true);
            try {
                this.getAsyncProcessor().process(exchange, cb);
            } catch (Error e) {
                // On an Error, stop extending visibility and drop all tracked entries before rethrowing.
                if (this.timeoutExtender != null) {
                    this.timeoutExtender.cancel();
                    this.timeoutExtender.entries.clear();
                }
                throw e;
            }
        }
        return total;
    }

    /**
     * Called when an exchange completed successfully; deletes the message from the queue
     * when {@code shouldDelete} allows it. SDK failures are passed to the exception handler
     * and not rethrown.
     *
     * @param exchange the completed exchange
     */
    protected void processCommit(Exchange exchange) {
        try {
            if (!this.shouldDelete(exchange)) {
                return;
            }
            String receiptHandle = exchange.getIn().getHeader("CamelAwsSqsReceiptHandle", String.class);
            DeleteMessageRequest request = DeleteMessageRequest.builder()
                    .queueUrl(this.getQueueUrl())
                    .receiptHandle(receiptHandle)
                    .build();
            LOG.trace("Deleting message with receipt handle {}...", receiptHandle);
            this.getClient().deleteMessage(request);
            LOG.trace("Deleted message with receipt handle {}...", receiptHandle);
        } catch (SdkException e) {
            this.getExceptionHandler().handleException("Error occurred during deleting message. This exception is ignored.", exchange, e);
        }
    }

    /**
     * Decides whether the message behind the exchange has to be deleted: either
     * delete-after-read is enabled, or the exchange carries the filter property,
     * delete-if-filtered is enabled and the property value is {@code true}.
     *
     * @param exchange the completed exchange
     * @return {@code true} if the message should be deleted
     */
    private boolean shouldDelete(Exchange exchange) {
        Sqs2Configuration configuration = this.getConfiguration();
        boolean deleteBecauseFiltered = false;
        if (exchange.getProperty("CamelAwsSqsDeleteFiltered") != null && configuration.isDeleteIfFiltered()) {
            deleteBecauseFiltered = Sqs2Consumer.passedThroughFilter(exchange);
        }
        return configuration.isDeleteAfterRead() || deleteBecauseFiltered;
    }

    /**
     * Reads the {@code CamelAwsSqsDeleteFiltered} exchange property as a boolean,
     * falling back to {@code false} when it is not set.
     *
     * @param exchange the exchange to inspect
     * @return the value of the property
     */
    private static boolean passedThroughFilter(Exchange exchange) {
        Boolean passed = exchange.getProperty("CamelAwsSqsDeleteFiltered", false, Boolean.class);
        return passed;
    }

    /**
     * Called when an exchange failed; reports the exchange's exception (if any)
     * to the exception handler.
     *
     * @param exchange the failed exchange
     */
    protected void processRollback(Exchange exchange) {
        Exception cause = exchange.getException();
        if (cause == null) {
            return;
        }
        this.getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause);
    }

    /**
     * @return the configuration of the endpoint this consumer belongs to
     */
    protected Sqs2Configuration getConfiguration() {
        Sqs2Endpoint endpoint = this.getEndpoint();
        return endpoint.getConfiguration();
    }

    /**
     * @return the SQS client of the endpoint this consumer belongs to
     */
    protected SqsClient getClient() {
        Sqs2Endpoint endpoint = this.getEndpoint();
        return endpoint.getClient();
    }

    /**
     * @return the queue URL reported by the endpoint this consumer belongs to
     */
    protected String getQueueUrl() {
        Sqs2Endpoint endpoint = this.getEndpoint();
        return endpoint.getQueueUrl();
    }

    /**
     * @return the parent's endpoint, narrowed to {@link Sqs2Endpoint}
     */
    public Sqs2Endpoint getEndpoint() {
        Endpoint endpoint = super.getEndpoint();
        return (Sqs2Endpoint) endpoint;
    }

    /**
     * Creates an exchange for the given SQS message using the endpoint's exchange pattern.
     *
     * @param msg the received SQS message
     * @return the exchange populated from the message
     */
    public Exchange createExchange(software.amazon.awssdk.services.sqs.model.Message msg) {
        ExchangePattern pattern = this.getEndpoint().getExchangePattern();
        return this.createExchange(pattern, msg);
    }

    /**
     * Creates an exchange with the given pattern and fills its message from the SQS message.
     * <p>
     * The body is the SQS message body. The headers start as a copy of the message's system
     * attributes (as strings), followed by the {@code CamelAwsSqs*} headers, followed by every
     * message attribute that the endpoint's header filter strategy does not filter out.
     *
     * @param pattern the exchange pattern to set
     * @param msg     the received SQS message
     * @return the populated exchange
     */
    private Exchange createExchange(ExchangePattern pattern, software.amazon.awssdk.services.sqs.model.Message msg) {
        Exchange exchange = this.createExchange(true);
        exchange.setPattern(pattern);
        Message message = exchange.getIn();
        message.setBody(msg.body());
        message.setHeaders(new HashMap<String, Object>(msg.attributesAsStrings()));
        message.setHeader("CamelAwsSqsMessageId", msg.messageId());
        message.setHeader("CamelAwsSqsMD5OfBody", msg.md5OfBody());
        message.setHeader("CamelAwsSqsReceiptHandle", msg.receiptHandle());
        message.setHeader("CamelAwsSqsAttributes", msg.attributes());
        message.setHeader("CamelAwsSqsMessageAttributes", msg.messageAttributes());
        HeaderFilterStrategy headerFilterStrategy = this.getEndpoint().getHeaderFilterStrategy();
        for (Map.Entry<String, MessageAttributeValue> entry : msg.messageAttributes().entrySet()) {
            String header = entry.getKey();
            Object value = Sqs2MessageHelper.fromMessageAttributeValue(entry.getValue());
            // A "true" result from the strategy means the header must be skipped.
            if (!headerFilterStrategy.applyFilterToExternalHeaders(header, value, exchange)) {
                message.setHeader(header, value);
            }
        }
        return exchange;
    }

    /**
     * @return the description string computed in the constructor
     */
    @Override
    public String toString() {
        return sqsConsumerToString;
    }

    /**
     * Applies the configured number of concurrent consumers to a newly created default
     * scheduler and makes sure its pool size is at least that number. Other schedulers,
     * and schedulers that are not new, are left untouched.
     *
     * @param scheduler    the scheduler that was configured
     * @param newScheduler whether the scheduler was newly created
     */
    protected void afterConfigureScheduler(ScheduledPollConsumerScheduler scheduler, boolean newScheduler) {
        if (newScheduler && scheduler instanceof DefaultScheduledPollConsumerScheduler defaultScheduler) {
            int concurrentConsumers = this.getConfiguration().getConcurrentConsumers();
            defaultScheduler.setConcurrentConsumers(concurrentConsumers);
            defaultScheduler.setPoolSize(Math.max(defaultScheduler.getPoolSize(), concurrentConsumers));
        }
    }

    /**
     * Starts the consumer: creates the polling task and, when message visibility extension
     * is enabled and no executor exists yet, creates the single-threaded
     * {@code SqsTimeoutExtender} executor. If a positive visibility timeout is configured,
     * a {@link TimeoutExtender} is scheduled at a fixed rate of half that timeout (at least
     * one second).
     *
     * @throws Exception if the parent fails to start
     */
    protected void doStart() throws Exception {
        // The polling task is only closed in doShutdown(); on a start that follows a plain stop
        // the previous task would otherwise be overwritten and its request executor leaked.
        if (this.pollingTask != null) {
            this.pollingTask.close();
        }
        this.pollingTask = new PollingTask(this.getEndpoint());
        if (this.getConfiguration().isExtendMessageVisibility() && this.scheduledExecutor == null) {
            ThreadPoolProfile profile = new ThreadPoolProfile("SqsTimeoutExtender");
            profile.setPoolSize(1);
            profile.setAllowCoreThreadTimeOut(false);
            profile.setMaxQueueSize(-1);
            this.scheduledExecutor = this.getEndpoint().getCamelContext().getExecutorServiceManager()
                    .newScheduledThreadPool(this, "SqsTimeoutExtender", profile);
            Integer visibilityTimeout = this.getConfiguration().getVisibilityTimeout();
            if (visibilityTimeout != null && visibilityTimeout > 0) {
                int delay = Math.max(1, visibilityTimeout / 2);
                this.timeoutExtender = new TimeoutExtender(visibilityTimeout, delay);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} delay/repeat (seconds)", new Object[]{delay, delay, visibilityTimeout});
                }
                this.scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, delay, delay, TimeUnit.SECONDS);
            }
        }
        super.doStart();
    }

    /**
     * Releases everything created in {@code doStart()}: cancels the timeout extender and its
     * scheduled future, shuts down the scheduled executor, closes the polling task, and then
     * delegates to the parent. Each reference is cleared after it has been released.
     *
     * @throws Exception if the parent fails to shut down
     */
    protected void doShutdown() throws Exception {
        TimeoutExtender extender = this.timeoutExtender;
        if (extender != null) {
            extender.cancel();
            this.timeoutExtender = null;
        }

        ScheduledFuture<?> future = this.scheduledFuture;
        if (future != null) {
            future.cancel(true);
            this.scheduledFuture = null;
        }

        ScheduledExecutorService executor = this.scheduledExecutor;
        if (executor != null) {
            this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
            this.scheduledExecutor = null;
        }

        PollingTask task = this.pollingTask;
        if (task != null) {
            task.close();
            this.pollingTask = null;
        }

        super.doShutdown();
    }

    private static class PollingTask
    implements Callable<List<software.amazon.awssdk.services.sqs.model.Message>>,
    Closeable {
        private static final int MAX_NUMBER_OF_MESSAGES_PER_REQUEST = 10;
        private static final long RECENTLY_DELETED_QUEUE_BACKOFF_TIME_MS = 30000L;
        private static final Pattern COMMA_SEPARATED_PATTERN = Pattern.compile(",");
        private final AtomicLong queueAutoCreationScheduleTime = new AtomicLong(0L);
        private final Lock lock = new ReentrantLock();
        private final AtomicBoolean closed = new AtomicBoolean();
        private final Clock clock;
        private final SqsClient sqsClient;
        private final ExecutorService requestExecutor;
        private final ExecutorServiceManager executorServiceManager;
        private final IOConsumer<SqsClient> createQueueOperation;
        private final String queueName;
        private final String queueUrl;
        private final int maxMessagesPerPoll;
        private final Integer visibilityTimeout;
        private final Integer waitTimeSeconds;
        private final Collection<MessageSystemAttributeName> attributeNames;
        private final Collection<String> messageAttributeNames;
        private final int numberOfRequestsPerPoll;
        private final boolean queueAutoCreationEnabled;
        private final MessageSystemAttributeName sortAttributeName;

        private PollingTask(Sqs2Endpoint endpoint) {
            this.clock = endpoint.getClock();
            this.sqsClient = endpoint.getClient();
            this.executorServiceManager = endpoint.getCamelContext().getExecutorServiceManager();
            this.createQueueOperation = endpoint::createQueue;
            this.queueName = endpoint.getConfiguration().getQueueName();
            this.queueUrl = endpoint.getQueueUrl();
            this.visibilityTimeout = endpoint.getConfiguration().getVisibilityTimeout();
            this.waitTimeSeconds = endpoint.getConfiguration().getWaitTimeSeconds();
            this.messageAttributeNames = PollingTask.splitCommaSeparatedValues(endpoint.getConfiguration().getMessageAttributeNames());
            this.sortAttributeName = PollingTask.getSortAttributeName(endpoint.getConfiguration());
            this.attributeNames = PollingTask.getAttributeNames(endpoint.getConfiguration(), this.sortAttributeName);
            this.queueAutoCreationEnabled = endpoint.getConfiguration().isAutoCreateQueue();
            this.maxMessagesPerPoll = Math.max(1, endpoint.getMaxMessagesPerPoll());
            this.numberOfRequestsPerPoll = PollingTask.computeNumberOfRequestPerPoll(this.maxMessagesPerPoll);
            this.requestExecutor = this.executorServiceManager.newFixedThreadPool((Object)this, "%s[%s]".formatted(this.getClass().getSimpleName(), this.queueName), Math.min(this.numberOfRequestsPerPoll, Math.max(1, endpoint.getConfiguration().getConcurrentRequestLimit())));
        }

        /**
         * Marks the task as closed and shuts down its request executor immediately.
         */
        @Override
        public void close() {
            closed.set(true);
            executorServiceManager.shutdownNow(requestExecutor);
        }

        /**
         * Polls the queue once.
         * <p>
         * Returns an empty list when the task is closed or when a scheduled queue auto-creation
         * was handled instead of polling. An exception is only raised when every receive
         * request of this poll failed.
         *
         * @return the received messages
         * @throws IOException if all requests of the poll resulted in an error
         */
        @Override
        public List<software.amazon.awssdk.services.sqs.model.Message> call() throws IOException {
            if (this.isClosed() || this.processScheduledQueueAutoCreation()) {
                return Collections.emptyList();
            }
            PollingContext context = new PollingContext();
            List<software.amazon.awssdk.services.sqs.model.Message> messages = this.poll(context);
            if (context.errorCount() != this.numberOfRequestsPerPoll) {
                // At least one request succeeded (or nothing failed): return what was received.
                return messages;
            }
            if (context.errorCount() != 1) {
                throw new IOException("Error while polling - all %s requests resulted in an error, please check the logs for more details".formatted(this.numberOfRequestsPerPoll));
            }
            // Single failed request: surface the original error, unwrapped when it is unchecked.
            context.rethrowIfFirstErrorIsRuntimeException();
            throw new IOException("Error while polling", context.firstError());
        }

        /**
         * Fetches up to {@code maxMessagesPerPoll} messages.
         * <p>
         * With a single request per poll the request is executed on the calling thread and its
         * result is returned as is. Otherwise the poll is split into requests of at most
         * {@code MAX_NUMBER_OF_MESSAGES_PER_REQUEST} messages that run on the request executor;
         * their results are merged and then passed through {@code sortIfNeeded}.
         *
         * @param pollContext the context collecting errors of this poll
         * @return the received messages; empty if the calling thread was interrupted
         * @throws IOException if waiting for the merged result fails with an execution error
         */
        private List<software.amazon.awssdk.services.sqs.model.Message> poll(PollingContext pollContext) throws IOException {
            if (this.numberOfRequestsPerPoll == 1) {
                return this.poll(this.maxMessagesPerPoll, pollContext);
            }
            try {
                CompletableFuture<List<software.amazon.awssdk.services.sqs.model.Message>> merged =
                        CompletableFuture.completedFuture(Collections.emptyList());
                for (int remaining = this.maxMessagesPerPoll; remaining > 0; remaining -= MAX_NUMBER_OF_MESSAGES_PER_REQUEST) {
                    int numberOfMessages = Math.min(remaining, MAX_NUMBER_OF_MESSAGES_PER_REQUEST);
                    CompletableFuture<List<software.amazon.awssdk.services.sqs.model.Message>> request =
                            CompletableFuture.supplyAsync(() -> this.poll(numberOfMessages, pollContext), this.requestExecutor);
                    merged = PollingTask.mergeResults(merged, request);
                }
                return merged.thenApply(this::sortIfNeeded).get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the caller and treat the poll as empty.
                Thread.currentThread().interrupt();
                LOG.debug("Polling interrupted", e);
                return Collections.emptyList();
            } catch (ExecutionException e) {
                throw new IOException("Error while polling", e.getCause());
            }
        }

        /**
         * Executes a single receive request.
         * <p>
         * Nothing is requested once the queue has been flagged as missing in this poll. A missing
         * queue is delegated to {@code handleMissingQueueError}; any other failure is logged,
         * recorded in the context and answered with an empty list.
         *
         * @param maxNumberOfMessages the maximum number of messages to request
         * @param context             the context collecting errors of this poll
         * @return the received messages, possibly empty
         */
        private List<software.amazon.awssdk.services.sqs.model.Message> poll(int maxNumberOfMessages, PollingContext context) {
            if (context.isQueueMissing()) {
                return Collections.emptyList();
            }
            try {
                ReceiveMessageRequest request = this.createReceiveRequest(maxNumberOfMessages);
                return this.sqsClient.receiveMessage(request).messages();
            } catch (QueueDoesNotExistException e) {
                return this.handleMissingQueueError(context, e);
            } catch (Exception e) {
                LOG.error("Error while polling", e);
                context.firePollingError(e);
                return Collections.emptyList();
            }
        }

        /**
         * Reacts to a "queue does not exist" failure. If the queue is not yet flagged as missing
         * in this poll, it is flagged and then either the queue is created (auto-creation
         * enabled) or the error is logged and recorded.
         *
         * @param context the context of the current poll
         * @param error   the failure reported by SQS
         * @return always an empty list
         */
        private List<software.amazon.awssdk.services.sqs.model.Message> handleMissingQueueError(PollingContext context, QueueDoesNotExistException error) {
            if (context.isQueueMissing()) {
                return Collections.emptyList();
            }
            UUID requestId = UUID.randomUUID();
            context.fireQueueMissing(requestId);
            if (!this.queueAutoCreationEnabled) {
                LOG.error("Error while polling {} queue does not exists", this.queueName, error);
                context.firePollingError(error);
            } else {
                this.createQueue(requestId, context);
            }
            return Collections.emptyList();
        }

        /**
         * Builds the receive request for this queue. System and message attribute names are
         * only set on the request when they are not empty.
         *
         * @param maxNumberOfMessages the maximum number of messages to request
         * @return the request to send
         */
        private ReceiveMessageRequest createReceiveRequest(int maxNumberOfMessages) {
            ReceiveMessageRequest.Builder builder = ReceiveMessageRequest.builder();
            builder.queueUrl(this.queueUrl);
            builder.maxNumberOfMessages(maxNumberOfMessages);
            builder.visibilityTimeout(this.visibilityTimeout);
            builder.waitTimeSeconds(this.waitTimeSeconds);
            if (!this.attributeNames.isEmpty()) {
                builder.messageSystemAttributeNames(this.attributeNames);
            }
            if (!this.messageAttributeNames.isEmpty()) {
                builder.messageAttributeNames(this.messageAttributeNames);
            }
            LOG.trace("Receiving messages with request [{}]...", builder);
            return builder.build();
        }

        /**
         * Creates the queue while holding the task lock.
         * <p>
         * Nothing happens when the task is closed or when the context shows that another request
         * is already handling the missing queue. A "queue deleted recently" failure schedules a
         * later retry; any other failure is logged and recorded in the context.
         *
         * @param requestId the id of the request that wants to create the queue
         * @param context   the context of the current poll
         */
        private void createQueue(UUID requestId, PollingContext context) {
            this.lock.lock();
            try {
                if (this.isClosed() || context.isMissingQueueHandledInAnotherRequest(requestId)) {
                    return;
                }
                try {
                    this.createQueueOperation.accept(this.sqsClient);
                } catch (QueueDeletedRecentlyException e) {
                    LOG.debug("Queue recently deleted, will retry after at least 30 seconds on next polling request.", e);
                    this.scheduleQueueAutoCreation();
                } catch (Exception e) {
                    LOG.error("Error while creating queue.", e);
                    context.firePollingError(e);
                }
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Handles a pending queue auto-creation, if one is scheduled.
         * <p>
         * Without a schedule the method returns {@code false} so the caller polls normally.
         * While the scheduled time has not been reached, or after the creation was attempted,
         * it returns {@code true} so the caller skips this poll.
         *
         * @return {@code true} if polling must be skipped for this call
         * @throws IOException if creating the queue failed with a checked error
         */
        private boolean processScheduledQueueAutoCreation() throws IOException {
            long scheduleTimeMs = this.queueAutoCreationScheduleTime.get();
            if (scheduleTimeMs == 0L) {
                return false;
            }
            long elapsedTimeMillis = this.clock.elapsed();
            if (scheduleTimeMs > elapsedTimeMillis) {
                LOG.debug("{}ms remaining until queue auto-creation is triggered", scheduleTimeMs - elapsedTimeMillis);
                return true;
            }
            PollingContext context = new PollingContext();
            this.createQueue(UUID.randomUUID(), context);
            if (context.hasErrors()) {
                context.rethrowIfFirstErrorIsRuntimeException();
                throw new IOException("Error while creating %s queue".formatted(this.queueName), context.firstError());
            }
            // Clear only the schedule that was just processed. If createQueue() scheduled a new
            // retry (queue still "recently deleted"), that newer back-off must be kept.
            this.queueAutoCreationScheduleTime.compareAndSet(scheduleTimeMs, 0L);
            return true;
        }

        /**
         * Schedules the next queue auto-creation attempt
         * {@code RECENTLY_DELETED_QUEUE_BACKOFF_TIME_MS} after the clock's current elapsed time.
         */
        private void scheduleQueueAutoCreation() {
            long retryAt = this.clock.elapsed() + RECENTLY_DELETED_QUEUE_BACKOFF_TIME_MS;
            this.queueAutoCreationScheduleTime.set(retryAt);
        }

        /**
         * Clears the scheduled queue auto-creation; zero means "nothing scheduled".
         */
        private void cancelScheduledQueueAutoCreation() {
            queueAutoCreationScheduleTime.set(0L);
        }

        /**
         * @return {@code true} once {@link #close()} has been called
         */
        private boolean isClosed() {
            return closed.get();
        }

        /**
         * Splits a comma separated string into its trimmed, non-empty parts.
         *
         * @param value the raw value, may be {@code null} or empty
         * @return the parts in their original order; empty if there are none
         */
        private static List<String> splitCommaSeparatedValues(String value) {
            boolean nothingToSplit = value == null || value.isEmpty();
            if (nothingToSplit) {
                return Collections.emptyList();
            }
            return COMMA_SEPARATED_PATTERN.splitAsStream(value)
                    .map(String::trim)
                    .filter(part -> !part.isEmpty())
                    .toList();
        }

        /**
         * Resolves an attribute name to its {@link MessageSystemAttributeName}.
         *
         * @param attribute the attribute name, may be {@code null} or empty
         * @return the matching constant, or empty if the name is missing or unknown to the SDK
         *         (the latter is logged as a warning)
         */
        private static Optional<MessageSystemAttributeName> parseMessageSystemAttributeName(String attribute) {
            if (attribute == null || attribute.isEmpty()) {
                return Optional.empty();
            }
            MessageSystemAttributeName parsed = MessageSystemAttributeName.fromValue(attribute);
            if (parsed != MessageSystemAttributeName.UNKNOWN_TO_SDK_VERSION) {
                return Optional.of(parsed);
            }
            LOG.warn("Unsupported attribute name '{}' use one of {}", attribute, MessageSystemAttributeName.knownValues());
            return Optional.empty();
        }

        /**
         * Determines the attribute used for sorting received messages.
         *
         * @param configuration the consumer configuration
         * @return the configured sort attribute, or {@code null} if none is configured, it cannot
         *         be parsed, or it is {@code ALL} (which is rejected with a warning)
         */
        private static MessageSystemAttributeName getSortAttributeName(Sqs2Configuration configuration) {
            Optional<MessageSystemAttributeName> parsed = PollingTask.parseMessageSystemAttributeName(configuration.getSortAttributeName());
            if (parsed.isEmpty()) {
                return null;
            }
            MessageSystemAttributeName attribute = parsed.get();
            if (attribute == MessageSystemAttributeName.ALL) {
                LOG.warn("The {} attribute cannot be used for sorting the received messages", MessageSystemAttributeName.ALL);
                return null;
            }
            return attribute;
        }

        /**
         * Collects the system attributes to request: the configured names that can be parsed,
         * without duplicates and in configuration order, plus the sort attribute when it is set
         * and covered neither by {@code ALL} nor by an explicit entry.
         *
         * @param configuration     the consumer configuration
         * @param sortAttributeName the attribute used for sorting, may be {@code null}
         * @return an unmodifiable list of attribute names
         */
        private static List<MessageSystemAttributeName> getAttributeNames(Sqs2Configuration configuration, MessageSystemAttributeName sortAttributeName) {
            List<MessageSystemAttributeName> names = new ArrayList<>();
            for (String rawName : PollingTask.splitCommaSeparatedValues(configuration.getAttributeNames())) {
                Optional<MessageSystemAttributeName> parsed = PollingTask.parseMessageSystemAttributeName(rawName);
                if (parsed.isPresent() && !names.contains(parsed.get())) {
                    names.add(parsed.get());
                }
            }
            boolean sortAttributeMissing = !names.contains(MessageSystemAttributeName.ALL) && !names.contains(sortAttributeName);
            if (sortAttributeName != null && sortAttributeMissing) {
                names.add(sortAttributeName);
            }
            return Collections.unmodifiableList(names);
        }

        /**
         * Computes how many receive requests are needed to fetch the given number of messages
         * when one request returns at most {@code MAX_NUMBER_OF_MESSAGES_PER_REQUEST} messages.
         *
         * @param maxMessagesPerPoll the maximum number of messages per poll; values below one count as one
         * @return the number of requests, always at least one
         */
        private static int computeNumberOfRequestPerPoll(int maxMessagesPerPoll) {
            int messages = Math.max(1, maxMessagesPerPoll);
            // Integer ceiling division; messages >= 1 so the subtraction cannot underflow.
            return (messages - 1) / MAX_NUMBER_OF_MESSAGES_PER_REQUEST + 1;
        }

        /**
         * Combines two list futures into one future whose list holds the elements of the first
         * result followed by the elements of the second.
         *
         * @param future1 the future providing the leading elements
         * @param future2 the future providing the trailing elements
         * @param <T>     the element type
         * @return a future completing with the concatenated list
         */
        private static <T> CompletableFuture<List<T>> mergeResults(CompletableFuture<List<T>> future1, CompletableFuture<List<T>> future2) {
            return future1.thenCombine(future2, (first, second) -> {
                List<T> combined = new ArrayList<>(first);
                combined.addAll(second);
                return combined;
            });
        }

        /**
         * Sorts the messages by the configured sort attribute; messages without that attribute
         * are compared as the empty string. Without a sort attribute the list is returned as is.
         *
         * @param messages the merged messages of all requests
         * @return the sorted messages, or the given list when no sort attribute is configured
         */
        private List<software.amazon.awssdk.services.sqs.model.Message> sortIfNeeded(List<software.amazon.awssdk.services.sqs.model.Message> messages) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received {} messages in {} requests", messages.size(), this.numberOfRequestsPerPoll);
            }
            if (this.sortAttributeName == null) {
                return messages;
            }
            Comparator<software.amazon.awssdk.services.sqs.model.Message> bySortAttribute =
                    Comparator.comparing(message -> message.attributes().getOrDefault(this.sortAttributeName, ""));
            return messages.stream().sorted(bySortAttribute).toList();
        }
    }

    private class TimeoutExtender
    implements Runnable {
        private static final String RECEIPT_HANDLE_IS_INVALID = "ReceiptHandleIsInvalid";
        private static final int MAX_REQUESTS = 10;
        private final int visibilityTimeout;
        private final int delayBetweenExecutions;
        private final AtomicBoolean run = new AtomicBoolean(true);
        private final Map<String, TimeoutExtenderEntry> entries = new ConcurrentHashMap<String, TimeoutExtenderEntry>();

        /**
         * @param visibilityTimeout      the visibility timeout requested with every extension
         * @param delayBetweenExecutions the delay between two runs of this task
         */
        TimeoutExtender(int visibilityTimeout, int delayBetweenExecutions) {
            this.delayBetweenExecutions = delayBetweenExecutions;
            this.visibilityTimeout = visibilityTimeout;
        }

        /**
         * Starts tracking the exchange so the visibility of its message can be extended.
         * A completion callback is attached first so the entry is removed again once the
         * exchange is done, whether it completed or failed.
         *
         * @param exchange the exchange whose message visibility should be extended
         */
        public void add(Exchange exchange) {
            exchange.getExchangeExtension().addOnCompletion(new Synchronization() {

                public void onComplete(Exchange exchange) {
                    this.stopTracking(exchange);
                }

                public void onFailure(Exchange exchange) {
                    this.stopTracking(exchange);
                }

                private void stopTracking(Exchange done) {
                    LOG.trace("Removing exchangeId {} from the TimeoutExtender, processing done", done.getExchangeId());
                    TimeoutExtender.this.entries.remove(done.getExchangeId());
                }
            });

            String receiptHandle = exchange.getIn().getHeader("CamelAwsSqsReceiptHandle", String.class);
            ChangeMessageVisibilityBatchRequestEntry extendRequest = ChangeMessageVisibilityBatchRequestEntry.builder()
                    .id(exchange.getExchangeId())
                    .visibilityTimeout(this.visibilityTimeout)
                    .receiptHandle(receiptHandle)
                    .build();
            this.entries.put(exchange.getExchangeId(), new TimeoutExtenderEntry(extendRequest));
        }

        /**
         * Turns the task off: subsequent calls of {@link #run()} do nothing.
         */
        public void cancel() {
            run.set(false);
        }

        /**
         * Extends the visibility of every tracked message whose deadline falls before the next
         * expected run of this task. Requests are sent in batches of at most
         * {@code MAX_REQUESTS} entries; entries that were extended successfully get a fresh
         * deadline. SDK failures are logged and do not abort the remaining batches.
         */
        @Override
        public void run() {
            if (!this.run.get()) {
                return;
            }
            Instant nextExpectedExecution = Instant.now().plusSeconds(Math.max(1, this.delayBetweenExecutions));
            // Work on a snapshot so entries added or removed meanwhile do not affect this run.
            Queue<TimeoutExtenderEntry> pending = new LinkedList<>(this.entries.values());
            while (!pending.isEmpty()) {
                List<ChangeMessageVisibilityBatchRequestEntry> batchEntries = new ArrayList<>(MAX_REQUESTS);
                while (!pending.isEmpty() && batchEntries.size() < MAX_REQUESTS) {
                    TimeoutExtenderEntry candidate = pending.poll();
                    if (candidate.isDeadlineReachedAt(nextExpectedExecution)) {
                        batchEntries.add(candidate.extendRequest);
                    }
                }
                if (batchEntries.isEmpty()) {
                    continue;
                }
                ChangeMessageVisibilityBatchRequest request = ChangeMessageVisibilityBatchRequest.builder()
                        .queueUrl(Sqs2Consumer.this.getQueueUrl())
                        .entries(batchEntries)
                        .build();
                try {
                    LOG.trace("Extending visibility window by {} seconds for request entries: {}", this.visibilityTimeout, batchEntries);
                    ChangeMessageVisibilityBatchResponse response = Sqs2Consumer.this.getEndpoint().getClient().changeMessageVisibilityBatch(request);
                    if (response.hasFailed()) {
                        // Log the individual failed entry; an invalid receipt handle is only logged at debug level.
                        response.failed().forEach(failedEntry -> {
                            if (RECEIPT_HANDLE_IS_INVALID.equals(failedEntry.code())) {
                                LOG.debug("Extended visibility window for request entry failed with invalid handle: {}", failedEntry);
                            } else {
                                LOG.warn("Extended visibility window for request entry failed: {}", failedEntry);
                            }
                        });
                    }
                    if (response.hasSuccessful()) {
                        response.successful().forEach(successEntry -> {
                            LOG.debug("Extended visibility window for request entry: {}", successEntry.id());
                            // Only refresh entries that are still tracked.
                            this.entries.computeIfPresent(successEntry.id(), (id, entry) -> entry.extendDeadline());
                        });
                    }
                } catch (SdkException e) {
                    this.logException(e, batchEntries);
                }
            }
        }

        /**
         * Logs a failed visibility extension as a warning.
         *
         * @param e       the failure
         * @param entries the batch entries that could not be extended
         */
        private void logException(Exception e, List<ChangeMessageVisibilityBatchRequestEntry> entries) {
            Object failure = e;
            LOG.warn("Extending visibility window failed for entries {}. Will not attempt to extend visibility further. This exception will be ignored.", entries, failure);
        }

        /**
         * A tracked extension request together with the instant after which it should be sent.
         */
        private final class TimeoutExtenderEntry {
            private final Instant deadline;
            private final ChangeMessageVisibilityBatchRequestEntry extendRequest;

            /**
             * @param extendRequest the request entry used to extend the visibility
             */
            TimeoutExtenderEntry(ChangeMessageVisibilityBatchRequestEntry extendRequest) {
                this.extendRequest = extendRequest;
                // 800 ms per unit of the entry's visibility timeout, counted from now.
                int millisUntilDeadline = extendRequest.visibilityTimeout() * 800;
                this.deadline = Instant.now().plusMillis(millisUntilDeadline);
            }

            /**
             * @return a new entry for the same request with a deadline computed from now
             */
            TimeoutExtenderEntry extendDeadline() {
                return new TimeoutExtenderEntry(extendRequest);
            }

            /**
             * @param time the instant to compare against
             * @return {@code true} if the deadline lies strictly before {@code time}
             */
            boolean isDeadlineReachedAt(Instant time) {
                return deadline.isBefore(time);
            }
        }
    }

    /**
     * State shared by all receive requests of one poll: the id of the request that first
     * reported the queue as missing, and the errors collected so far.
     *
     * @param missingQueueHandlerRequestId holds the id of the request handling a missing queue, if any
     * @param errors                       the errors recorded during the poll
     */
    private record PollingContext(AtomicReference<UUID> missingQueueHandlerRequestId, Queue<Exception> errors) {
        PollingContext {
            Objects.requireNonNull(missingQueueHandlerRequestId);
            Objects.requireNonNull(errors);
        }

        /** Creates an empty context backed by a concurrent queue. */
        private PollingContext() {
            this(new AtomicReference<>(), new ConcurrentLinkedQueue<>());
        }

        /** Registers {@code requestId} as the missing-queue handler unless one is already set. */
        private void fireQueueMissing(UUID requestId) {
            missingQueueHandlerRequestId.compareAndSet(null, requestId);
        }

        /** Records an error of the current poll. */
        private void firePollingError(Exception error) {
            errors.offer(error);
        }

        /** @return {@code true} once any request reported the queue as missing */
        private boolean isQueueMissing() {
            return missingQueueHandlerRequestId.get() != null;
        }

        /** @return {@code true} if a handler is registered and it is not {@code requestId} */
        private boolean isMissingQueueHandledInAnotherRequest(UUID requestId) {
            UUID handler = missingQueueHandlerRequestId.get();
            if (handler == null) {
                return false;
            }
            return !requestId.equals(handler);
        }

        /** @return {@code true} if at least one error was recorded */
        private boolean hasErrors() {
            return !errors.isEmpty();
        }

        /** @return the number of recorded errors */
        private int errorCount() {
            return errors.size();
        }

        /** @return the oldest recorded error, or {@code null} if there is none */
        private Exception firstError() {
            return errors.peek();
        }

        /** Throws the first error as is when it is a {@link RuntimeException}; otherwise does nothing. */
        private void rethrowIfFirstErrorIsRuntimeException() {
            if (firstError() instanceof RuntimeException runtimeError) {
                throw runtimeError;
            }
        }
    }
}

