/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.aws.sqs;

import io.atleon.aws.sqs.SqsSenderMessage;
import io.atleon.aws.sqs.SqsSenderOptions;
import io.atleon.aws.sqs.SqsSenderResult;
import io.atleon.core.Batcher;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.retry.Retry;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry;
import software.amazon.awssdk.utils.SdkAutoCloseable;

/**
 * Reactive sender of messages to SQS queues.
 *
 * <p>Every message is sent through a {@code SendMessageBatch} request, either as a batch of one
 * ({@link #send(SqsSenderMessage, String)}) or as batches assembled by a {@link Batcher}
 * ({@link #send(Publisher, String)}). The underlying {@link SqsAsyncClient} is obtained from
 * {@link SqsSenderOptions#createClient()} on first subscription and cached until
 * {@link #close()} is called.
 *
 * <p>A failed batch request is retried according to {@code Retry.backoff(3, 10ms)}. Failures
 * reported by SQS for individual entries do not error the result stream; they are emitted as
 * failure results carrying a {@link MessageSendFailedException}.
 */
public final class SqsSender implements Closeable {

    /** Retry policy applied to each batch request whose future completes exceptionally. */
    private static final Retry DEFAULT_RETRY = Retry.backoff(3L, Duration.ofMillis(10L));

    /** Lazily created client, cached until a close signal invalidates (and closes) it. */
    private final Mono<SqsAsyncClient> futureClient;

    /** Groups streamed messages into batches according to the configured size/duration. */
    private final Batcher batcher;

    /** Passed to the batcher as the limit on concurrently mapped batches. */
    private final int maxRequestsInFlight;

    /** Carries close signals to the cached client's invalidation trigger. */
    private final Sinks.Many<Long> closeSink = Sinks.many().multicast().directBestEffort();

    private SqsSender(SqsSenderOptions options) {
        // The first close signal observed after the client has been cached invalidates the
        // cache entry, and the invalidation handler closes the client.
        this.futureClient = Mono.fromSupplier(options::createClient)
            .cacheInvalidateWhen(client -> closeSink.asFlux().next().then(), SdkAutoCloseable::close);
        this.batcher = Batcher.create(options.batchSize(), options.batchDuration(), options.batchPrefetch());
        this.maxRequestsInFlight = options.maxRequestsInFlight();
    }

    /**
     * Creates a sender configured by the given options.
     *
     * @param options source of the client factory and batching/concurrency settings
     * @return a new sender; no client is created until a send is subscribed to
     */
    public static SqsSender create(SqsSenderOptions options) {
        return new SqsSender(options);
    }

    /**
     * Sends a single message as a batch request containing exactly one entry.
     *
     * @param message  the message to send
     * @param queueUrl URL of the destination queue
     * @param <C>      type of the correlation metadata carried by the message
     * @return a Mono emitting the (successful or failed) result for the message
     */
    public <C> Mono<SqsSenderResult<C>> send(SqsSenderMessage<C> message, String queueUrl) {
        return futureClient
            .flatMapMany(client -> send(client, SqsSendEntry.singletonList(message), queueUrl))
            .next();
    }

    /**
     * Sends a stream of messages, grouping them into batch requests.
     *
     * <p>Within each batch, failure results are emitted before success results, so results are
     * not guaranteed to be in the same order as the incoming messages. Use the request ID or
     * correlation metadata of each result to match it to its message.
     *
     * @param messages the messages to send
     * @param queueUrl URL of the destination queue
     * @param <C>      type of the correlation metadata carried by the messages
     * @return a Flux emitting one result per entry reported in the batch responses
     */
    public <C> Flux<SqsSenderResult<C>> send(Publisher<SqsSenderMessage<C>> messages, String queueUrl) {
        return futureClient.flatMapMany(client -> send(client, Flux.from(messages), queueUrl));
    }

    /**
     * Signals that the cached client, if any, should be invalidated and closed.
     *
     * <p>The emission is best-effort and its result is deliberately ignored: when no client is
     * currently cached there is presumably no subscriber on the sink and nothing to close.
     */
    @Override
    public void close() {
        closeSink.tryEmitNext(System.currentTimeMillis());
    }

    /** Converts messages to send entries and maps each batch of them to a batch request. */
    private <C> Flux<SqsSenderResult<C>> send(
        SqsAsyncClient client,
        Flux<SqsSenderMessage<C>> messages,
        String queueUrl
    ) {
        return batcher.applyMapping(
            messages.map(SqsSendEntry::create),
            entries -> send(client, entries, queueUrl),
            maxRequestsInFlight
        );
    }

    /** Issues one batch request for the given entries and emits a result per response entry. */
    private <C> Flux<SqsSenderResult<C>> send(SqsAsyncClient client, List<SqsSendEntry<C>> entries, String queueUrl) {
        SendMessageBatchRequest request = SendMessageBatchRequest.builder()
            .queueUrl(queueUrl)
            .entries(entries.stream().map(SqsSendEntry::requestEntry).collect(Collectors.toList()))
            .build();

        // Collectors.toMap rejects null values, so entries without metadata are left out and
        // resolve to null on lookup. Note it also throws IllegalStateException on duplicate keys.
        Map<String, C> correlationMetadataByRequestId = entries.stream()
            .filter(entry -> entry.correlationMetadata() != null)
            .collect(Collectors.toMap(SqsSendEntry::requestId, SqsSendEntry::correlationMetadata));

        // The supplier form ensures a fresh request is issued on every (re)subscription.
        return Mono.fromFuture(() -> client.sendMessageBatch(request))
            .retryWhen(DEFAULT_RETRY)
            .flatMapIterable(response -> createResults(response, correlationMetadataByRequestId));
    }

    /** Builds results for all entries of a batch response: failures first, then successes. */
    private <C> List<SqsSenderResult<C>> createResults(
        SendMessageBatchResponse response,
        Map<String, C> correlationMetadataByRequestId
    ) {
        Stream<SqsSenderResult<C>> failures = response.failed().stream()
            .map(entry -> createFailureResult(entry, correlationMetadataByRequestId.get(entry.id())));
        Stream<SqsSenderResult<C>> successes = response.successful().stream()
            .map(entry -> createSuccessResult(entry, correlationMetadataByRequestId.get(entry.id())));
        return Stream.concat(failures, successes).collect(Collectors.toList());
    }

    /** Wraps an SQS batch error entry in a failure result. */
    private <C> SqsSenderResult<C> createFailureResult(BatchResultErrorEntry entry, C correlationMetadata) {
        MessageSendFailedException error = new MessageSendFailedException(entry.code(), entry.message());
        return SqsSenderResult.failure(entry.id(), error, correlationMetadata);
    }

    /** Wraps an SQS batch success entry in a success result. */
    private <C> SqsSenderResult<C> createSuccessResult(SendMessageBatchResultEntry entry, C correlationMetadata) {
        return SqsSenderResult.success(entry.id(), entry.messageId(), entry.sequenceNumber(), correlationMetadata);
    }

    /**
     * Pairs the SQS request entry built from a message with that message's correlation
     * metadata, so the metadata can be re-attached to the result by request ID.
     */
    private static final class SqsSendEntry<C> {

        private final SendMessageBatchRequestEntry requestEntry;

        /** May be null when the originating message carries no correlation metadata. */
        private final C correlationMetadata;

        private SqsSendEntry(SendMessageBatchRequestEntry requestEntry, C correlationMetadata) {
            this.requestEntry = requestEntry;
            this.correlationMetadata = correlationMetadata;
        }

        /** Returns an immutable single-element list holding the entry for the given message. */
        public static <C> List<SqsSendEntry<C>> singletonList(SqsSenderMessage<C> message) {
            return Collections.singletonList(create(message));
        }

        /** Builds the batch request entry for a message; absent optional values are passed as null. */
        public static <C> SqsSendEntry<C> create(SqsSenderMessage<C> message) {
            SendMessageBatchRequestEntry requestEntry = SendMessageBatchRequestEntry.builder()
                .id(message.requestId())
                .messageDeduplicationId(message.messageDeduplicationId().orElse(null))
                .messageGroupId(message.messageGroupId().orElse(null))
                .messageAttributes(message.messageAttributes())
                .messageSystemAttributesWithStrings(message.messageSystemAttributes())
                .messageBody(message.body())
                .delaySeconds(message.senderDelaySeconds().orElse(null))
                .build();
            return new SqsSendEntry<>(requestEntry, message.correlationMetadata());
        }

        /** Returns the ID of the request entry, used to correlate response entries. */
        public String requestId() {
            return requestEntry.id();
        }

        public SendMessageBatchRequestEntry requestEntry() {
            return requestEntry;
        }

        public C correlationMetadata() {
            return correlationMetadata;
        }
    }

    /**
     * Error attached to a failure result when SQS reports that an individual batch entry could
     * not be sent.
     */
    public static final class MessageSendFailedException extends RuntimeException {

        private final String code;

        private final String message;

        /**
         * @param code    error code reported for the failed entry
         * @param message error message reported for the failed entry
         */
        public MessageSendFailedException(String code, String message) {
            super(String.format("Sending message failed with code=%s: %s", code, message));
            this.code = code;
            this.message = message;
        }

        /** Returns the error code reported for the failed entry. */
        public String code() {
            return code;
        }

        /**
         * Returns the raw error message reported for the failed entry, as opposed to
         * {@link #getMessage()}, which returns the formatted text including the code.
         */
        public String message() {
            return message;
        }
    }
}

