/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.rabbitmq;

import io.atleon.core.Acknowledgement;
import io.atleon.core.Alo;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFactoryConfig;
import io.atleon.core.AloFlux;
import io.atleon.core.AloSignalListenerFactory;
import io.atleon.core.AloSignalListenerFactoryConfig;
import io.atleon.core.ErrorEmitter;
import io.atleon.rabbitmq.AloReceivedRabbitMQMessageDecorator;
import io.atleon.rabbitmq.AloReceivedRabbitMQMessageSignalListenerFactory;
import io.atleon.rabbitmq.BodyDeserializer;
import io.atleon.rabbitmq.NacknowledgerFactory;
import io.atleon.rabbitmq.RabbitMQConfig;
import io.atleon.rabbitmq.RabbitMQConfigSource;
import io.atleon.rabbitmq.RabbitMQMessage;
import io.atleon.rabbitmq.ReceivedRabbitMQMessage;
import io.atleon.rabbitmq.SerializedBody;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.observability.SignalListenerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;

public class AloRabbitMQReceiver<T> {
    /** Prefix shared by every configuration key that is specific to this receiver. */
    public static final String CONFIG_PREFIX = "rabbitmq.receiver.";

    /** Key of the QoS value handed to the consume options; 256 is used when it is not configured. */
    public static final String QOS_CONFIG = CONFIG_PREFIX + "qos";

    /** Key of the mandatory {@link BodyDeserializer} used to deserialize delivery bodies. */
    public static final String BODY_DESERIALIZER_CONFIG = CONFIG_PREFIX + "body.deserializer";

    /**
     * Key of the legacy {@link NackStrategy} setting. It is only consulted when
     * {@link #NACKNOWLEDGER_TYPE_CONFIG} yields nothing, and a warning is logged when it is used.
     *
     * @deprecated Use {@link #NACKNOWLEDGER_TYPE_CONFIG} instead.
     */
    @Deprecated
    public static final String NACK_STRATEGY_CONFIG = CONFIG_PREFIX + "nack.strategy";

    /**
     * Key selecting the {@link NacknowledgerFactory}: one of the predefined type names below or,
     * presumably, a custom implementation resolved by the configuration loader.
     */
    public static final String NACKNOWLEDGER_TYPE_CONFIG = CONFIG_PREFIX + "nacknowledger.type";

    /** Predefined type name (matched case-insensitively) selecting {@code NacknowledgerFactory.Emit}. */
    public static final String NACKNOWLEDGER_TYPE_EMIT = "emit";

    /** Predefined type name (matched case-insensitively) selecting {@code NacknowledgerFactory.Nack} with requeue enabled. */
    public static final String NACKNOWLEDGER_TYPE_REQUEUE = "requeue";

    /** Predefined type name (matched case-insensitively) selecting {@code NacknowledgerFactory.Nack} with requeue disabled. */
    public static final String NACKNOWLEDGER_TYPE_DISCARD = "discard";

    /** Key of the error emission timeout; {@code ErrorEmitter.DEFAULT_TIMEOUT} is used when it is not configured. */
    public static final String ERROR_EMISSION_TIMEOUT_CONFIG = CONFIG_PREFIX + "error.emission.timeout";

    private static final Logger LOGGER = LoggerFactory.getLogger(AloRabbitMQReceiver.class);

    /** Source from which the receiving configuration is created. */
    private final RabbitMQConfigSource configSource;

    /**
     * Creates a receiver backed by the given configuration source. Instances are obtained
     * through {@link #from(RabbitMQConfigSource)}.
     *
     * @param configSource source used to create the receiving configuration
     */
    private AloRabbitMQReceiver(RabbitMQConfigSource configSource) {
        this.configSource = configSource;
    }

    /**
     * Static factory for a receiver that reads its settings from the given configuration source.
     *
     * @param configSource source used to create the receiving configuration
     * @param <T>          type of the deserialized message bodies
     * @return a new receiver
     */
    public static <T> AloRabbitMQReceiver<T> from(RabbitMQConfigSource configSource) {
        return new AloRabbitMQReceiver<>(configSource);
    }

    /**
     * Receives messages from the given queue and exposes only their deserialized bodies.
     * Handling of {@code null} bodies is delegated to {@code AloFlux.mapNotNull}.
     *
     * @param queue name of the queue to consume from
     * @return stream of deserialized bodies
     */
    public AloFlux<T> receiveAloBodies(String queue) {
        AloFlux<ReceivedRabbitMQMessage<T>> aloMessages = receiveAloMessages(queue);
        return aloMessages.mapNotNull(RabbitMQMessage::body);
    }

    /**
     * Receives messages from the given queue.
     *
     * <p>The configuration produced by the config source is wrapped into a
     * {@code ReceiveResources}, which performs the actual consumption; the resulting
     * publisher is then wrapped as an {@link AloFlux}.
     *
     * @param queue name of the queue to consume from
     * @return stream of received messages
     */
    public AloFlux<ReceivedRabbitMQMessage<T>> receiveAloMessages(String queue) {
        // Keep the chain fully parameterized: with raw Mono/AloFlux casts the lambda parameter
        // degrades to Object and the call to receive(queue) cannot be resolved.
        Mono<ReceiveResources<T>> receiveResources = configSource.create().map(ReceiveResources<T>::new);
        return receiveResources
            .flatMapMany(resources -> resources.receive(queue))
            .as(AloFlux::wrap);
    }

    private static final class ReceiveResources<T> {
        /** Configuration from which factories, options and connection settings are loaded. */
        private final RabbitMQConfig config;
        /** Deserializer applied to the body of every delivery; loaded from BODY_DESERIALIZER_CONFIG. */
        private final BodyDeserializer<T> bodyDeserializer;
        /** Factory producing the negative-acknowledgement handler for each received message. */
        private final NacknowledgerFactory<T> nacknowledgerFactory;

        /**
         * Loads everything that is needed to receive from the given configuration.
         *
         * @param config configuration to load the body deserializer and nacknowledger factory from;
         *               loading the body deserializer fails if it is not configured
         */
        public ReceiveResources(RabbitMQConfig config) {
            this.config = config;
            this.bodyDeserializer = config.loadConfiguredOrThrow(BODY_DESERIALIZER_CONFIG, BodyDeserializer.class);
            this.nacknowledgerFactory = createNacknowledgerFactory(config);
        }

        /**
         * Builds the stream of acknowledgeable messages consumed from the given queue.
         *
         * <p>A {@link Receiver} is created through {@code Flux.using} and released with
         * {@code Receiver::close}. Each delivery is deserialized and wrapped in an {@link Alo},
         * the error emitter is applied to the stream, and finally the configured signal
         * listener factories are attached.
         *
         * @param queue name of the queue to consume from (with manual acknowledgement)
         * @return stream of received messages
         */
        public Flux<Alo<ReceivedRabbitMQMessage<T>>> receive(String queue) {
            AloFactory<ReceivedRabbitMQMessage<T>> aloFactory = loadAloFactory(queue);
            ErrorEmitter<Alo<ReceivedRabbitMQMessage<T>>> errorEmitter = newErrorEmitter();

            Flux<AcknowledgableDelivery> deliveries = Flux.using(
                this::newReceiver,
                receiver -> receiver.consumeManualAck(queue, newConsumeOptions()),
                Receiver::close);

            return deliveries
                .map(delivery -> deserialize(delivery, aloFactory, errorEmitter::safelyEmit))
                .transform(errorEmitter::applyTo)
                .transform(aloMessages -> applySignalListenerFactories(aloMessages, queue));
        }

        /**
         * Loads the decorated {@link AloFactory} for messages received from the given queue.
         * The queue name is put into the factory properties under
         * {@code alo.decorator.rabbitmq.queue} before loading.
         *
         * @param queue name of the queue being consumed
         * @return factory used to wrap received messages
         */
        private AloFactory<ReceivedRabbitMQMessage<T>> loadAloFactory(String queue) {
            Map<String, Object> decoratorProperties = config.modifyAndGetProperties(
                properties -> properties.put("alo.decorator.rabbitmq.queue", queue));
            return AloFactoryConfig.loadDecorated(decoratorProperties, AloReceivedRabbitMQMessageDecorator.class);
        }

        /**
         * Creates the emitter used to surface acknowledgement failures on the received stream.
         *
         * @return emitter using the configured timeout, or {@code ErrorEmitter.DEFAULT_TIMEOUT}
         *         when none is configured
         */
        private ErrorEmitter<Alo<ReceivedRabbitMQMessage<T>>> newErrorEmitter() {
            return ErrorEmitter.create(
                config.loadDuration(ERROR_EMISSION_TIMEOUT_CONFIG).orElse(ErrorEmitter.DEFAULT_TIMEOUT));
        }

        /**
         * Creates a new {@link Receiver} whose connection factory is built from the configuration.
         *
         * @return a fresh receiver; the caller is responsible for closing it
         */
        private Receiver newReceiver() {
            return new Receiver(new ReceiverOptions().connectionFactory(config.buildConnectionFactory()));
        }

        /**
         * Creates the consume options for a subscription.
         *
         * @return options whose QoS is the configured value, or 256 when none is configured
         */
        private ConsumeOptions newConsumeOptions() {
            int qos = config.loadInt(QOS_CONFIG).orElse(256);
            return new ConsumeOptions().qos(qos);
        }

        /**
         * Attaches every configured signal listener factory to the given stream.
         *
         * <p>The queue name is put into the factory properties under
         * {@code alo.signal.listener.factory.rabbitmq.queue} before the factories are loaded.
         * Factories are tapped in the order in which they are returned by the loader.
         *
         * @param aloMessages stream of received messages
         * @param queue       name of the queue being consumed
         * @return the stream with all loaded factories applied; the input itself when none are loaded
         */
        private Flux<Alo<ReceivedRabbitMQMessage<T>>> applySignalListenerFactories(Flux<Alo<ReceivedRabbitMQMessage<T>>> aloMessages, String queue) {
            Map<String, Object> factoryConfig = config.modifyAndGetProperties(
                properties -> properties.put("alo.signal.listener.factory.rabbitmq.queue", queue));
            // The list must be parameterized: iterating a raw List yields Object elements, which
            // cannot be bound to an AloSignalListenerFactory loop variable.
            List<AloSignalListenerFactory<ReceivedRabbitMQMessage<T>, ?>> factories =
                AloSignalListenerFactoryConfig.loadList(factoryConfig, AloReceivedRabbitMQMessageSignalListenerFactory.class);

            Flux<Alo<ReceivedRabbitMQMessage<T>>> tapped = aloMessages;
            for (AloSignalListenerFactory<ReceivedRabbitMQMessage<T>, ?> factory : factories) {
                tapped = tapped.tap(factory);
            }
            return tapped;
        }

        /**
         * Converts a raw delivery into an acknowledgeable, deserialized message.
         *
         * <p>Positive acknowledgement acks the delivery; negative acknowledgement is handled by
         * the nacknowledger created for the message, which is given a callback that nacks the
         * delivery with a chosen requeue flag. Failures while acking or nacking are logged and
         * passed to {@code errorEmitter}.
         *
         * @param delivery     delivery received from the broker
         * @param aloFactory   factory used to wrap the message together with its acknowledgers
         * @param errorEmitter sink for acknowledgement failures
         * @return the wrapped message
         */
        private Alo<ReceivedRabbitMQMessage<T>> deserialize(AcknowledgableDelivery delivery, AloFactory<ReceivedRabbitMQMessage<T>> aloFactory, Consumer<Throwable> errorEmitter) {
            SerializedBody serializedBody = SerializedBody.ofBytes(delivery.getBody());
            ReceivedRabbitMQMessage<T> message = ReceivedRabbitMQMessage.create(
                delivery.getEnvelope().getExchange(),
                delivery.getEnvelope().getRoutingKey(),
                delivery.getProperties(),
                bodyDeserializer.deserialize(serializedBody),
                delivery.getEnvelope().isRedeliver());

            Acknowledgement acknowledgement = Acknowledgement.create(
                () -> ack(delivery, errorEmitter),
                nacknowledgerFactory.create(message, requeue -> nack(delivery, requeue, errorEmitter), errorEmitter));

            return aloFactory.create(message, acknowledgement::positive, acknowledgement::negative);
        }

        /**
         * Resolves the {@link NacknowledgerFactory} to use, in order of precedence:
         * <ol>
         *   <li>the factory configured under {@code NACKNOWLEDGER_TYPE_CONFIG};</li>
         *   <li>the factory matching the deprecated {@code NACK_STRATEGY_CONFIG} (a warning is logged);</li>
         *   <li>{@code NacknowledgerFactory.Emit} as the default.</li>
         * </ol>
         *
         * @param config configuration to read from
         * @param <T>    type of the deserialized message bodies
         * @return the resolved factory
         * @throws IllegalStateException if a deprecated strategy is configured but has no matching predefined factory
         */
        private static <T> NacknowledgerFactory<T> createNacknowledgerFactory(RabbitMQConfig config) {
            Optional<NacknowledgerFactory<T>> configured =
                loadNacknowledgerFactory(config, NACKNOWLEDGER_TYPE_CONFIG, NacknowledgerFactory.class);
            if (configured.isPresent()) {
                return configured.get();
            }

            Optional<NackStrategy> deprecatedNackStrategy = config.loadEnum(NACK_STRATEGY_CONFIG, NackStrategy.class);
            if (!deprecatedNackStrategy.isPresent()) {
                return new NacknowledgerFactory.Emit<>();
            }

            LOGGER.warn("The configuration rabbitmq.receiver.nack.strategy is deprecated. Use rabbitmq.receiver.nacknowledger.type");
            String strategyName = deprecatedNackStrategy.get().name();
            return ReceiveResources.<T>newPredefinedNacknowledgerFactory(strategyName)
                .orElseThrow(() -> new IllegalStateException("Failed to convert NackStrategy to NacknowledgerFactory"));
        }

        /**
         * Loads the factory configured under the given key, letting the predefined type names
         * be resolved by {@code newPredefinedNacknowledgerFactory}.
         *
         * @param config configuration to read from
         * @param key    configuration key holding the factory type
         * @param type   expected factory type
         * @return the configured factory, or empty when the loader yields nothing for the key
         */
        private static <T, N extends NacknowledgerFactory<T>> Optional<NacknowledgerFactory<T>> loadNacknowledgerFactory(RabbitMQConfig config, String key, Class<N> type) {
            return config.loadConfiguredWithPredefinedTypes(
                key,
                type,
                ReceiveResources::newPredefinedNacknowledgerFactory);
        }

        /**
         * Maps a predefined type name to its factory, ignoring case:
         * {@code emit} gives {@code Emit}, {@code requeue} gives {@code Nack} with requeue enabled,
         * and {@code discard} gives {@code Nack} with requeue disabled.
         *
         * @param typeName type name to resolve; must not be {@code null}
         * @param <T>      type of the deserialized message bodies
         * @return the matching factory, or empty when the name is not a predefined one
         */
        private static <T> Optional<NacknowledgerFactory<T>> newPredefinedNacknowledgerFactory(String typeName) {
            NacknowledgerFactory<T> predefined = null;
            if (typeName.equalsIgnoreCase(NACKNOWLEDGER_TYPE_EMIT)) {
                predefined = new NacknowledgerFactory.Emit<>();
            } else if (typeName.equalsIgnoreCase(NACKNOWLEDGER_TYPE_REQUEUE)) {
                predefined = new NacknowledgerFactory.Nack<>(LOGGER, true);
            } else if (typeName.equalsIgnoreCase(NACKNOWLEDGER_TYPE_DISCARD)) {
                predefined = new NacknowledgerFactory.Nack<>(LOGGER, false);
            }
            return Optional.ofNullable(predefined);
        }

        /**
         * Acknowledges the single given delivery ({@code multiple} is {@code false}).
         * Any failure is logged and forwarded to {@code errorEmitter} instead of being thrown.
         *
         * @param delivery     delivery to acknowledge
         * @param errorEmitter sink for a failure raised while acknowledging
         */
        private static void ack(AcknowledgableDelivery delivery, Consumer<? super Throwable> errorEmitter) {
            try {
                delivery.ack(false);
            } catch (Throwable ackFailure) {
                // Deliberately broad: the failure must reach the emitter rather than the caller
                LOGGER.error("Failed to ack", ackFailure);
                errorEmitter.accept(ackFailure);
            }
        }

        /**
         * Negatively acknowledges the single given delivery ({@code multiple} is {@code false}).
         * Any failure is logged and forwarded to {@code errorEmitter} instead of being thrown.
         *
         * @param delivery     delivery to negatively acknowledge
         * @param requeue      requeue flag passed to the nack
         * @param errorEmitter sink for a failure raised while nacking
         */
        private static void nack(AcknowledgableDelivery delivery, boolean requeue, Consumer<? super Throwable> errorEmitter) {
            try {
                delivery.nack(false, requeue);
            } catch (Throwable nackFailure) {
                // Deliberately broad: the failure must reach the emitter rather than the caller
                LOGGER.error("Failed to nack", nackFailure);
                errorEmitter.accept(nackFailure);
            }
        }
    }

    /**
     * Legacy selection of how negatively acknowledged messages are handled. Each constant's
     * name is resolved, ignoring case, to the predefined nacknowledger type of the same name.
     *
     * @deprecated Configure {@code rabbitmq.receiver.nacknowledger.type} instead.
     */
    @Deprecated
    public enum NackStrategy {
        /** Resolved to the {@code emit} nacknowledger type. */
        EMIT,
        /** Resolved to the {@code requeue} nacknowledger type. */
        REQUEUE,
        /** Resolved to the {@code discard} nacknowledger type. */
        DISCARD
    }
}

