/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.kafka;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.core.OrderManagingAcknowledgementOperator;
import io.atleon.kafka.AloConsumerRecordFactory;
import io.atleon.kafka.ConsumerRecordExtraction;
import io.atleon.kafka.DefaultAloConsumerRecordFactory;
import io.atleon.kafka.KafkaConfigSource;
import io.atleon.util.ConfigLoading;
import io.atleon.util.Defaults;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.ReceiverRecord;

public class AloKafkaReceiver<K, V> {
    public static final String CONFIG_PREFIX = "kafka.receiver.";
    public static final String BLOCK_REQUEST_ON_PARTITION_POSITIONS_CONFIG = "kafka.receiver.block.request.on.partition.positions";
    public static final String MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG = "kafka.receiver.max.in.flight.per.subscription";
    public static final String AUTO_INCREMENT_CLIENT_ID_CONFIG = "kafka.receiver.auto.increment.client.id";
    public static final String POLL_TIMEOUT_CONFIG = "kafka.receiver.poll.timeout";
    public static final String COMMIT_INTERVAL_CONFIG = "kafka.receiver.commit.interval";
    public static final String MAX_COMMIT_ATTEMPTS_CONFIG = "kafka.receiver.max.commit.attempts";
    public static final String CLOSE_TIMEOUT_CONFIG = "kafka.receiver.close.timeout";
    public static final String ALO_FACTORY_CONFIG = "kafka.receiver.alo.factory";
    private static final boolean DEFAULT_BLOCK_REQUEST_ON_PARTITION_POSITIONS = false;
    private static final long DEFAULT_MAX_IN_FLIGHT_PER_SUBSCRIPTION = 4096L;
    private static final boolean DEFAULT_AUTO_INCREMENT_CLIENT_ID = false;
    private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(100L);
    private static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5L);
    private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 100;
    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30L);
    private static final Logger LOGGER = LoggerFactory.getLogger(AloKafkaReceiver.class);
    private static final Map<String, AtomicLong> COUNTS_BY_CLIENT_ID = new ConcurrentHashMap<String, AtomicLong>();
    private static final Map<String, Scheduler> SCHEDULERS_BY_CLIENT_ID = new ConcurrentHashMap<String, Scheduler>();
    private final KafkaConfigSource configSource;

    private AloKafkaReceiver(KafkaConfigSource configSource) {
        this.configSource = configSource;
    }

    public static <K, V> AloKafkaReceiver<K, V> from(KafkaConfigSource configSource) {
        return new AloKafkaReceiver<K, V>(configSource);
    }

    public static <V> AloKafkaReceiver<Object, V> forValues(KafkaConfigSource configSource) {
        return new AloKafkaReceiver(configSource);
    }

    public AloFlux<V> receiveAloValues(String topic) {
        return this.receiveAloValues(Collections.singletonList(topic));
    }

    public AloFlux<V> receiveAloValues(Collection<String> topics) {
        return this.receiveAloRecords(topics).filter(record -> record.value() != null).map(ConsumerRecord::value);
    }

    public AloFlux<ConsumerRecord<K, V>> receiveAloRecords(String topic) {
        return this.receiveAloRecords(Collections.singletonList(topic));
    }

    public AloFlux<ConsumerRecord<K, V>> receiveAloRecords(Collection<String> topics) {
        return (AloFlux)((Mono)this.configSource.create()).flatMapMany(config -> this.receiveRecords((Map<String, Object>)config, topics)).as(AloFlux::wrap);
    }

    private Flux<Alo<ConsumerRecord<K, V>>> receiveRecords(Map<String, Object> config, Collection<String> topics) {
        HashMap<String, Object> options = new HashMap<String, Object>();
        AloKafkaReceiver.loadInto(options, config, BLOCK_REQUEST_ON_PARTITION_POSITIONS_CONFIG, false);
        AloKafkaReceiver.loadInto(options, config, MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG, 4096L);
        AloKafkaReceiver.loadInto(options, config, AUTO_INCREMENT_CLIENT_ID_CONFIG, false);
        AloKafkaReceiver.loadInto(options, config, POLL_TIMEOUT_CONFIG, DEFAULT_POLL_TIMEOUT);
        AloKafkaReceiver.loadInto(options, config, COMMIT_INTERVAL_CONFIG, DEFAULT_COMMIT_INTERVAL);
        AloKafkaReceiver.loadInto(options, config, MAX_COMMIT_ATTEMPTS_CONFIG, 100);
        AloKafkaReceiver.loadInto(options, config, CLOSE_TIMEOUT_CONFIG, DEFAULT_CLOSE_TIMEOUT);
        AloConsumerRecordFactory<K, V> aloFactory = AloKafkaReceiver.createAloFactory(config);
        long maxInFlightPerSubscription = (Long)ConfigLoading.loadOrThrow(options, (String)MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG, Long::valueOf);
        HashMap<String, Object> consumerConfig = new HashMap<String, Object>(config);
        options.keySet().forEach(consumerConfig::remove);
        String clientId = (String)ConfigLoading.loadOrThrow(config, (String)"client.id", Object::toString);
        if (((Boolean)ConfigLoading.loadOrThrow(options, (String)AUTO_INCREMENT_CLIENT_ID_CONFIG, Boolean::valueOf)).booleanValue()) {
            consumerConfig.put("client.id", clientId + "-" + AloKafkaReceiver.nextClientIdCount(clientId));
        }
        CompletableFuture assignedPartitions = (Boolean)ConfigLoading.loadOrThrow(options, (String)BLOCK_REQUEST_ON_PARTITION_POSITIONS_CONFIG, Boolean::valueOf) != false ? new CompletableFuture() : CompletableFuture.completedFuture(Collections.emptyList());
        CompletionStage positions = assignedPartitions.thenAccept(partitions -> partitions.forEach(ReceiverPartition::position));
        ReceiverOptions receiverOptions = ReceiverOptions.create(consumerConfig).subscription(topics).pollTimeout((Duration)ConfigLoading.loadOrThrow(options, (String)POLL_TIMEOUT_CONFIG, Duration::parse)).commitInterval((Duration)ConfigLoading.loadOrThrow(options, (String)COMMIT_INTERVAL_CONFIG, Duration::parse)).maxCommitAttempts(((Integer)ConfigLoading.loadOrThrow(options, (String)MAX_COMMIT_ATTEMPTS_CONFIG, Integer::valueOf)).intValue()).closeTimeout((Duration)ConfigLoading.loadOrThrow(options, (String)CLOSE_TIMEOUT_CONFIG, Duration::parse)).schedulerSupplier(() -> AloKafkaReceiver.schedulerForClient(clientId)).addAssignListener(assignedPartitions::complete);
        return KafkaReceiver.create((ReceiverOptions)receiverOptions).receive().transform(arg_0 -> AloKafkaReceiver.lambda$receiveRecords$4((Future)((Object)positions), arg_0)).transform(records -> this.createAloRecords((Flux<ReceiverRecord<K, V>>)records, aloFactory, maxInFlightPerSubscription));
    }

    private Flux<Alo<ConsumerRecord<K, V>>> createAloRecords(Flux<ReceiverRecord<K, V>> records, AloConsumerRecordFactory<K, V> aloFactory, long maxInFlightPerSubscription) {
        Sinks.Empty sink = Sinks.empty();
        return records.map(record -> aloFactory.create(record, () -> ((ReceiverOffset)record.receiverOffset()).acknowledge(), arg_0 -> ((Sinks.Empty)sink).tryEmitError(arg_0))).mergeWith((Publisher)sink.asMono()).transform(aloRecords -> new OrderManagingAcknowledgementOperator((Publisher)aloRecords, ConsumerRecordExtraction::extractTopicPartition, maxInFlightPerSubscription));
    }

    private static long nextClientIdCount(String clientId) {
        return COUNTS_BY_CLIENT_ID.computeIfAbsent(clientId, key -> new AtomicLong()).incrementAndGet();
    }

    private static Scheduler schedulerForClient(String clientId) {
        return SCHEDULERS_BY_CLIENT_ID.computeIfAbsent(clientId, AloKafkaReceiver::newSchedulerForClient);
    }

    private static Scheduler newSchedulerForClient(String clientId) {
        String schedulerName = AloKafkaReceiver.class.getSimpleName() + "-" + clientId;
        return Schedulers.newBoundedElastic((int)Defaults.THREAD_CAP, (int)Integer.MAX_VALUE, (String)schedulerName);
    }

    private static <K, V> AloConsumerRecordFactory<K, V> createAloFactory(Map<String, Object> config) {
        return ConfigLoading.loadConfigured(config, (String)ALO_FACTORY_CONFIG).orElseGet(DefaultAloConsumerRecordFactory::new);
    }

    private static void loadInto(Map<String, Object> destination, Map<String, Object> source, String key, Object def) {
        destination.put(key, source.getOrDefault(key, def));
    }

    private static <T> Mono<T> blockRequestOn(Future<?> future) {
        return Mono.empty().doOnRequest(requested -> {
            try {
                future.get();
            }
            catch (Exception e) {
                LOGGER.error("Failed to block Request Thread on Future", (Throwable)e);
            }
        });
    }

    private static /* synthetic */ Publisher lambda$receiveRecords$4(Future positions, Flux records) {
        return positions.isDone() ? records : records.mergeWith(AloKafkaReceiver.blockRequestOn(positions));
    }
}

