/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.aws.sqs;

import io.atleon.aws.sqs.SqsMessageVisibilityChanger;
import io.atleon.aws.sqs.SqsReceiverMessage;
import io.atleon.aws.sqs.SqsReceiverOptions;
import io.atleon.core.ReactivePhaser;
import io.atleon.core.SerialQueue;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.retry.Retry;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

public final class SqsReceiver {
    /** Logger used to report errors encountered while disposing a poller. */
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsReceiver.class);

    /** Retry policy applied to SQS client calls: Reactor backoff with 3 max attempts and a 10 ms minimum backoff. */
    private static final Retry DEFAULT_RETRY = Retry.backoff(3L, Duration.ofMillis(10L));

    /** Response built with no fields set; substituted when a receive call completes without a value. */
    private static final ReceiveMessageResponse EMPTY_RECEIVE_MESSAGE_RESPONSE =
        ReceiveMessageResponse.builder().build();

    /** Options that supply the SQS client and the polling, batching and timeout settings. */
    private final SqsReceiverOptions options;

    /**
     * Stores the options used by every poller this receiver starts. Private; instances are
     * obtained through {@link #create(SqsReceiverOptions)}.
     *
     * @param options receiver configuration (not validated here)
     */
    private SqsReceiver(SqsReceiverOptions options) {
        this.options = options;
    }

    /**
     * Static factory for a receiver backed by the given options.
     *
     * @param options receiver configuration
     * @return a new {@code SqsReceiver}; no client is created until a subscription is made
     */
    public static SqsReceiver create(SqsReceiverOptions options) {
        return new SqsReceiver(options);
    }

    /**
     * Builds a Flux of messages polled from the given queue. Every subscription gets its own
     * {@code Poller}, which is handed to the subscriber as its {@code Subscription}; polling is
     * driven by the demand the subscriber requests through it.
     *
     * @param queueUrl URL of the SQS queue to receive from
     * @return a Flux whose elements carry callbacks for deleting the message or changing its visibility
     */
    public Flux<SqsReceiverMessage> receiveManual(String queueUrl) {
        return Flux.from(subscriber -> {
            Poller poller = new Poller(queueUrl, subscriber);
            subscriber.onSubscribe(poller);
        });
    }

    private final class Poller
    implements Subscription {
        /** URL of the queue this poller receives from. */
        private final String queueUrl;

        /** Downstream subscriber that messages and errors are signalled to. */
        private final Subscriber<? super SqsReceiverMessage> subscriber;

        /** SQS client created from the receiver options; closed when the poller is disposed. */
        private final SqsAsyncClient client;

        /**
         * Tracks in-progress work so disposal can wait for it. Created with one party;
         * NOTE(review): presumably mirrors java.util.concurrent.Phaser semantics - confirm.
         */
        private final ReactivePhaser executionPhaser = new ReactivePhaser(1);

        /** Demand requested by the subscriber that has not yet been satisfied by emissions. */
        private final AtomicLong requestOutstanding = new AtomicLong(0L);

        /** True while a receive request has been scheduled and its result has not been handled yet. */
        private final AtomicBoolean receptionPending = new AtomicBoolean(false);

        /** Flipped to true exactly once, by whichever caller wins the race to dispose this poller. */
        private final AtomicBoolean done = new AtomicBoolean(false);

        /** Receipt handles of emitted messages that have not yet been deleted or released by their callbacks. */
        private final Set<String> inProcessReceiptHandles = Collections.newSetFromMap(new ConcurrentHashMap<>());

        /** Receipt handles counted against the in-flight limit until SQS confirms a delete or release. */
        private final Set<String> inFlightReceiptHandles = Collections.newSetFromMap(new ConcurrentHashMap<>());

        /** Unicast sink feeding receipt handles into the batched delete pipeline. */
        private final Sinks.Many<String> receiptHandlesToDelete = Sinks.unsafe().many().unicast().onBackpressureError();

        /**
         * Queue through which handles are emitted onto {@link #receiptHandlesToDelete}.
         * NOTE(review): presumably serializes concurrent emissions onto the unsafe sink - confirm.
         */
        private final SerialQueue<String> receiptHandlesToDeleteQueue = SerialQueue.onEmitNext(this.receiptHandlesToDelete);

        /**
         * Creates the poller, opens an SQS client from the receiver options and starts the
         * pipeline that batches receipt handles and deletes them from the queue.
         *
         * @param queueUrl   URL of the queue to poll
         * @param subscriber downstream subscriber that receives messages and errors
         */
        public Poller(String queueUrl, Subscriber<? super SqsReceiverMessage> subscriber) {
            this.queueUrl = queueUrl;
            this.subscriber = subscriber;
            this.client = SqsReceiver.this.options.createClient();
            this.receiptHandlesToDelete.asFlux()
                // Register a party when the sink completes so that it stays registered until the
                // pipeline has terminated (i.e. any final batch has been handed to deleteMessages).
                .doOnComplete(() -> this.executionPhaser.register())
                // No raw cast here: the element type must survive so deleteMessages type-checks.
                .transform(receiptHandles -> this.batch(
                    receiptHandles,
                    SqsReceiver.this.options.deleteBatchSize(),
                    SqsReceiver.this.options.deleteInterval()))
                .doAfterTerminate(() -> this.executionPhaser.arriveAndDeregister())
                .subscribe(this::deleteMessages, this::doError);
        }

        /**
         * Adds {@code n} to the outstanding demand and schedules a receive if possible.
         *
         * <p>Demand saturates at {@link Long#MAX_VALUE} instead of overflowing, since
         * {@code doNext} treats that value as unbounded demand. A non-positive {@code n} is
         * signalled to the subscriber as an {@link IllegalArgumentException} (Reactive Streams
         * rule 3.9) rather than silently reducing the outstanding demand.
         *
         * @param n number of additional messages the subscriber is willing to receive
         */
        @Override
        public void request(long n) {
            if (n <= 0L) {
                this.doError(new IllegalArgumentException("Requested amount must be positive, but was: " + n));
                return;
            }
            // n > 0, so Long.MAX_VALUE - n cannot overflow; clamp when the sum would exceed the maximum.
            this.requestOutstanding.accumulateAndGet(
                n,
                (outstanding, requested) ->
                    outstanding > Long.MAX_VALUE - requested ? Long.MAX_VALUE : outstanding + requested);
            this.maybeScheduleMessageReception();
        }

        /**
         * Cancels the subscription by triggering disposal of this poller. The disposal runs
         * as a fire-and-forget subscription; its result is ignored.
         */
        @Override
        public void cancel() {
            Mono<Boolean> disposal = this.dispose();
            disposal.subscribe();
        }

        /**
         * Disposes this poller at most once.
         *
         * @return a Mono emitting {@code true} if this subscription performed the disposal (after
         *         it has finished), or {@code false} if the poller was already marked done
         */
        private Mono<Boolean> dispose() {
            // The compare-and-set runs per subscription, so only the first subscriber does the disposal.
            return Mono.fromSupplier(() -> this.done.compareAndSet(false, true))
                .flatMap(shouldDispose -> shouldDispose
                    ? this.doDispose().thenReturn(true)
                    : Mono.just(false));
        }

        /**
         * Performs the actual shutdown sequence:
         * <ol>
         *   <li>arrive at the execution phaser and await its advance;</li>
         *   <li>complete the delete sink;</li>
         *   <li>reset the visibility timeout of every still-in-process message to zero;</li>
         *   <li>arrive at the phaser and await its advance a second time.</li>
         * </ol>
         * The whole sequence is bounded by the configured close timeout. The client is closed
         * when the sequence finishes for any reason, and errors are logged and swallowed so
         * that disposal itself never fails.
         *
         * @return a Mono that completes when disposal has finished or been abandoned
         */
        private Mono<?> doDispose() {
            return this.executionPhaser.arriveAndAwaitAdvanceReactively()
                .then(Mono.fromRunnable(() -> this.receiptHandlesToDelete.tryEmitComplete()))
                .then(Mono.defer(this::releaseInProcessMessages))
                .then(this.executionPhaser.arriveAndAwaitAdvanceReactively())
                .timeout(SqsReceiver.this.options.closeTimeout())
                .doFinally(__ -> this.client.close())
                .doOnError(error -> LOGGER.error("Encountered error while disposing SQS Poller", error))
                .onErrorResume(error -> Mono.empty());
        }

        /**
         * Sets the visibility timeout of all in-process messages to zero, in chunks of at most
         * 10 receipt handles, because SQS rejects ChangeMessageVisibilityBatch requests with
         * more than 10 entries. Every chunk is attempted even if an earlier one fails.
         */
        private Mono<Void> releaseInProcessMessages() {
            final int maxEntriesPerBatch = 10;
            return Flux.fromIterable(this.inProcessReceiptHandles)
                .buffer(maxEntriesPerBatch)
                .concatMapDelayError(chunk -> this.createChangeMessageVisibilities(chunk, Duration.ZERO, __ -> true))
                .then();
        }

        /**
         * Issues a receive request when there is capacity to emit at least one message, the
         * poller is not done, and no other reception is currently pending. Otherwise does nothing.
         */
        private void maybeScheduleMessageReception() {
            int maxNumberOfMessagesToRequest = this.calculateMaxNumberOfMessagesToRequest();
            // Evaluation order matters: the compare-and-set must come last so the pending flag
            // is only claimed when a request will actually be issued.
            if (maxNumberOfMessagesToRequest <= 0
                || this.done.get()
                || !this.receptionPending.compareAndSet(false, true)) {
                return;
            }

            ReceiveMessageRequest request = ReceiveMessageRequest.builder()
                .receiveRequestAttemptId(UUID.randomUUID().toString())
                .queueUrl(this.queueUrl)
                .maxNumberOfMessages(maxNumberOfMessagesToRequest)
                .messageAttributeNames(SqsReceiver.this.options.messageAttributesToRequest())
                .attributeNamesWithStrings(SqsReceiver.this.options.messageSystemAttributesToRequest())
                .waitTimeSeconds(SqsReceiver.this.options.waitTimeSecondsPerReception())
                .visibilityTimeout(SqsReceiver.this.options.visibilityTimeoutSeconds())
                .build();

            // The call is skipped (empty Mono) unless the phaser is still in phase 0; the empty
            // response then ensures handleMessagesReceived still runs and clears the pending flag.
            this.maybeExecute(SqsAsyncClient::receiveMessage, request, phase -> phase == 0)
                .defaultIfEmpty(EMPTY_RECEIVE_MESSAGE_RESPONSE)
                .subscribe(this::handleMessagesReceived, this::handleMessagesReceivedError);
        }

        /**
         * Computes how many messages the next receive request may ask for: the smallest of the
         * outstanding demand, the free in-flight slots, and the per-reception maximum.
         *
         * @return the request size; zero or negative when nothing should be requested
         */
        private int calculateMaxNumberOfMessagesToRequest() {
            int inFlightLimit = SqsReceiver.this.options.maxInFlightPerSubscription();
            int freeInFlightSlots = inFlightLimit - this.inFlightReceiptHandles.size();
            long demand = this.requestOutstanding.get();
            int emittable = (int) Math.min(demand, (long) freeInFlightSlots);
            int perReceptionLimit = SqsReceiver.this.options.maxMessagesPerReception();
            return Math.min(perReceptionLimit, emittable);
        }

        /**
         * Emits every message of a receive response downstream, then clears the pending flag
         * and tries to schedule the next reception.
         *
         * @param response result of the receive call (possibly the empty placeholder response)
         */
        private void handleMessagesReceived(ReceiveMessageResponse response) {
            for (Message message : response.messages()) {
                this.emit(message);
            }
            this.receptionPending.set(false);
            this.maybeScheduleMessageReception();
        }

        /**
         * Handles a failed receive call: propagates the error (which disposes the poller) and
         * then clears the pending-reception flag.
         *
         * @param error failure raised by the receive request
         */
        private void handleMessagesReceivedError(Throwable error) {
            this.doError(error);
            this.receptionPending.set(false);
        }

        /**
         * Wraps a received SQS message with its delete and visibility-change callbacks, records
         * its receipt handle as in-flight and in-process, and emits it downstream.
         *
         * <p>Both callbacks register with the execution phaser for the duration of their work
         * and only act while the phaser is in phase 0 and the poller is not done. The
         * deregistration happens in a {@code finally} block so that an exception thrown by the
         * callback body (for example an out-of-range timeout) cannot leave a party registered.
         *
         * @param message the SQS message to emit
         */
        private void emit(Message message) {
            String receiptHandle = message.receiptHandle();

            Runnable deleter = () -> {
                int phase = this.executionPhaser.register();
                try {
                    // Removal from the in-process set guarantees a handle is queued for deletion at most once.
                    if (phase == 0 && !this.done.get() && this.inProcessReceiptHandles.remove(receiptHandle)) {
                        this.receiptHandlesToDeleteQueue.addAndDrain(receiptHandle);
                    }
                } finally {
                    this.executionPhaser.arriveAndDeregister();
                }
            };

            SqsMessageVisibilityChanger visibilityChanger = (timeout, stillInProcess) -> {
                int phase = this.executionPhaser.register();
                try {
                    if (phase == 0 && !this.done.get()) {
                        if (stillInProcess && this.inProcessReceiptHandles.contains(receiptHandle)) {
                            // Message stays in process: only its visibility timeout changes.
                            this.maybeChangeMessageVisibility(receiptHandle, timeout);
                        } else if (!stillInProcess && this.inProcessReceiptHandles.remove(receiptHandle)) {
                            // Message is being given up: also free its in-flight slot on success.
                            this.maybeChangeMessageVisibilityAndMarkNotInFlight(receiptHandle, timeout);
                        }
                    }
                } finally {
                    this.executionPhaser.arriveAndDeregister();
                }
            };

            this.inFlightReceiptHandles.add(receiptHandle);
            this.inProcessReceiptHandles.add(receiptHandle);
            this.doNext(SqsReceiverMessage.create(message, deleter, visibilityChanger));
        }

        /**
         * Deletes a batch of messages from the queue. Does nothing for an empty batch.
         *
         * @param receiptHandles receipt handles of the messages to delete
         */
        private void deleteMessages(Collection<String> receiptHandles) {
            if (receiptHandles.isEmpty()) {
                return;
            }
            // Each entry needs a batch-unique id; a random UUID is used for every handle.
            List<DeleteMessageBatchRequestEntry> entries = receiptHandles.stream()
                .map(receiptHandle -> DeleteMessageBatchRequestEntry.builder()
                    .id(this.newReceiptHandleId())
                    .receiptHandle(receiptHandle)
                    .build())
                .collect(Collectors.toList());
            DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder()
                .queueUrl(this.queueUrl)
                .entries(entries)
                .build();
            // The phase predicate always matches, so deletes are still sent while the poller is disposing.
            this.maybeExecute(SqsAsyncClient::deleteMessageBatch, request, __ -> true)
                .subscribe(response -> this.handleMessagesDeleted(response, receiptHandles), this::doError);
        }

        /**
         * Processes the result of a delete batch. Any failed entry is raised as a
         * {@code BatchRequestFailedException}; otherwise the handles leave the in-flight set
         * and, if that freed capacity, another reception is attempted.
         *
         * @param response       SQS response to the delete batch
         * @param receiptHandles handles that were part of the batch
         */
        private void handleMessagesDeleted(DeleteMessageBatchResponse response, Collection<String> receiptHandles) {
            if (response.hasFailed()) {
                this.doError(new BatchRequestFailedException("DeleteMessage", response.failed()));
                return;
            }
            boolean capacityFreed = this.inFlightReceiptHandles.removeAll(receiptHandles);
            if (capacityFreed) {
                this.maybeScheduleMessageReception();
            }
        }

        /**
         * Changes the visibility timeout of a message that remains in process. The message
         * keeps its in-flight slot, so no handles are passed on for removal.
         *
         * @param receiptHandle receipt handle of the message
         * @param timeout       new visibility timeout
         */
        private void maybeChangeMessageVisibility(String receiptHandle, Duration timeout) {
            // The request is only sent while the phaser is still in phase 0.
            this.createChangeMessageVisibilities(Collections.singletonList(receiptHandle), timeout, phase -> phase == 0)
                .subscribe(
                    response -> this.handleMessageVisibilitiesChanged(response, Collections.emptyList()),
                    this::doError);
        }

        /**
         * Changes the visibility timeout of a message that is no longer being processed and,
         * once SQS confirms the change, removes it from the in-flight set.
         *
         * @param receiptHandle receipt handle of the message
         * @param timeout       new visibility timeout
         */
        private void maybeChangeMessageVisibilityAndMarkNotInFlight(String receiptHandle, Duration timeout) {
            List<String> released = Collections.singletonList(receiptHandle);
            Mono<ChangeMessageVisibilityBatchResponse> change =
                this.createChangeMessageVisibilities(released, timeout, phase -> phase == 0);
            change.subscribe(response -> this.handleMessageVisibilitiesChanged(response, released), this::doError);
        }

        /**
         * Builds a Mono that sends one ChangeMessageVisibilityBatch request for the given handles.
         *
         * @param receiptHandles handles whose visibility timeout should change
         * @param timeout        new timeout; only its whole-second part is used
         * @param phaseMustMatch predicate on the phaser phase that must hold for the request to be sent
         * @return a Mono emitting the SQS response, or an empty Mono if there are no handles or
         *         the phase predicate does not match
         * @throws ArithmeticException if the timeout in seconds does not fit in an {@code int}
         */
        private Mono<ChangeMessageVisibilityBatchResponse> createChangeMessageVisibilities(Collection<String> receiptHandles, Duration timeout, IntPredicate phaseMustMatch) {
            if (receiptHandles.isEmpty()) {
                return Mono.empty();
            }
            int timeoutInSeconds = Math.toIntExact(timeout.getSeconds());
            List<ChangeMessageVisibilityBatchRequestEntry> entries = receiptHandles.stream()
                .map(receiptHandle -> this.createChangeMessageVisibilityRequestEntry(receiptHandle, timeoutInSeconds))
                .collect(Collectors.toList());
            ChangeMessageVisibilityBatchRequest request = ChangeMessageVisibilityBatchRequest.builder()
                .queueUrl(this.queueUrl)
                .entries(entries)
                .build();
            return this.maybeExecute(SqsAsyncClient::changeMessageVisibilityBatch, request, phaseMustMatch);
        }

        /**
         * Creates a single batch entry that sets the visibility timeout for one receipt handle,
         * identified within the batch by a fresh random id.
         *
         * @param receiptHandle    receipt handle of the message
         * @param timeoutInSeconds new visibility timeout in seconds
         * @return the batch request entry
         */
        private ChangeMessageVisibilityBatchRequestEntry createChangeMessageVisibilityRequestEntry(String receiptHandle, int timeoutInSeconds) {
            String entryId = this.newReceiptHandleId();
            return ChangeMessageVisibilityBatchRequestEntry.builder()
                .id(entryId)
                .receiptHandle(receiptHandle)
                .visibilityTimeout(timeoutInSeconds)
                .build();
        }

        /**
         * Processes the result of a visibility-change batch. Any failed entry is raised as a
         * {@code BatchRequestFailedException}; otherwise the given handles leave the in-flight
         * set and, if that freed capacity, another reception is attempted.
         *
         * @param response                       SQS response to the batch
         * @param receiptHandlesNoLongerInFlight handles to drop from the in-flight set on success
         */
        private void handleMessageVisibilitiesChanged(ChangeMessageVisibilityBatchResponse response, Collection<String> receiptHandlesNoLongerInFlight) {
            if (response.hasFailed()) {
                this.doError(new BatchRequestFailedException("ChangeMessageVisibility", response.failed()));
                return;
            }
            boolean capacityFreed = this.inFlightReceiptHandles.removeAll(receiptHandlesNoLongerInFlight);
            if (capacityFreed) {
                this.maybeScheduleMessageReception();
            }
        }

        /**
         * Wraps an SQS client call so that it registers with the execution phaser while running
         * and is only performed when the phase observed at registration satisfies the predicate.
         *
         * @param method         client method to invoke, e.g. {@code SqsAsyncClient::receiveMessage}
         * @param request        request passed to the client method
         * @param phaseMustMatch predicate on the phase returned by the phaser registration
         * @param <T>            request type
         * @param <V>            response type
         * @return a Mono emitting the response, or an empty Mono when the phase does not match;
         *         failures are retried with the default backoff policy
         */
        private <T, V> Mono<V> maybeExecute(BiFunction<SqsAsyncClient, T, CompletableFuture<V>> method, T request, IntPredicate phaseMustMatch) {
            return Mono.fromSupplier(() -> phaseMustMatch.test(this.executionPhaser.register()))
                // Cache the registration result so retries re-invoke the client without registering again.
                .cache()
                .flatMap(phaseMatched -> phaseMatched
                    ? Mono.fromFuture(method.apply(this.client, request))
                    : Mono.<V>empty())
                .retryWhen(DEFAULT_RETRY)
                // Balances the single registration above, whatever the outcome.
                .doFinally(__ -> this.executionPhaser.arriveAndDeregister());
        }

        /**
         * Emits a message to the subscriber and consumes one unit of outstanding demand.
         * An exception thrown by the subscriber is routed to {@code doError}.
         *
         * @param sqsReceiverMessage message to emit downstream
         */
        private void doNext(SqsReceiverMessage sqsReceiverMessage) {
            try {
                this.subscriber.onNext(sqsReceiverMessage);
            } catch (Throwable error) {
                this.doError(error);
            }
            // Long.MAX_VALUE is treated as unbounded demand and is never decremented. Doing the
            // check and the decrement in one atomic update avoids racing with concurrent requests.
            this.requestOutstanding.updateAndGet(
                outstanding -> outstanding == Long.MAX_VALUE ? outstanding : outstanding - 1L);
        }

        /**
         * Disposes the poller and, only if this call was the one that performed the disposal,
         * signals the error to the subscriber. Errors arriving after disposal are dropped.
         *
         * @param error failure to report downstream
         */
        private void doError(Throwable error) {
            this.dispose()
                .filter(wasDisposed -> wasDisposed)
                .subscribe(__ -> this.subscriber.onError(error));
        }

        /**
         * Groups the elements of a Flux into lists.
         *
         * @param flux        source elements
         * @param maxSize     maximum list size; values of 1 or less disable buffering
         * @param maxDuration maximum time to wait before emitting a partially filled list
         * @param <T>         element type
         * @return singleton lists when {@code maxSize <= 1}, otherwise lists buffered by size and time
         */
        private <T> Flux<List<T>> batch(Flux<T> flux, int maxSize, Duration maxDuration) {
            if (maxSize <= 1) {
                return flux.map(Collections::singletonList);
            }
            return flux.bufferTimeout(maxSize, maxDuration);
        }

        /**
         * Generates an id for a batch request entry.
         *
         * @return the string form of a random UUID
         */
        private String newReceiptHandleId() {
            UUID entryId = UUID.randomUUID();
            return entryId.toString();
        }
    }

    /**
     * Raised when an SQS batch request reports failed entries. The message contains the
     * request type and the failed entries.
     */
    public static final class BatchRequestFailedException extends RuntimeException {

        /**
         * @param type    name of the batch operation that failed
         * @param entries error entries reported by SQS
         */
        private BatchRequestFailedException(String type, List<BatchResultErrorEntry> entries) {
            super(describe(type, entries));
        }

        /** Formats the exception message from the operation type and its error entries. */
        private static String describe(String type, List<BatchResultErrorEntry> entries) {
            return String.format("Batch request failed! type=%s errors=%s", type, entries);
        }
    }
}

