/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import io.atleon.core.Alo;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFlux;
import io.atleon.rabbitmq.BodyDeserializer;
import io.atleon.rabbitmq.NacknowledgerFactory;
import io.atleon.rabbitmq.RabbitMQConfig;
import io.atleon.rabbitmq.RabbitMQConfigSource;
import io.atleon.rabbitmq.RabbitMQMessage;
import io.atleon.rabbitmq.ReceivedRabbitMQMessage;
import io.atleon.rabbitmq.SerializedBody;
import java.util.Optional;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;

public class AloRabbitMQReceiver<T> {
    public static final String CONFIG_PREFIX = "rabbitmq.receiver.";
    public static final String QOS_CONFIG = "rabbitmq.receiver.qos";
    public static final String BODY_DESERIALIZER_CONFIG = "rabbitmq.receiver.body.deserializer";
    @Deprecated
    public static final String NACK_STRATEGY_CONFIG = "rabbitmq.receiver.nack.strategy";
    public static final String NACKNOWLEDGER_TYPE_CONFIG = "rabbitmq.receiver.nacknowledger.type";
    public static final String NACKNOWLEDGER_TYPE_EMIT = "emit";
    public static final String NACKNOWLEDGER_TYPE_REQUEUE = "requeue";
    public static final String NACKNOWLEDGER_TYPE_DISCARD = "discard";
    private static final Logger LOGGER = LoggerFactory.getLogger(AloRabbitMQReceiver.class);
    private final Mono<Resources<T>> futureResources;

    private AloRabbitMQReceiver(RabbitMQConfigSource configSource) {
        this.futureResources = ((Mono)configSource.create()).map(Resources::fromConfig).cache();
    }

    public static <T> AloRabbitMQReceiver<T> from(RabbitMQConfigSource configSource) {
        return new AloRabbitMQReceiver<T>(configSource);
    }

    public AloFlux<T> receiveAloBodies(String queue) {
        return this.receiveAloMessages(queue).mapNotNull(RabbitMQMessage::getBody);
    }

    public AloFlux<ReceivedRabbitMQMessage<T>> receiveAloMessages(String queue) {
        return (AloFlux)this.futureResources.flatMapMany(resources -> this.receiveMessages((Resources<T>)resources, queue)).as(AloFlux::wrap);
    }

    private Flux<Alo<ReceivedRabbitMQMessage<T>>> receiveMessages(Resources<T> resources, String queue) {
        Sinks.Empty sink = Sinks.empty();
        return resources.receive(queue, arg_0 -> ((Sinks.Empty)sink).tryEmitError(arg_0)).mergeWith((Publisher)sink.asMono());
    }

    private static final class Resources<T> {
        private final ConnectionFactory connectionFactory;
        private final int qos;
        private final BodyDeserializer<T> bodyDeserializer;
        private final NacknowledgerFactory<T> nacknowledgerFactory;
        private final AloFactory<ReceivedRabbitMQMessage<T>> aloFactory;

        private Resources(ConnectionFactory connectionFactory, int qos, BodyDeserializer<T> bodyDeserializer, NacknowledgerFactory<T> nacknowledgerFactory, AloFactory<ReceivedRabbitMQMessage<T>> aloFactory) {
            this.connectionFactory = connectionFactory;
            this.qos = qos;
            this.bodyDeserializer = bodyDeserializer;
            this.nacknowledgerFactory = nacknowledgerFactory;
            this.aloFactory = aloFactory;
        }

        public static <T> Resources<T> fromConfig(RabbitMQConfig config) {
            return new Resources<T>(config.getConnectionFactory(), config.loadInt(AloRabbitMQReceiver.QOS_CONFIG).orElse(256), config.loadConfiguredOrThrow(AloRabbitMQReceiver.BODY_DESERIALIZER_CONFIG, BodyDeserializer.class), Resources.createNacknowledgerFactory(config), config.loadAloFactory());
        }

        public Flux<Alo<ReceivedRabbitMQMessage<T>>> receive(String queue, Consumer<Throwable> errorEmitter) {
            ReceiverOptions receiverOptions = new ReceiverOptions().connectionFactory(this.connectionFactory);
            ConsumeOptions consumeOptions = new ConsumeOptions().qos(this.qos);
            return new Receiver(receiverOptions).consumeManualAck(queue, consumeOptions).map(delivery -> this.deserialize(queue, (AcknowledgableDelivery)delivery, errorEmitter));
        }

        private Alo<ReceivedRabbitMQMessage<T>> deserialize(String queue, AcknowledgableDelivery delivery, Consumer<Throwable> errorEmitter) {
            SerializedBody body = SerializedBody.ofBytes(delivery.getBody());
            ReceivedRabbitMQMessage<T> message = new ReceivedRabbitMQMessage<T>(queue, delivery.getEnvelope().getExchange(), delivery.getEnvelope().getRoutingKey(), delivery.getProperties(), this.bodyDeserializer.deserialize(body));
            return this.aloFactory.create(message, () -> this.ack(delivery, errorEmitter), this.nacknowledgerFactory.create(message, requeue -> this.nack(delivery, requeue, errorEmitter), errorEmitter));
        }

        private void ack(AcknowledgableDelivery delivery, Consumer<? super Throwable> errorEmitter) {
            try {
                delivery.ack(false);
            }
            catch (Throwable error) {
                LOGGER.error("Failed to ack", error);
                errorEmitter.accept(error);
            }
        }

        private void nack(AcknowledgableDelivery delivery, boolean requeue, Consumer<? super Throwable> errorEmitter) {
            try {
                delivery.nack(false, requeue);
            }
            catch (Throwable fatalError) {
                LOGGER.error("Failed to nack", fatalError);
                errorEmitter.accept(fatalError);
            }
        }

        private static <T> NacknowledgerFactory<T> createNacknowledgerFactory(RabbitMQConfig config) {
            Optional<NacknowledgerFactory<T>> nacknowledgerFactory = Resources.loadNacknowledgerFactory(config, AloRabbitMQReceiver.NACKNOWLEDGER_TYPE_CONFIG, NacknowledgerFactory.class);
            if (nacknowledgerFactory.isPresent()) {
                return nacknowledgerFactory.get();
            }
            Optional<NackStrategy> deprecatedNackStrategy = config.loadParseable(AloRabbitMQReceiver.NACK_STRATEGY_CONFIG, NackStrategy.class, NackStrategy::valueOf);
            if (deprecatedNackStrategy.isPresent()) {
                LOGGER.warn("The configuration rabbitmq.receiver.nack.strategy is deprecated. Use rabbitmq.receiver.nacknowledger.type");
                return (NacknowledgerFactory)deprecatedNackStrategy.map(Enum::name).flatMap(Resources::instantiatePredefinedNacknowledgerFactory).orElseThrow(() -> new IllegalStateException("Failed to convert NackStrategy to NacknowledgerFactory"));
            }
            return new NacknowledgerFactory.Emit();
        }

        private static <T, N extends NacknowledgerFactory<T>> Optional<NacknowledgerFactory<T>> loadNacknowledgerFactory(RabbitMQConfig config, String key, Class<N> type) {
            return config.loadConfiguredWithPredefinedTypes(key, type, Resources::instantiatePredefinedNacknowledgerFactory);
        }

        private static <T> Optional<NacknowledgerFactory<T>> instantiatePredefinedNacknowledgerFactory(String typeName) {
            if (typeName.equalsIgnoreCase(AloRabbitMQReceiver.NACKNOWLEDGER_TYPE_EMIT)) {
                return Optional.of(new NacknowledgerFactory.Emit());
            }
            if (typeName.equalsIgnoreCase(AloRabbitMQReceiver.NACKNOWLEDGER_TYPE_REQUEUE)) {
                return Optional.of(new NacknowledgerFactory.Nack(LOGGER, true));
            }
            if (typeName.equalsIgnoreCase(AloRabbitMQReceiver.NACKNOWLEDGER_TYPE_DISCARD)) {
                return Optional.of(new NacknowledgerFactory.Nack(LOGGER, false));
            }
            return Optional.empty();
        }
    }

    @Deprecated
    public static enum NackStrategy {
        EMIT,
        REQUEUE,
        DISCARD;

    }
}

