/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.aws.sqs;

import io.atleon.aws.sqs.BodyDeserializer;
import io.atleon.aws.sqs.DeserializedSqsMessage;
import io.atleon.aws.sqs.NacknowledgerFactory;
import io.atleon.aws.sqs.ReceivedSqsMessage;
import io.atleon.aws.sqs.SqsConfig;
import io.atleon.aws.sqs.SqsConfigSource;
import io.atleon.aws.sqs.SqsMessage;
import io.atleon.aws.sqs.SqsReceiver;
import io.atleon.aws.sqs.SqsReceiverMessage;
import io.atleon.aws.sqs.SqsReceiverOptions;
import io.atleon.core.Alo;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFlux;
import java.util.Optional;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
 * Receives messages from an SQS queue and exposes them as an {@link AloFlux} of either
 * deserialized messages ({@link #receiveAloMessages(String)}) or just their deserialized bodies
 * ({@link #receiveAloBodies(String)}).
 *
 * <p>Instances are created with {@link #from(SqsConfigSource)}. All behavior is driven by the
 * {@link SqsConfig} produced by the supplied {@link SqsConfigSource}; the keys read from that
 * config are declared as constants on this class.
 *
 * @param <T> the type that message bodies are deserialized to
 */
public class AloSqsReceiver<T> {

    /**
     * Prefix shared by every configuration key declared on this class.
     */
    public static final String CONFIG_PREFIX = "sqs.receiver.";

    /**
     * Key under which the {@link BodyDeserializer} to use is configured. Loading fails (via
     * {@code loadConfiguredOrThrow}) when this is not configured.
     */
    public static final String BODY_DESERIALIZER_CONFIG = CONFIG_PREFIX + "body.deserializer";

    /**
     * Key selecting the {@link NacknowledgerFactory}. The predefined values
     * {@value #NACKNOWLEDGER_TYPE_EMIT} and {@value #NACKNOWLEDGER_TYPE_VISIBILITY_RESET} are
     * matched case-insensitively; any other value is handed to
     * {@code SqsConfig.loadConfiguredWithPredefinedTypes} to be resolved as a
     * {@link NacknowledgerFactory}. When nothing is configured, {@link NacknowledgerFactory.Emit}
     * is used.
     */
    public static final String NACKNOWLEDGER_TYPE_CONFIG = CONFIG_PREFIX + "nacknowledger.type";

    /**
     * Predefined nacknowledger type that selects {@link NacknowledgerFactory.Emit}.
     */
    public static final String NACKNOWLEDGER_TYPE_EMIT = "emit";

    /**
     * Predefined nacknowledger type that selects {@link NacknowledgerFactory.VisibilityReset}.
     */
    public static final String NACKNOWLEDGER_TYPE_VISIBILITY_RESET = "visibility_reset";

    /**
     * Integer passed to {@code SqsReceiverOptions.Builder.maxMessagesPerReception}. Defaults to 10.
     */
    public static final String MAX_MESSAGES_PER_RECEPTION_CONFIG = CONFIG_PREFIX + "max.messages.per.reception";

    /**
     * Set of strings passed to {@code SqsReceiverOptions.Builder.messageAttributesToRequest}.
     */
    public static final String MESSAGE_ATTRIBUTES_TO_REQUEST_CONFIG = CONFIG_PREFIX + "message.attributes.to.request";

    /**
     * Set of strings passed to {@code SqsReceiverOptions.Builder.messageSystemAttributesToRequest}.
     */
    public static final String MESSAGE_SYSTEM_ATTRIBUTES_TO_REQUEST_CONFIG = CONFIG_PREFIX + "message.system.attributes.to.request";

    /**
     * Integer passed to {@code SqsReceiverOptions.Builder.waitTimeSecondsPerReception}. Defaults
     * to 5.
     */
    public static final String WAIT_TIME_SECONDS_PER_RECEPTION_CONFIG = CONFIG_PREFIX + "wait.time.seconds.per.reception";

    /**
     * Integer passed to {@code SqsReceiverOptions.Builder.visibilityTimeoutSeconds}. Defaults to
     * 30. Note that the key itself does not contain the word "seconds".
     */
    public static final String VISIBILITY_TIMEOUT_SECONDS_CONFIG = CONFIG_PREFIX + "visibility.timeout";

    /**
     * Integer passed to {@code SqsReceiverOptions.Builder.maxInFlightPerSubscription}. Defaults to
     * 4096.
     */
    public static final String MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG = CONFIG_PREFIX + "max.in.flight.per.subscription";

    /**
     * Integer passed to {@code SqsReceiverOptions.Builder.deleteBatchSize}. Defaults to 10.
     */
    public static final String DELETE_BATCH_SIZE_CONFIG = CONFIG_PREFIX + "delete.batch.size";

    /**
     * Duration passed to {@code SqsReceiverOptions.Builder.deleteInterval}. Defaults to
     * {@code SqsReceiverOptions.DEFAULT_DELETE_INTERVAL}.
     */
    public static final String DELETE_BATCH_INTERVAL_CONFIG = CONFIG_PREFIX + "delete.batch.interval";

    /**
     * Duration passed to {@code SqsReceiverOptions.Builder.closeTimeout}. Defaults to
     * {@code SqsReceiverOptions.DEFAULT_CLOSE_TIMEOUT}.
     */
    public static final String CLOSE_TIMEOUT_CONFIG = CONFIG_PREFIX + "close.timeout";

    private static final Logger LOGGER = LoggerFactory.getLogger(AloSqsReceiver.class);

    private final SqsConfigSource configSource;

    private AloSqsReceiver(SqsConfigSource configSource) {
        this.configSource = configSource;
    }

    /**
     * Creates a receiver backed by the given configuration source.
     *
     * @param configSource source of the {@link SqsConfig} used when receiving
     * @param <T>          the type that message bodies are deserialized to
     * @return a new receiver
     */
    public static <T> AloSqsReceiver<T> from(SqsConfigSource configSource) {
        return new AloSqsReceiver<>(configSource);
    }

    /**
     * Receives messages from the given queue and exposes only their deserialized bodies.
     *
     * @param queueUrl URL of the SQS queue to receive from
     * @return the result of mapping {@link #receiveAloMessages(String)} with
     *         {@link SqsMessage#body()}
     */
    public AloFlux<T> receiveAloBodies(String queueUrl) {
        return receiveAloMessages(queueUrl).map(SqsMessage::body);
    }

    /**
     * Receives messages from the given queue, with bodies deserialized by the configured
     * {@link BodyDeserializer}.
     *
     * @param queueUrl URL of the SQS queue to receive from
     * @return an {@link AloFlux} wrapping the received messages
     */
    public AloFlux<ReceivedSqsMessage<T>> receiveAloMessages(String queueUrl) {
        return configSource.create()
            .flatMapMany(config -> receiveMessages(config, queueUrl))
            .as(AloFlux::wrap);
    }

    /**
     * Builds a {@link SqsReceiver} from the given config and converts everything it receives from
     * the queue in to {@link Alo} elements.
     */
    private Flux<Alo<ReceivedSqsMessage<T>>> receiveMessages(SqsConfig config, String queueUrl) {
        SqsReceiverOptions options = newReceiverOptions(config);
        AloFactory<ReceivedSqsMessage<T>> aloFactory = config.loadAloFactory();
        // The deserializer is looked up through a raw class literal, so the conversion to
        // BodyDeserializer<T> cannot be checked by the compiler.
        @SuppressWarnings("unchecked")
        BodyDeserializer<T> bodyDeserializer =
            config.loadConfiguredOrThrow(BODY_DESERIALIZER_CONFIG, BodyDeserializer.class);
        NacknowledgerFactory<T> nacknowledgerFactory = createNacknowledgerFactory(config);

        // This sink never carries a value. It is merged in to the received messages so that an
        // error handed to errorEmitter is signalled on the resulting Flux. The EmitResult of
        // tryEmitError is not inspected.
        Sinks.Empty<SqsReceiverMessage> sink = Sinks.empty();
        Consumer<Throwable> errorEmitter = sink::tryEmitError;

        return SqsReceiver.create(options)
            .receiveManual(queueUrl)
            .mergeWith(sink.asMono())
            .map(message -> toAlo(aloFactory, message, bodyDeserializer, nacknowledgerFactory, errorEmitter));
    }

    /**
     * Translates the receiver-related entries of the given config in to {@link SqsReceiverOptions},
     * applying this class's defaults for entries that are not configured.
     */
    private static SqsReceiverOptions newReceiverOptions(SqsConfig config) {
        return SqsReceiverOptions.newBuilder(config::buildClient)
            .maxMessagesPerReception(config.loadInt(MAX_MESSAGES_PER_RECEPTION_CONFIG).orElse(10))
            .messageAttributesToRequest(config.loadSetOfStringOrEmpty(MESSAGE_ATTRIBUTES_TO_REQUEST_CONFIG))
            .messageSystemAttributesToRequest(config.loadSetOfStringOrEmpty(MESSAGE_SYSTEM_ATTRIBUTES_TO_REQUEST_CONFIG))
            .waitTimeSecondsPerReception(config.loadInt(WAIT_TIME_SECONDS_PER_RECEPTION_CONFIG).orElse(5))
            .visibilityTimeoutSeconds(config.loadInt(VISIBILITY_TIMEOUT_SECONDS_CONFIG).orElse(30))
            .maxInFlightPerSubscription(config.loadInt(MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG).orElse(4096))
            .deleteBatchSize(config.loadInt(DELETE_BATCH_SIZE_CONFIG).orElse(10))
            .deleteInterval(config.loadDuration(DELETE_BATCH_INTERVAL_CONFIG).orElse(SqsReceiverOptions.DEFAULT_DELETE_INTERVAL))
            .closeTimeout(config.loadDuration(CLOSE_TIMEOUT_CONFIG).orElse(SqsReceiverOptions.DEFAULT_CLOSE_TIMEOUT))
            .build();
    }

    /**
     * Resolves the configured {@link NacknowledgerFactory}, falling back to
     * {@link NacknowledgerFactory.Emit} when {@link #NACKNOWLEDGER_TYPE_CONFIG} yields nothing.
     */
    private static <T> NacknowledgerFactory<T> createNacknowledgerFactory(SqsConfig config) {
        // A class literal is always raw, so binding it to NacknowledgerFactory<T> is unchecked.
        @SuppressWarnings("unchecked")
        Optional<NacknowledgerFactory<T>> nacknowledgerFactory =
            loadNacknowledgerFactory(config, NACKNOWLEDGER_TYPE_CONFIG, NacknowledgerFactory.class);
        return nacknowledgerFactory.orElseGet(NacknowledgerFactory.Emit::new);
    }

    /**
     * Loads a {@link NacknowledgerFactory} from the given key, letting the predefined type names
     * take precedence as implemented by {@code SqsConfig.loadConfiguredWithPredefinedTypes}. The
     * extra type variable {@code N} exists so that a raw class literal can be passed as
     * {@code type}.
     */
    private static <T, N extends NacknowledgerFactory<T>> Optional<NacknowledgerFactory<T>> loadNacknowledgerFactory(SqsConfig config, String key, Class<N> type) {
        return config.loadConfiguredWithPredefinedTypes(key, type, AloSqsReceiver::instantiatePredefinedNacknowledgerFactory);
    }

    /**
     * Maps a predefined nacknowledger type name (compared case-insensitively) to its factory.
     *
     * @return the matching factory, or empty when the name is not one of the predefined types
     */
    private static <T> Optional<NacknowledgerFactory<T>> instantiatePredefinedNacknowledgerFactory(String typeName) {
        if (typeName.equalsIgnoreCase(NACKNOWLEDGER_TYPE_EMIT)) {
            return Optional.of(new NacknowledgerFactory.Emit<>());
        }
        if (typeName.equalsIgnoreCase(NACKNOWLEDGER_TYPE_VISIBILITY_RESET)) {
            return Optional.of(new NacknowledgerFactory.VisibilityReset<>(LOGGER));
        }
        return Optional.empty();
    }

    /**
     * Deserializes the body of a received message and wraps the result in an {@link Alo}. The
     * message's deleter is used as the acknowledger, and the nacknowledger is built by the given
     * factory from the deserialized message, its deleter, its visibility changer and the error
     * emitter.
     */
    private static <T> Alo<ReceivedSqsMessage<T>> toAlo(AloFactory<ReceivedSqsMessage<T>> aloFactory, SqsReceiverMessage message, BodyDeserializer<T> bodyDeserializer, NacknowledgerFactory<T> nacknowledgerFactory, Consumer<Throwable> errorEmitter) {
        T body = bodyDeserializer.deserialize(message.body());
        DeserializedSqsMessage<T> deserialized = DeserializedSqsMessage.create(
            message.receiptHandle(),
            message.messageId(),
            message.messageAttributes(),
            message.messageSystemAttributes(),
            body);
        Consumer<? super Throwable> nacknowledger =
            nacknowledgerFactory.create(deserialized, message.deleter(), message.visibilityChanger(), errorEmitter);
        return aloFactory.create(deserialized, message.deleter(), nacknowledger);
    }
}

