/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener.acknowledgement;

import io.awspring.cloud.sqs.LifecycleHandler;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.TaskExecutorAware;
import io.awspring.cloud.sqs.listener.acknowledgement.AbstractOrderingAcknowledgementProcessor;
import io.awspring.cloud.sqs.listener.acknowledgement.ImmediateAcknowledgementProcessor;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;

public class BatchingAcknowledgementProcessor<T>
extends AbstractOrderingAcknowledgementProcessor<T>
implements TaskExecutorAware {
    private static final Logger logger = LoggerFactory.getLogger(BatchingAcknowledgementProcessor.class);
    private BufferingAcknowledgementProcessor<T> acknowledgementProcessor;
    private BlockingQueue<Message<T>> acks;
    private Integer ackThreshold;
    private Duration ackInterval;
    private TaskExecutor taskExecutor;
    private TaskScheduler taskScheduler;
    private Duration acknowledgementShutdownTimeout;

    @Override
    protected void doConfigure(ContainerOptions<?, ?> containerOptions) {
        this.ackInterval = containerOptions.getAcknowledgementInterval();
        this.ackThreshold = containerOptions.getAcknowledgementThreshold();
        this.acknowledgementShutdownTimeout = containerOptions.getAcknowledgementShutdownTimeout();
    }

    @Override
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        Assert.notNull((Object)taskExecutor, (String)"taskExecutor cannot be null");
        this.taskExecutor = taskExecutor;
    }

    @Override
    protected CompletableFuture<Void> doOnAcknowledge(Message<T> message) {
        if (!this.acks.offer(message)) {
            logger.warn("Acknowledgement queue full, dropping acknowledgement for message {}", (Object)MessageHeaderUtils.getId(message));
        }
        logger.trace("Received message {} to ack in {}. Primary queue size: {}", new Object[]{MessageHeaderUtils.getId(message), this.getId(), this.acks.size()});
        return CompletableFuture.completedFuture(null);
    }

    @Override
    protected CompletableFuture<Void> doOnAcknowledge(Collection<Message<T>> messages) {
        messages.forEach(this::onAcknowledge);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void doStart() {
        Assert.notNull((Object)this.ackInterval, (String)"ackInterval not set");
        Assert.notNull((Object)this.ackThreshold, (String)"ackThreshold not set");
        Assert.notNull((Object)this.taskExecutor, (String)"executor not set");
        Assert.notNull((Object)this.acknowledgementShutdownTimeout, (String)"timeout not set");
        Assert.state((this.ackInterval != Duration.ZERO || this.ackThreshold > 0 ? 1 : 0) != 0, () -> this.getClass().getSimpleName() + " cannot be used with Duration.ZERO and acknowledgement threshold 0.Consider using a " + ImmediateAcknowledgementProcessor.class + "instead");
        this.acks = new LinkedBlockingQueue<Message<T>>();
        this.taskScheduler = this.createTaskScheduler();
        this.acknowledgementProcessor = this.createAcknowledgementProcessor();
        this.taskExecutor.execute(this.acknowledgementProcessor);
    }

    protected TaskScheduler createTaskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setThreadNamePrefix(this.getId() + "-");
        scheduler.initialize();
        return scheduler;
    }

    protected BufferingAcknowledgementProcessor<T> createAcknowledgementProcessor() {
        return new BufferingAcknowledgementProcessor(this);
    }

    @Override
    public void doStop() {
        try {
            this.acknowledgementProcessor.waitAcknowledgementsToFinish();
        }
        catch (Exception e) {
            logger.error("Error waiting for acknowledgements to finish. Proceeding with shutdown.", (Throwable)e);
        }
        LifecycleHandler.get().dispose(this.taskScheduler);
    }

    private static class BufferingAcknowledgementProcessor<T>
    implements Runnable {
        private final BlockingQueue<Message<T>> acks;
        private final Integer ackThreshold;
        private final BatchingAcknowledgementProcessor<T> parent;
        private final Map<String, BlockingQueue<Message<T>>> acksBuffer;
        private final Duration ackShutdownTimeout;
        private final AcknowledgementExecutionContext<T> context;
        private final ScheduledAcknowledgementExecution<T> scheduledExecution;
        private final ThresholdAcknowledgementExecutor<T> thresholdAcknowledgementExecution;
        private final Function<Message<T>, String> messageGroupingFunction;

        private BufferingAcknowledgementProcessor(BatchingAcknowledgementProcessor<T> parent) {
            this.acks = parent.acks;
            this.ackThreshold = parent.ackThreshold;
            this.ackShutdownTimeout = parent.acknowledgementShutdownTimeout;
            this.parent = parent;
            this.acksBuffer = new ConcurrentHashMap<String, BlockingQueue<Message<T>>>();
            this.messageGroupingFunction = parent.getMessageGroupingFunction();
            this.context = new AcknowledgementExecutionContext<T>(parent.getId(), this.acksBuffer, new ReentrantLock(), parent::isRunning, parent::sendToExecutor);
            this.scheduledExecution = new ScheduledAcknowledgementExecution<T>(parent.ackInterval, parent.taskScheduler, this.context);
            this.thresholdAcknowledgementExecution = new ThresholdAcknowledgementExecutor<T>(parent.ackThreshold, this.context);
        }

        @Override
        public void run() {
            logger.debug("Starting acknowledgement processor thread with batchSize: {}", (Object)this.ackThreshold);
            this.scheduledExecution.start();
            while (this.shouldKeepPollingAcks()) {
                try {
                    Message<T> polledMessage = this.acks.poll(1L, TimeUnit.SECONDS);
                    if (polledMessage == null) continue;
                    this.addMessageToBuffer(polledMessage);
                    this.thresholdAcknowledgementExecution.checkAndExecute();
                }
                catch (Exception e) {
                    logger.error("Error while handling acknowledgements for {}, resuming.", (Object)this.parent.getId(), (Object)e);
                }
            }
            logger.debug("Acknowledgement processor thread stopped");
        }

        private boolean shouldKeepPollingAcks() {
            return this.parent.isRunning() || !this.context.isTimeoutElapsed;
        }

        private void addMessageToBuffer(Message<T> polledMessage) {
            this.context.lock();
            try {
                this.acksBuffer.computeIfAbsent(this.messageGroupingFunction.apply(polledMessage), newGroup -> new LinkedBlockingQueue()).add(polledMessage);
            }
            finally {
                this.context.unlock();
            }
        }

        public void waitAcknowledgementsToFinish() {
            this.waitOnAcknowledgementsIfTimeoutSet();
            this.context.isTimeoutElapsed = true;
            this.context.lock();
            try {
                this.context.acksBuffer.clear();
            }
            finally {
                this.context.unlock();
            }
            this.context.runningAcks.forEach(future -> future.cancel(true));
        }

        private void waitOnAcknowledgementsIfTimeoutSet() {
            if (Duration.ZERO.equals(this.ackShutdownTimeout)) {
                logger.debug("Not waiting for acknowledgements, shutting down.");
                return;
            }
            try {
                LocalDateTime endTime = LocalDateTime.now().plus(this.ackShutdownTimeout);
                logger.debug("Waiting until {} for acknowledgements to finish", (Object)endTime);
                while (this.hasAcksLeft() || this.hasUnfinishedAcks()) {
                    if (LocalDateTime.now().isAfter(endTime)) {
                        throw new TimeoutException();
                    }
                    Thread.sleep(200L);
                }
                logger.debug("All acknowledgements completed.");
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for acknowledgements to finish");
            }
            catch (TimeoutException e) {
                logger.warn("Acknowledgements did not finish in {} ms. Proceeding with shutdown.", (Object)this.ackShutdownTimeout.toMillis());
            }
            catch (Exception e) {
                logger.warn("Error thrown when waiting for acknowledgement tasks to finish in {}. Continuing with shutdown.", (Object)this.parent.getId(), (Object)e);
            }
        }

        private boolean hasUnfinishedAcks() {
            int unfinishedAcks = this.context.runningAcks.stream().filter(Predicate.not(CompletableFuture::isDone)).toList().size();
            logger.trace("{} unfinished acknowledgement batches", (Object)unfinishedAcks);
            return unfinishedAcks > 0;
        }

        private boolean hasAcksLeft() {
            int messagesInAcks = this.acks.size();
            int messagesInAcksBuffer = this.context.acksBuffer.size();
            logger.trace("Acknowledgement queue has {} messages.", (Object)messagesInAcks);
            logger.trace("Acknowledgement buffer has {} messages.", (Object)messagesInAcksBuffer);
            return messagesInAcksBuffer > 0 || messagesInAcks > 0;
        }
    }

    private static class ScheduledAcknowledgementExecution<T> {
        private final AcknowledgementExecutionContext<T> context;
        private final TaskScheduler taskScheduler;
        private final Duration ackInterval;

        public ScheduledAcknowledgementExecution(Duration ackInterval, TaskScheduler taskScheduler, AcknowledgementExecutionContext<T> context) {
            this.ackInterval = ackInterval;
            this.taskScheduler = taskScheduler;
            this.context = context;
        }

        private void start() {
            if (this.ackInterval != Duration.ZERO) {
                logger.debug("Starting scheduled thread with interval of {}ms for {}", (Object)this.ackInterval.toMillis(), (Object)this.context.id);
                this.scheduleNextExecution(Instant.now().plus(this.ackInterval));
            }
        }

        private void scheduleNextExecution(Instant nextExecutionDelay) {
            if (!this.context.isRunning()) {
                logger.debug("AcknowledgementProcessor {} stopped, not scheduling next acknowledgement execution.", (Object)this.context.id);
                return;
            }
            try {
                logger.trace("Scheduling next acknowledgement execution in {}ms", (Object)(nextExecutionDelay.toEpochMilli() - Instant.now().toEpochMilli()));
                this.taskScheduler.schedule(this::executeScheduledAcknowledgement, nextExecutionDelay);
            }
            catch (Exception e) {
                if (this.context.isRunning()) {
                    logger.warn("Error thrown when scheduling next execution in {}. Resuming.", (Object)this.context.id, (Object)e);
                }
                this.scheduleNextExecution(this.context.lastAcknowledgement.plus(this.ackInterval));
            }
        }

        private void executeScheduledAcknowledgement() {
            this.context.lock();
            try {
                this.pollAndExecuteScheduled();
                this.scheduleNextExecution(this.context.lastAcknowledgement.plus(this.ackInterval));
            }
            catch (Exception e) {
                logger.error("Error executing scheduled acknowledgement in {}. Resuming.", (Object)this.context.id, (Object)e);
                this.scheduleNextExecution(this.context.lastAcknowledgement.plus(this.ackInterval));
            }
            finally {
                this.context.unlock();
            }
        }

        private void pollAndExecuteScheduled() {
            List<CompletableFuture<Void>> executionFutures;
            if (Instant.now().isAfter(this.context.lastAcknowledgement.plus(this.ackInterval)) && (executionFutures = this.context.executeAllAcks()).isEmpty()) {
                this.context.lastAcknowledgement = Instant.now();
            }
        }
    }

    private static class ThresholdAcknowledgementExecutor<T> {
        private final AcknowledgementExecutionContext<T> context;
        private final int ackThreshold;

        public ThresholdAcknowledgementExecutor(int ackThreshold, AcknowledgementExecutionContext<T> context) {
            this.context = context;
            this.ackThreshold = ackThreshold;
        }

        private void checkAndExecute() {
            if (this.ackThreshold == 0) {
                return;
            }
            while (!this.executeThresholdAcks().isEmpty()) {
            }
        }

        private List<CompletableFuture<Void>> executeThresholdAcks() {
            this.context.lock();
            try {
                logger.trace("Executing acknowledgement for threshold in {}.", (Object)this.context.id);
                List<CompletableFuture<Void>> list = this.context.executeAcksUpTo(this.ackThreshold, this.ackThreshold);
                return list;
            }
            finally {
                this.context.unlock();
            }
        }
    }

    private static class AcknowledgementExecutionContext<T> {
        private final String id;
        private final Lock ackLock;
        private final Supplier<Boolean> runningFunction;
        private final Map<String, BlockingQueue<Message<T>>> acksBuffer;
        private final Function<Collection<Message<T>>, CompletableFuture<Void>> executingFunction;
        private final Collection<CompletableFuture<Void>> runningAcks = Collections.synchronizedSet(new HashSet());
        private Instant lastAcknowledgement = Instant.now();
        private volatile boolean isTimeoutElapsed = false;

        private AcknowledgementExecutionContext(String id, Map<String, BlockingQueue<Message<T>>> acksBuffer, Lock ackLock, Supplier<Boolean> runningFunction, Function<Collection<Message<T>>, CompletableFuture<Void>> executingFunction) {
            this.id = id;
            this.acksBuffer = acksBuffer;
            this.ackLock = ackLock;
            this.runningFunction = runningFunction;
            this.executingFunction = executingFunction;
        }

        private List<CompletableFuture<Void>> executeAcksUpTo(int minSize, int maxSize) {
            this.verifyLock();
            List<CompletableFuture<Void>> futures = this.acksBuffer.entrySet().stream().filter(entry -> ((BlockingQueue)entry.getValue()).size() >= minSize).map(entry -> this.doExecute((String)entry.getKey(), (BlockingQueue)entry.getValue(), maxSize == Integer.MAX_VALUE ? ((BlockingQueue)entry.getValue()).size() : maxSize)).collect(Collectors.toList());
            if (!futures.isEmpty()) {
                this.purgeEmptyBuffers();
                return futures;
            }
            return Collections.emptyList();
        }

        private List<CompletableFuture<Void>> executeAllAcks() {
            this.verifyLock();
            List<CompletableFuture<Void>> futures = this.acksBuffer.entrySet().stream().filter(entry -> ((BlockingQueue)entry.getValue()).size() > 0).map(entry -> this.doExecute((String)entry.getKey(), (BlockingQueue)entry.getValue(), ((BlockingQueue)entry.getValue()).size())).collect(Collectors.toList());
            if (!futures.isEmpty()) {
                this.purgeEmptyBuffers();
            }
            return futures;
        }

        private void verifyLock() {
            if (this.ackLock instanceof ReentrantLock) {
                Assert.isTrue((boolean)((ReentrantLock)this.ackLock).isHeldByCurrentThread(), (String)"no lock for executing acknowledgements");
            }
        }

        private CompletableFuture<Void> doExecute(String groupKey, BlockingQueue<Message<T>> messages, int maxSize) {
            logger.trace("Executing acknowledgement for up to {} messages {} of group {} in {}.", new Object[]{maxSize, MessageHeaderUtils.getId(messages), groupKey, this.id});
            List<Message<T>> messagesToAck = this.pollUpToThreshold(groupKey, messages, maxSize);
            CompletableFuture<Void> future = this.manageFuture(this.execute(messagesToAck));
            this.lastAcknowledgement = Instant.now();
            return future;
        }

        private List<Message<T>> pollUpToThreshold(String groupKey, BlockingQueue<Message<T>> messages, int maxSize) {
            return IntStream.range(0, maxSize).mapToObj(index -> this.pollMessage(groupKey, messages)).collect(Collectors.toList());
        }

        private Message<T> pollMessage(String groupKey, BlockingQueue<Message<T>> messages) {
            Message polledMessage = (Message)messages.poll();
            Assert.notNull((Object)polledMessage, (String)"poll should never return null");
            logger.trace("Retrieved message {} from the buffer for group {}. Queue size: {} runningAcks: {}", new Object[]{MessageHeaderUtils.getId(polledMessage), groupKey, messages.size(), this.runningAcks});
            return polledMessage;
        }

        private CompletableFuture<Void> execute(Collection<Message<T>> messages) {
            Assert.notEmpty(messages, (String)"empty collection sent for acknowledgement");
            logger.trace("Executing {} acknowledgements for {}", (Object)messages.size(), (Object)this.id);
            return this.executingFunction.apply(messages);
        }

        private CompletableFuture<Void> manageFuture(CompletableFuture<Void> future) {
            this.runningAcks.add(future);
            logger.trace("Added future to runningAcks. Total: {}", (Object)this.runningAcks.size());
            future.whenComplete((v, t) -> {
                if (this.isRunning()) {
                    this.runningAcks.remove(future);
                    logger.trace("Removed future from runningAcks. Total: {}", (Object)this.runningAcks.size());
                }
            });
            return future;
        }

        private boolean isRunning() {
            return this.runningFunction.get();
        }

        private void purgeEmptyBuffers() {
            this.verifyLock();
            List<String> emptyAcks = this.acksBuffer.entrySet().stream().filter(entry -> ((BlockingQueue)entry.getValue()).isEmpty()).map(Map.Entry::getKey).collect(Collectors.toList());
            logger.trace("Removing groups {} from buffer in {}", emptyAcks, (Object)this.id);
            emptyAcks.forEach(this.acksBuffer::remove);
        }

        private void lock() {
            this.ackLock.lock();
        }

        private void unlock() {
            this.ackLock.unlock();
        }
    }
}

