/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.aws.sqs;

import io.atleon.aws.sqs.BodySerializer;
import io.atleon.aws.sqs.SqsConfig;
import io.atleon.aws.sqs.SqsConfigSource;
import io.atleon.aws.sqs.SqsMessage;
import io.atleon.aws.sqs.SqsMessageCreator;
import io.atleon.aws.sqs.SqsSender;
import io.atleon.aws.sqs.SqsSenderMessage;
import io.atleon.aws.sqs.SqsSenderOptions;
import io.atleon.aws.sqs.SqsSenderResult;
import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.core.SenderResult;
import java.io.Closeable;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;

/**
 * Facade for sending message bodies or {@link SqsMessage}s to an SQS queue, either as plain
 * items or wrapped in {@link Alo}.
 *
 * <p>The underlying {@link SqsSender} and {@link BodySerializer} are built from the supplied
 * {@link SqsConfigSource} and cached. The cached resources are invalidated (and the sender is
 * closed) when {@link #close()} emits on the internal close sink; a later send operation
 * re-subscribes to the config source and builds fresh resources.
 *
 * @param <T> type of message bodies accepted by this sender
 */
public class AloSqsSender<T> implements Closeable {

    /** Prefix shared by every configuration key read by this sender. */
    public static final String CONFIG_PREFIX = "sqs.sender.";

    /** Key of the {@link BodySerializer} implementation to load; loading fails if absent. */
    public static final String BODY_SERIALIZER_CONFIG = CONFIG_PREFIX + "body.serializer";

    /** Key of the batch size passed to {@link SqsSenderOptions}. */
    public static final String BATCH_SIZE_CONFIG = CONFIG_PREFIX + "batch.size";

    /** Key of the batch duration passed to {@link SqsSenderOptions}. */
    public static final String BATCH_DURATION_CONFIG = CONFIG_PREFIX + "batch.duration";

    /** Key of the batch prefetch passed to {@link SqsSenderOptions}. */
    public static final String BATCH_PREFETCH_CONFIG = CONFIG_PREFIX + "batch.prefetch";

    /** Key of the maximum number of in-flight requests passed to {@link SqsSenderOptions}. */
    public static final String MAX_REQUESTS_IN_FLIGHT_CONFIG = CONFIG_PREFIX + "max.requests.in.flight";

    private static final Logger LOGGER = LoggerFactory.getLogger(AloSqsSender.class);

    /** Cached send resources; invalidated whenever {@link #closeSink} emits. */
    private final Mono<SendResources<T>> futureResources;

    /** Emits a timestamp on every {@link #close()} to trigger invalidation of cached resources. */
    private final Sinks.Many<Long> closeSink = Sinks.many().multicast().directBestEffort();

    private AloSqsSender(SqsConfigSource configSource) {
        // The explicit <T> witness is required: without a target type the chained call would
        // otherwise infer SendResources<Object>.
        this.futureResources = configSource.create()
            .map(SendResources::<T>fromConfig)
            .cacheInvalidateWhen(resources -> this.closeSink.asFlux().next().then(), SendResources::close);
    }

    /**
     * Creates a sender whose resources are built from the given config source.
     *
     * @param configSource source of the {@link SqsConfig} used to build the sender
     * @param <T>          type of message bodies
     * @return a new sender
     */
    public static <T> AloSqsSender<T> from(SqsConfigSource configSource) {
        return new AloSqsSender<T>(configSource);
    }

    /**
     * Sends each body after converting it to an {@link SqsMessage} with the given creator.
     *
     * @param bodies         bodies to send
     * @param messageCreator converts a body into the message to send
     * @param queueUrl       URL of the destination queue
     * @return results of the send operations, correlated to the originating body
     */
    public Flux<SqsSenderResult<T>> sendBodies(Publisher<T> bodies, SqsMessageCreator<T> messageCreator, String queueUrl) {
        return this.futureResources.flatMapMany(resources -> resources.send(bodies, messageCreator, queueUrl));
    }

    /**
     * Sends a single message.
     *
     * @param message  message to send
     * @param queueUrl URL of the destination queue
     * @return result of the send operation, correlated to the message
     */
    public Mono<SqsSenderResult<SqsMessage<T>>> sendMessage(SqsMessage<T> message, String queueUrl) {
        return this.futureResources.flatMap(resources -> resources.send(message, queueUrl));
    }

    /**
     * Sends a stream of already-built messages.
     *
     * @param messages messages to send
     * @param queueUrl URL of the destination queue
     * @return results of the send operations, correlated to the originating message
     */
    public Flux<SqsSenderResult<SqsMessage<T>>> sendMessages(Publisher<SqsMessage<T>> messages, String queueUrl) {
        return this.futureResources.flatMapMany(resources -> resources.send(messages, Function.identity(), queueUrl));
    }

    /**
     * Creates a reusable transformer equivalent to
     * {@link #sendAloBodies(Publisher, SqsMessageCreator, String)}.
     *
     * @param messageCreator converts a body into the message to send
     * @param queueUrl       URL of the destination queue
     * @return function that sends the {@link Alo} bodies it is applied to
     */
    public Function<Publisher<Alo<T>>, AloFlux<SqsSenderResult<T>>> sendAloBodies(SqsMessageCreator<T> messageCreator, String queueUrl) {
        return aloBodies -> this.sendAloBodies(aloBodies, messageCreator, queueUrl);
    }

    /**
     * Sends {@link Alo}-wrapped bodies. Results for which {@link SenderResult#isFailure}
     * holds are handed to {@code processFailure} together with {@link SenderResult#toError}.
     *
     * @param aloBodies      wrapped bodies to send
     * @param messageCreator converts a body into the message to send
     * @param queueUrl       URL of the destination queue
     * @return wrapped results of the send operations
     */
    public AloFlux<SqsSenderResult<T>> sendAloBodies(Publisher<Alo<T>> aloBodies, SqsMessageCreator<T> messageCreator, String queueUrl) {
        return this.futureResources
            .flatMapMany(resources -> resources.sendAlos(aloBodies, messageCreator, queueUrl))
            .as(AloFlux::wrap)
            .processFailure(SenderResult::isFailure, SenderResult::toError);
    }

    /**
     * Creates a reusable transformer equivalent to {@link #sendAloMessages(Publisher, String)}.
     *
     * @param queueUrl URL of the destination queue
     * @return function that sends the {@link Alo} messages it is applied to
     */
    public Function<Publisher<Alo<SqsMessage<T>>>, AloFlux<SqsSenderResult<SqsMessage<T>>>> sendAloMessages(String queueUrl) {
        return aloMessages -> this.sendAloMessages(aloMessages, queueUrl);
    }

    /**
     * Sends {@link Alo}-wrapped messages. Results for which {@link SenderResult#isFailure}
     * holds are handed to {@code processFailure} together with {@link SenderResult#toError}.
     *
     * @param aloMessages wrapped messages to send
     * @param queueUrl    URL of the destination queue
     * @return wrapped results of the send operations
     */
    public AloFlux<SqsSenderResult<SqsMessage<T>>> sendAloMessages(Publisher<Alo<SqsMessage<T>>> aloMessages, String queueUrl) {
        return this.futureResources
            .flatMapMany(resources -> resources.sendAlos(aloMessages, Function.identity(), queueUrl))
            .as(AloFlux::wrap)
            .processFailure(SenderResult::isFailure, SenderResult::toError);
    }

    /**
     * Logs the given reason and then closes this sender.
     *
     * @param reason value included in the log message
     */
    public void close(Object reason) {
        LOGGER.info("Closing AloSqsSender due to reason={}", reason);
        this.close();
    }

    /**
     * Signals the close sink, which invalidates the cached resources. The emit result is
     * intentionally ignored: the sink is best-effort, so nothing is delivered when no
     * invalidation trigger is currently subscribed.
     */
    @Override
    public void close() {
        this.closeSink.tryEmitNext(System.currentTimeMillis());
    }

    /**
     * Pairs the {@link SqsSender} with the {@link BodySerializer} used to build outgoing
     * messages.
     *
     * @param <T> type of message bodies
     */
    private static final class SendResources<T> {

        private final SqsSender sender;

        private final BodySerializer<T> bodySerializer;

        private SendResources(SqsSender sender, BodySerializer<T> bodySerializer) {
            this.sender = sender;
            this.bodySerializer = bodySerializer;
        }

        /**
         * Builds the sender options and body serializer from configuration.
         *
         * @param config configuration to read
         * @param <T>    type of message bodies
         * @return resources backed by a newly created {@link SqsSender}
         */
        public static <T> SendResources<T> fromConfig(SqsConfig config) {
            // NOTE(review): the literal fallbacks (1, 32, 1) presumably mirror defaults declared
            // on SqsSenderOptions that were inlined at compile time - confirm.
            SqsSenderOptions options = SqsSenderOptions.newBuilder(config::buildClient)
                .batchSize(config.loadInt(BATCH_SIZE_CONFIG).orElse(1))
                .batchDuration(config.loadDuration(BATCH_DURATION_CONFIG).orElse(SqsSenderOptions.DEFAULT_BATCH_DURATION))
                .batchPrefetch(config.loadInt(BATCH_PREFETCH_CONFIG).orElse(32))
                .maxRequestsInFlight(config.loadInt(MAX_REQUESTS_IN_FLIGHT_CONFIG).orElse(1))
                .build();
            return new SendResources<T>(SqsSender.create(options), config.loadConfiguredOrThrow(BODY_SERIALIZER_CONFIG, BodySerializer.class));
        }

        /** Sends one message, using the message itself as correlation metadata. */
        public Mono<SqsSenderResult<SqsMessage<T>>> send(SqsMessage<T> message, String queueUrl) {
            return this.sender.send(this.toSenderMessage(message, Function.identity()), queueUrl);
        }

        /** Converts each item to a sender message and sends the resulting stream. */
        public <R> Flux<SqsSenderResult<R>> send(Publisher<R> items, Function<R, SqsMessage<T>> messageCreator, String queueUrl) {
            return Flux.from(items)
                .map(item -> this.toSenderMessage(item, messageCreator))
                .transform(senderMessages -> this.sender.send(senderMessages, queueUrl));
        }

        /**
         * Sends the payload of each {@link Alo}, carrying the {@link Alo} itself as correlation
         * metadata so that each result can be mapped back into its originating {@link Alo}.
         */
        public <R> Flux<Alo<SqsSenderResult<R>>> sendAlos(Publisher<Alo<R>> alos, Function<R, SqsMessage<T>> messageCreator, String queueUrl) {
            return AloFlux.toFlux(alos)
                .handle(this.newAloEmitter(messageCreator.compose(Alo::get)))
                .transform(senderMessages -> this.sender.send(senderMessages, queueUrl))
                .map(result -> result.correlationMetadata().map(result::replaceCorrelationMetadata));
        }

        /** Closes the underlying sender. */
        public void close() {
            this.sender.close();
        }

        /** Creates a handler that builds and emits the sender message inside the Alo's context. */
        private <R> BiConsumer<Alo<R>, SynchronousSink<SqsSenderMessage<Alo<R>>>> newAloEmitter(Function<Alo<R>, SqsMessage<T>> aloToSqsMessage) {
            return (alo, sink) -> alo.runInContext(() -> sink.next(this.toSenderMessage(alo, aloToSqsMessage)));
        }

        /**
         * Copies the fields of the message derived from {@code data} into a sender message,
         * serializing the body and attaching {@code data} as correlation metadata.
         */
        private <R> SqsSenderMessage<R> toSenderMessage(R data, Function<R, SqsMessage<T>> dataToSqsMessage) {
            SqsMessage<T> sqsMessage = dataToSqsMessage.apply(data);
            // Explicit <R> keeps the builder chain typed to the correlation metadata type.
            return SqsSenderMessage.<R>newBuilder()
                .messageDeduplicationId(sqsMessage.messageDeduplicationId().orElse(null))
                .messageGroupId(sqsMessage.messageGroupId().orElse(null))
                .messageAttributes(sqsMessage.messageAttributes())
                .messageSystemAttributes(sqsMessage.messageSystemAttributes())
                .body(this.bodySerializer.serialize(sqsMessage.body()))
                .delaySeconds(sqsMessage.senderDelaySeconds().orElse(null))
                .correlationMetadata(data)
                .build();
        }
    }
}

