/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.aws.sqs;

import io.atleon.aws.sqs.AloReceivedSqsMessageDecorator;
import io.atleon.aws.sqs.AloReceivedSqsMessageSignalListenerFactory;
import io.atleon.aws.sqs.BodyDeserializer;
import io.atleon.aws.sqs.DeserializedSqsMessage;
import io.atleon.aws.sqs.NacknowledgerFactory;
import io.atleon.aws.sqs.ReceivedSqsMessage;
import io.atleon.aws.sqs.SqsConfig;
import io.atleon.aws.sqs.SqsConfigSource;
import io.atleon.aws.sqs.SqsMessage;
import io.atleon.aws.sqs.SqsReceiver;
import io.atleon.aws.sqs.SqsReceiverMessage;
import io.atleon.aws.sqs.SqsReceiverOptions;
import io.atleon.core.Alo;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFactoryConfig;
import io.atleon.core.AloFlux;
import io.atleon.core.AloSignalListenerFactory;
import io.atleon.core.AloSignalListenerFactoryConfig;
import io.atleon.core.ErrorEmitter;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.observability.SignalListenerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class AloSqsReceiver<T> {
    public static final String CONFIG_PREFIX = "sqs.receiver.";
    public static final String BODY_DESERIALIZER_CONFIG = "sqs.receiver.body.deserializer";
    public static final String NACKNOWLEDGER_TYPE_CONFIG = "sqs.receiver.nacknowledger.type";
    public static final String NACKNOWLEDGER_TYPE_EMIT = "emit";
    public static final String NACKNOWLEDGER_TYPE_VISIBILITY_RESET = "visibility_reset";
    public static final String ERROR_EMISSION_TIMEOUT_CONFIG = "sqs.receiver.error.emission.timeout";
    public static final String MAX_MESSAGES_PER_RECEPTION_CONFIG = "sqs.receiver.max.messages.per.reception";
    public static final String MESSAGE_ATTRIBUTES_TO_REQUEST_CONFIG = "sqs.receiver.message.attributes.to.request";
    public static final String MESSAGE_SYSTEM_ATTRIBUTES_TO_REQUEST_CONFIG = "sqs.receiver.message.system.attributes.to.request";
    public static final String WAIT_TIME_SECONDS_PER_RECEPTION_CONFIG = "sqs.receiver.wait.time.seconds.per.reception";
    public static final String VISIBILITY_TIMEOUT_SECONDS_CONFIG = "sqs.receiver.visibility.timeout";
    public static final String MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG = "sqs.receiver.max.in.flight.per.subscription";
    public static final String DELETE_BATCH_SIZE_CONFIG = "sqs.receiver.delete.batch.size";
    public static final String DELETE_BATCH_INTERVAL_CONFIG = "sqs.receiver.delete.batch.interval";
    public static final String CLOSE_TIMEOUT_CONFIG = "sqs.receiver.close.timeout";
    private static final Logger LOGGER = LoggerFactory.getLogger(AloSqsReceiver.class);
    private final SqsConfigSource configSource;

    private AloSqsReceiver(SqsConfigSource configSource) {
        this.configSource = configSource;
    }

    public static <T> AloSqsReceiver<T> from(SqsConfigSource configSource) {
        return new AloSqsReceiver<T>(configSource);
    }

    public AloFlux<T> receiveAloBodies(String queueUrl) {
        return this.receiveAloMessages(queueUrl).map(SqsMessage::body);
    }

    public AloFlux<ReceivedSqsMessage<T>> receiveAloMessages(String queueUrl) {
        return (AloFlux)((Mono)this.configSource.create()).map(ReceiveResources::new).flatMapMany(resources -> resources.receive(queueUrl)).as(AloFlux::wrap);
    }

    private static final class ReceiveResources<T> {
        private final SqsConfig config;
        private final BodyDeserializer<T> bodyDeserializer;
        private final NacknowledgerFactory<T> nacknowledgerFactory;

        public ReceiveResources(SqsConfig config) {
            this.config = config;
            this.bodyDeserializer = config.loadConfiguredOrThrow(AloSqsReceiver.BODY_DESERIALIZER_CONFIG, BodyDeserializer.class);
            this.nacknowledgerFactory = ReceiveResources.createNacknowledgerFactory(config);
        }

        public Flux<Alo<ReceivedSqsMessage<T>>> receive(String queueUrl) {
            AloFactory<ReceivedSqsMessage<T>> aloFactory = this.loadAloFactory(queueUrl);
            ErrorEmitter<Alo<ReceivedSqsMessage<T>>> errorEmitter = this.newErrorEmitter();
            return this.newReceiver().receiveManual(queueUrl).map(message -> this.deserialize((SqsReceiverMessage)message, aloFactory, arg_0 -> ((ErrorEmitter)errorEmitter).safelyEmit(arg_0))).transform(arg_0 -> errorEmitter.applyTo(arg_0)).transform(aloMessages -> this.applySignalListenerFactories((Flux<Alo<ReceivedSqsMessage<T>>>)aloMessages, queueUrl));
        }

        private AloFactory<ReceivedSqsMessage<T>> loadAloFactory(String queueUrl) {
            Map<String, Object> factoryConfig = this.config.modifyAndGetProperties(it -> it.put("alo.decorator.sqs.queue.url", queueUrl));
            return AloFactoryConfig.loadDecorated(factoryConfig, AloReceivedSqsMessageDecorator.class);
        }

        private ErrorEmitter<Alo<ReceivedSqsMessage<T>>> newErrorEmitter() {
            Duration timeout = this.config.loadDuration(AloSqsReceiver.ERROR_EMISSION_TIMEOUT_CONFIG).orElse(ErrorEmitter.DEFAULT_TIMEOUT);
            return ErrorEmitter.create((Duration)timeout);
        }

        private SqsReceiver newReceiver() {
            SqsReceiverOptions receiverOptions = SqsReceiverOptions.newBuilder(this.config::buildClient).maxMessagesPerReception(this.config.loadInt(AloSqsReceiver.MAX_MESSAGES_PER_RECEPTION_CONFIG).orElse(10)).messageAttributesToRequest(this.config.loadSetOfStringOrEmpty(AloSqsReceiver.MESSAGE_ATTRIBUTES_TO_REQUEST_CONFIG)).messageSystemAttributesToRequest(this.config.loadSetOfStringOrEmpty(AloSqsReceiver.MESSAGE_SYSTEM_ATTRIBUTES_TO_REQUEST_CONFIG)).waitTimeSecondsPerReception(this.config.loadInt(AloSqsReceiver.WAIT_TIME_SECONDS_PER_RECEPTION_CONFIG).orElse(5)).visibilityTimeoutSeconds(this.config.loadInt(AloSqsReceiver.VISIBILITY_TIMEOUT_SECONDS_CONFIG).orElse(30)).maxInFlightPerSubscription(this.config.loadInt(AloSqsReceiver.MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG).orElse(4096)).deleteBatchSize(this.config.loadInt(AloSqsReceiver.DELETE_BATCH_SIZE_CONFIG).orElse(10)).deleteInterval(this.config.loadDuration(AloSqsReceiver.DELETE_BATCH_INTERVAL_CONFIG).orElse(SqsReceiverOptions.DEFAULT_DELETE_INTERVAL)).closeTimeout(this.config.loadDuration(AloSqsReceiver.CLOSE_TIMEOUT_CONFIG).orElse(SqsReceiverOptions.DEFAULT_CLOSE_TIMEOUT)).build();
            return SqsReceiver.create(receiverOptions);
        }

        private Flux<Alo<ReceivedSqsMessage<T>>> applySignalListenerFactories(Flux<Alo<ReceivedSqsMessage<T>>> aloMessages, String queueUrl) {
            Map<String, Object> factoryConfig = this.config.modifyAndGetProperties(properties -> properties.put("alo.signal.listener.factory.sqs.queue.url", queueUrl));
            List factories = AloSignalListenerFactoryConfig.loadList(factoryConfig, AloReceivedSqsMessageSignalListenerFactory.class);
            for (AloSignalListenerFactory factory : factories) {
                aloMessages = aloMessages.tap((SignalListenerFactory)factory);
            }
            return aloMessages;
        }

        private Alo<ReceivedSqsMessage<T>> deserialize(SqsReceiverMessage message, AloFactory<ReceivedSqsMessage<T>> aloFactory, Consumer<Throwable> errorEmitter) {
            DeserializedSqsMessage<T> deserialized = DeserializedSqsMessage.deserialize(message, this.bodyDeserializer);
            return aloFactory.create(deserialized, message.deleter(), this.nacknowledgerFactory.create(deserialized, message.deleter(), message.visibilityChanger(), errorEmitter));
        }

        private static <T> NacknowledgerFactory<T> createNacknowledgerFactory(SqsConfig config) {
            Optional<NacknowledgerFactory<NacknowledgerFactory>> nacknowledgerFactory = ReceiveResources.loadNacknowledgerFactory(config, AloSqsReceiver.NACKNOWLEDGER_TYPE_CONFIG, NacknowledgerFactory.class);
            return nacknowledgerFactory.orElseGet(NacknowledgerFactory.Emit::new);
        }

        private static <T, N extends NacknowledgerFactory<T>> Optional<NacknowledgerFactory<T>> loadNacknowledgerFactory(SqsConfig config, String key, Class<N> type) {
            return config.loadConfiguredWithPredefinedTypes(key, type, ReceiveResources::newPredefinedNacknowledgerFactory);
        }

        private static <T> Optional<NacknowledgerFactory<T>> newPredefinedNacknowledgerFactory(String typeName) {
            if (typeName.equalsIgnoreCase(AloSqsReceiver.NACKNOWLEDGER_TYPE_EMIT)) {
                return Optional.of(new NacknowledgerFactory.Emit());
            }
            if (typeName.equalsIgnoreCase(AloSqsReceiver.NACKNOWLEDGER_TYPE_VISIBILITY_RESET)) {
                return Optional.of(new NacknowledgerFactory.VisibilityReset(LOGGER));
            }
            return Optional.empty();
        }
    }
}

