/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.kafka;

import io.atleon.core.AcknowledgementQueueMode;
import io.atleon.core.Alo;
import io.atleon.core.AloComponentExtractor;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFactoryConfig;
import io.atleon.core.AloFlux;
import io.atleon.core.AloQueueListener;
import io.atleon.core.AloQueueListenerConfig;
import io.atleon.core.AloQueueingTransformer;
import io.atleon.core.AloSignalListenerFactory;
import io.atleon.core.AloSignalListenerFactoryConfig;
import io.atleon.core.ErrorEmitter;
import io.atleon.kafka.AloKafkaConsumerRecordDecorator;
import io.atleon.kafka.AloKafkaConsumerRecordSignalListenerFactory;
import io.atleon.kafka.AloKafkaQueueListener;
import io.atleon.kafka.ConsumerMutexEnforcer;
import io.atleon.kafka.KafkaConfig;
import io.atleon.kafka.KafkaConfigSource;
import io.atleon.kafka.NacknowledgerFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import reactor.core.observability.SignalListenerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.receiver.internals.ConsumerFactory;

/**
 * Receives Kafka records as {@link Alo} elements whose acknowledgement invokes
 * {@code acknowledge()} on the underlying {@link ReceiverOffset}.
 *
 * <p>Instances are created from a {@link KafkaConfigSource}. The source is resolved each time one
 * of the {@code receiveAlo*} methods is subscribed to, and the resulting properties are split into
 * receiver-specific options (keys starting with {@link #CONFIG_PREFIX}) and the configuration that
 * is handed to Reactor Kafka's {@link ReceiverOptions}.
 *
 * @param <K> type of keys in received records
 * @param <V> type of values in received records
 */
public class AloKafkaReceiver<K, V> {

    /**
     * Prefix of every receiver-specific configuration key. Keys starting with this prefix are
     * removed from the properties before they are used to initialize {@link ReceiverOptions}.
     */
    public static final String CONFIG_PREFIX = "kafka.receiver.";

    /**
     * Name of an {@link AcknowledgementQueueMode} constant used by the queueing transformer.
     * Defaults to {@link AcknowledgementQueueMode#STRICT}.
     */
    public static final String ACKNOWLEDGEMENT_QUEUE_MODE_CONFIG = CONFIG_PREFIX + "acknowledgement.queue.mode";

    /**
     * Selects the {@link NacknowledgerFactory}. The value may be the predefined type
     * {@link #NACKNOWLEDGER_TYPE_EMIT} (matched case-insensitively); other values are resolved by
     * {@code KafkaConfig.loadConfiguredWithPredefinedTypes}. When absent,
     * {@link NacknowledgerFactory.Emit} is used.
     */
    public static final String NACKNOWLEDGER_TYPE_CONFIG = CONFIG_PREFIX + "nacknowledger.type";

    /**
     * Predefined value for {@link #NACKNOWLEDGER_TYPE_CONFIG} that selects
     * {@link NacknowledgerFactory.Emit}.
     */
    public static final String NACKNOWLEDGER_TYPE_EMIT = "emit";

    /**
     * Duration passed to {@link ErrorEmitter#create}. Defaults to
     * {@link ErrorEmitter#DEFAULT_TIMEOUT}.
     */
    public static final String ERROR_EMISSION_TIMEOUT_CONFIG = CONFIG_PREFIX + "error.emission.timeout";

    /**
     * Maximum in-flight count handed to the queueing transformer for each subscription.
     * Defaults to 4096.
     */
    public static final String MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG = CONFIG_PREFIX + "max.in.flight.per.subscription";

    /**
     * When {@code true}, a configured {@code client.id} is suffixed with {@code "-<n>"}, where
     * {@code n} is a JVM-wide counter kept per original client id. Defaults to {@code false}.
     */
    public static final String AUTO_INCREMENT_CLIENT_ID_CONFIG = CONFIG_PREFIX + "auto.increment.client.id";

    /** Overrides {@link ReceiverOptions#pollTimeout()}; the options' own value is kept when absent. */
    public static final String POLL_TIMEOUT_CONFIG = CONFIG_PREFIX + "poll.timeout";

    /** Overrides {@link ReceiverOptions#commitInterval()}; the options' own value is kept when absent. */
    public static final String COMMIT_INTERVAL_CONFIG = CONFIG_PREFIX + "commit.interval";

    /** Overrides {@link ReceiverOptions#maxCommitAttempts()}; the options' own value is kept when absent. */
    public static final String MAX_COMMIT_ATTEMPTS_CONFIG = CONFIG_PREFIX + "max.commit.attempts";

    /** Overrides {@link ReceiverOptions#closeTimeout()}. Defaults to 30 seconds. */
    public static final String CLOSE_TIMEOUT_CONFIG = CONFIG_PREFIX + "close.timeout";

    /**
     * Overrides {@link ReceiverOptions#maxDelayRebalance()}. When absent, the options' own value is
     * kept in {@link AcknowledgementQueueMode#STRICT} mode and {@link Duration#ZERO} is used in any
     * other mode.
     */
    public static final String MAX_DELAY_REBALANCE_CONFIG = CONFIG_PREFIX + "max.delay.rebalance";

    private static final AcknowledgementQueueMode DEFAULT_ACKNOWLEDGEMENT_QUEUE_MODE = AcknowledgementQueueMode.STRICT;

    private static final long DEFAULT_MAX_IN_FLIGHT_PER_SUBSCRIPTION = 4096L;

    private static final boolean DEFAULT_AUTO_INCREMENT_CLIENT_ID = false;

    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30L);

    // Shared by every receiver in the JVM so that auto-incremented client ids never repeat.
    private static final Map<String, AtomicLong> COUNTS_BY_ID = new ConcurrentHashMap<>();

    private final KafkaConfigSource configSource;

    private AloKafkaReceiver(KafkaConfigSource configSource) {
        this.configSource = configSource;
    }

    /**
     * Alias for {@link #create(KafkaConfigSource)}.
     *
     * @param configSource source of the Kafka and receiver configuration
     * @param <K> type of keys in received records
     * @param <V> type of values in received records
     * @return a new receiver backed by {@code configSource}
     */
    public static <K, V> AloKafkaReceiver<K, V> from(KafkaConfigSource configSource) {
        return create(configSource);
    }

    /**
     * Creates a receiver that resolves its configuration from the given source.
     *
     * @param configSource source of the Kafka and receiver configuration
     * @param <K> type of keys in received records
     * @param <V> type of values in received records
     * @return a new receiver backed by {@code configSource}
     */
    public static <K, V> AloKafkaReceiver<K, V> create(KafkaConfigSource configSource) {
        return new AloKafkaReceiver<>(configSource);
    }

    /**
     * Creates a receiver for callers that only care about record values.
     *
     * @param configSource source of the Kafka and receiver configuration
     * @param <V> type of values in received records
     * @return a new receiver backed by {@code configSource}
     * @deprecated use {@link #create(KafkaConfigSource)}, which returns an equivalent receiver
     */
    @Deprecated
    public static <V> AloKafkaReceiver<Object, V> forValues(KafkaConfigSource configSource) {
        return new AloKafkaReceiver<>(configSource);
    }

    /**
     * Receives the values of records from a single topic.
     *
     * @param topic topic to subscribe to
     * @return values of received records; records whose value is {@code null} are not emitted
     */
    public AloFlux<V> receiveAloValues(String topic) {
        return receiveAloValues(Collections.singletonList(topic));
    }

    /**
     * Receives the values of records from a collection of topics.
     *
     * @param topics topics to subscribe to
     * @return values of received records; records whose value is {@code null} are not emitted
     */
    public AloFlux<V> receiveAloValues(Collection<String> topics) {
        return receiveAloRecords(topics).mapNotNull(ConsumerRecord::value);
    }

    /**
     * Receives the values of records from all topics matching a pattern.
     *
     * @param topicsPattern pattern of topic names to subscribe to
     * @return values of received records; records whose value is {@code null} are not emitted
     */
    public AloFlux<V> receiveAloValues(Pattern topicsPattern) {
        return receiveAloRecords(topicsPattern).mapNotNull(ConsumerRecord::value);
    }

    /**
     * Receives records from a single topic.
     *
     * @param topic topic to subscribe to
     * @return received records
     */
    public AloFlux<ConsumerRecord<K, V>> receiveAloRecords(String topic) {
        return receiveAloRecords(Collections.singletonList(topic));
    }

    /**
     * Receives records from a collection of topics.
     *
     * @param topics topics to subscribe to
     * @return received records
     */
    public AloFlux<ConsumerRecord<K, V>> receiveAloRecords(Collection<String> topics) {
        return receiveAloRecords(consumerConfig -> ReceiverOptions.<K, V>create(consumerConfig).subscription(topics));
    }

    /**
     * Receives records from all topics matching a pattern.
     *
     * @param topicsPattern pattern of topic names to subscribe to
     * @return received records
     */
    public AloFlux<ConsumerRecord<K, V>> receiveAloRecords(Pattern topicsPattern) {
        return receiveAloRecords(
            consumerConfig -> ReceiverOptions.<K, V>create(consumerConfig).subscription(topicsPattern)
        );
    }

    /**
     * Common receive path: resolves the configuration, then starts receiving with options produced
     * by {@code optionsInitializer}. One {@link ConsumerMutexEnforcer} is created per call and is
     * shared by every subscription to the returned flux.
     */
    private AloFlux<ConsumerRecord<K, V>> receiveAloRecords(ReceiverOptionsInitializer<K, V> optionsInitializer) {
        ConsumerMutexEnforcer consumerMutexEnforcer = new ConsumerMutexEnforcer();
        return configSource.create()
            .map(ReceiveResources<K, V>::new)
            .flatMapMany(resources -> resources.receive(optionsInitializer, consumerMutexEnforcer))
            .as(AloFlux::wrap);
    }

    /**
     * Everything derived from one resolved {@link KafkaConfig} that is needed to run a single
     * receive subscription.
     */
    private static final class ReceiveResources<K, V> {

        private final KafkaConfig config;

        private final NacknowledgerFactory<K, V> nacknowledgerFactory;

        private final AcknowledgementQueueMode acknowledgementQueueMode;

        public ReceiveResources(KafkaConfig config) {
            this.config = config;
            this.nacknowledgerFactory = createNacknowledgerFactory(config);
            this.acknowledgementQueueMode = loadAcknowledgementQueueMode(config);
        }

        /**
         * Builds the receive pipeline: raw receiver records are turned into queued {@link Alo}
         * elements, routed through the error emitter, and tapped by any configured signal listener
         * factories.
         *
         * @param optionsInitializer creates the base {@link ReceiverOptions} from consumer config
         * @param consumerMutexEnforcer supplies the consumer factory used by the Kafka receiver
         * @return flux of received records wrapped as {@link Alo}
         */
        public Flux<Alo<ConsumerRecord<K, V>>> receive(
            ReceiverOptionsInitializer<K, V> optionsInitializer,
            ConsumerMutexEnforcer consumerMutexEnforcer
        ) {
            ErrorEmitter<Alo<ConsumerRecord<K, V>>> errorEmitter = newErrorEmitter();
            ReceiverOptions<K, V> options = newReceiverOptions(optionsInitializer);
            ConsumerMutexEnforcer.ProhibitableConsumerFactory consumerFactory =
                consumerMutexEnforcer.newConsumerFactory();
            return KafkaReceiver.create(consumerFactory, options)
                .receive()
                .transform(newAloQueueingTransformer(errorEmitter::safelyEmit))
                .transform(errorEmitter::applyTo)
                .transform(this::applySignalListenerFactories)
                // On any termination the factory is told to prohibit further consumption, with
                // twice the close timeout as its argument.
                .doFinally(__ -> consumerFactory.prohibitFurtherConsumption(options.closeTimeout().multipliedBy(2L)));
        }

        private ErrorEmitter<Alo<ConsumerRecord<K, V>>> newErrorEmitter() {
            Duration timeout = config.loadDuration(ERROR_EMISSION_TIMEOUT_CONFIG).orElse(ErrorEmitter.DEFAULT_TIMEOUT);
            return ErrorEmitter.create(timeout);
        }

        /**
         * Applies receiver-specific overrides on top of the options produced by the initializer.
         * Absent settings keep the initialized options' values, except the close timeout, which
         * falls back to this class's own 30 second default.
         */
        private ReceiverOptions<K, V> newReceiverOptions(ReceiverOptionsInitializer<K, V> optionsInitializer) {
            ReceiverOptions<K, V> defaultOptions = optionsInitializer.initialize(newConsumerConfig());
            return defaultOptions
                .pollTimeout(config.loadDuration(POLL_TIMEOUT_CONFIG).orElse(defaultOptions.pollTimeout()))
                .commitInterval(config.loadDuration(COMMIT_INTERVAL_CONFIG).orElse(defaultOptions.commitInterval()))
                .maxCommitAttempts(config.loadInt(MAX_COMMIT_ATTEMPTS_CONFIG).orElse(defaultOptions.maxCommitAttempts()))
                .closeTimeout(config.loadDuration(CLOSE_TIMEOUT_CONFIG).orElse(DEFAULT_CLOSE_TIMEOUT))
                .maxDelayRebalance(loadMaxDelayRebalance().orElse(defaultOptions.maxDelayRebalance()));
        }

        /**
         * Produces the consumer configuration: receiver-specific keys are removed and, when
         * enabled, a present {@code client.id} is replaced by its auto-incremented form.
         */
        private Map<String, Object> newConsumerConfig() {
            return config.modifyAndGetProperties(properties -> {
                properties.keySet().removeIf(key -> key.startsWith(CONFIG_PREFIX));
                if (config.loadBoolean(AUTO_INCREMENT_CLIENT_ID_CONFIG).orElse(DEFAULT_AUTO_INCREMENT_CLIENT_ID)) {
                    properties.computeIfPresent("client.id", (__, id) -> incrementId(id.toString()));
                }
            });
        }

        /**
         * Resolves the maximum rebalance delay override. An explicit setting always wins; without
         * one, an empty result is returned in STRICT mode (so the caller keeps the options' own
         * value) and zero is returned in every other mode.
         */
        private Optional<Duration> loadMaxDelayRebalance() {
            Optional<Duration> configured = config.loadDuration(MAX_DELAY_REBALANCE_CONFIG);
            if (configured.isPresent() || acknowledgementQueueMode == AcknowledgementQueueMode.STRICT) {
                return configured;
            }
            return Optional.of(Duration.ZERO);
        }

        /**
         * Creates the transformer that wraps receiver records as {@link Alo}, grouping them by the
         * topic-partition of their receiver offset.
         */
        private AloQueueingTransformer<ReceiverRecord<K, V>, ConsumerRecord<K, V>> newAloQueueingTransformer(
            Consumer<Throwable> errorEmitter
        ) {
            return AloQueueingTransformer.create(newComponentExtractor(errorEmitter))
                .withGroupExtractor(record -> record.receiverOffset().topicPartition())
                .withQueueMode(acknowledgementQueueMode)
                .withListener(loadQueueListener())
                .withFactory(loadAloFactory())
                .withMaxInFlight(loadMaxInFlightPerSubscription());
        }

        /**
         * Describes how to derive the acknowledger, nacknowledger and value of each received
         * record. The receiver record itself is used as the value.
         */
        private AloComponentExtractor<ReceiverRecord<K, V>, ConsumerRecord<K, V>> newComponentExtractor(
            Consumer<Throwable> errorEmitter
        ) {
            return AloComponentExtractor.composed(
                record -> () -> record.receiverOffset().acknowledge(),
                record -> nacknowledgerFactory.create(record, errorEmitter),
                Function.identity()
            );
        }

        private AloQueueListener loadQueueListener() {
            Map<String, Object> listenerConfig = config.modifyAndGetProperties(properties -> {});
            return AloQueueListenerConfig.load(listenerConfig, AloKafkaQueueListener.class)
                .orElseGet(AloQueueListener::noOp);
        }

        private AloFactory<ConsumerRecord<K, V>> loadAloFactory() {
            Map<String, Object> factoryConfig = config.modifyAndGetProperties(properties -> {});
            return AloFactoryConfig.loadDecorated(factoryConfig, AloKafkaConsumerRecordDecorator.class);
        }

        private long loadMaxInFlightPerSubscription() {
            return config.loadLong(MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG).orElse(DEFAULT_MAX_IN_FLIGHT_PER_SUBSCRIPTION);
        }

        /** Taps the flux with every configured signal listener factory, in the order loaded. */
        private Flux<Alo<ConsumerRecord<K, V>>> applySignalListenerFactories(
            Flux<Alo<ConsumerRecord<K, V>>> aloRecords
        ) {
            Map<String, Object> factoryConfig = config.modifyAndGetProperties(properties -> {});
            List<AloSignalListenerFactory<ConsumerRecord<K, V>, ?>> factories =
                AloSignalListenerFactoryConfig.loadList(factoryConfig, AloKafkaConsumerRecordSignalListenerFactory.class);
            Flux<Alo<ConsumerRecord<K, V>>> tapped = aloRecords;
            for (AloSignalListenerFactory<ConsumerRecord<K, V>, ?> factory : factories) {
                tapped = tapped.tap(factory);
            }
            return tapped;
        }

        private static <K, V> NacknowledgerFactory<K, V> createNacknowledgerFactory(KafkaConfig config) {
            Optional<NacknowledgerFactory<K, V>> nacknowledgerFactory =
                loadNacknowledgerFactory(config, NACKNOWLEDGER_TYPE_CONFIG, NacknowledgerFactory.class);
            return nacknowledgerFactory.orElseGet(NacknowledgerFactory.Emit::new);
        }

        private static <K, V, N extends NacknowledgerFactory<K, V>> Optional<NacknowledgerFactory<K, V>> loadNacknowledgerFactory(
            KafkaConfig config,
            String key,
            Class<N> type
        ) {
            return config.loadConfiguredWithPredefinedTypes(key, type, ReceiveResources::newPredefinedNacknowledgerFactory);
        }

        /** Maps a predefined type name to its factory; empty when the name is not predefined. */
        private static <K, V> Optional<NacknowledgerFactory<K, V>> newPredefinedNacknowledgerFactory(String typeName) {
            if (typeName.equalsIgnoreCase(NACKNOWLEDGER_TYPE_EMIT)) {
                return Optional.of(new NacknowledgerFactory.Emit<>());
            }
            return Optional.empty();
        }

        private static AcknowledgementQueueMode loadAcknowledgementQueueMode(KafkaConfig config) {
            return config.loadEnum(ACKNOWLEDGEMENT_QUEUE_MODE_CONFIG, AcknowledgementQueueMode.class)
                .orElse(DEFAULT_ACKNOWLEDGEMENT_QUEUE_MODE);
        }

        /** Appends {@code "-<n>"} to the id, where n starts at 1 and increases per distinct id. */
        private static String incrementId(String id) {
            return id + "-" + COUNTS_BY_ID.computeIfAbsent(id, __ -> new AtomicLong()).incrementAndGet();
        }
    }

    /** Creates the base {@link ReceiverOptions}, including the subscription, from consumer config. */
    @FunctionalInterface
    private interface ReceiverOptionsInitializer<K, V> {

        ReceiverOptions<K, V> initialize(Map<String, Object> consumerConfig);
    }
}

