/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.kafka;

import io.atleon.kafka.KafkaConfig;
import io.atleon.kafka.KafkaConfigSource;
import io.atleon.kafka.OffsetRange;
import io.atleon.kafka.OffsetRangeProvider;
import io.atleon.kafka.ReactiveAdmin;
import io.atleon.kafka.RecordRange;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class KafkaBoundedReceiver<K, V> {
    public static final String CONFIG_PREFIX = "kafka.bounded.receiver";
    public static final String POLL_TIMEOUT_CONFIG = "kafka.bounded.receiverpoll.timeout";
    public static final String CLOSE_TIMEOUT_CONFIG = "kafka.bounded.receiverclose.timeout";
    private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(100L);
    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30L);
    private final KafkaConfigSource configSource;

    private KafkaBoundedReceiver(KafkaConfigSource configSource) {
        this.configSource = configSource;
    }

    public static <K, V> KafkaBoundedReceiver<K, V> create(KafkaConfigSource configSource) {
        return new KafkaBoundedReceiver<K, V>(configSource);
    }

    public Flux<ConsumerRecord<K, V>> receiveRecords(String topic, OffsetRange offsetRange) {
        return this.receiveRecords(Collections.singletonList(topic), offsetRange);
    }

    public Flux<ConsumerRecord<K, V>> receiveRecords(Collection<String> topics, OffsetRange offsetRange) {
        return this.receiveRecords(topics, OffsetRangeProvider.inOffsetRangeFromAllTopicPartitions(offsetRange));
    }

    public Flux<ConsumerRecord<K, V>> receiveRecords(String topic, OffsetRangeProvider rangeProvider) {
        return this.receiveRecords(Collections.singletonList(topic), rangeProvider);
    }

    public Flux<ConsumerRecord<K, V>> receiveRecords(Collection<String> topics, OffsetRangeProvider rangeProvider) {
        return ((Mono)this.configSource.create()).flatMapMany(it -> KafkaBoundedReceiver.listRecordRanges(it, topics, rangeProvider)).filter(RecordRange::hasNonNegativeLength).concatMap(this::receiveRecordsInRange);
    }

    Flux<ConsumerRecord<K, V>> receiveRecordsInRange(RecordRange recordRange) {
        return ((Mono)this.configSource.create()).flatMapMany(it -> this.receiveRecordsInRange((KafkaConfig)it, recordRange));
    }

    private Flux<ConsumerRecord<K, V>> receiveRecordsInRange(KafkaConfig kafkaConfig, RecordRange recordRange) {
        ReceiverOptions receiverOptions = ReceiverOptions.create(kafkaConfig.nativeProperties()).assignment(Collections.singletonList(recordRange.topicPartition())).pollTimeout(kafkaConfig.loadDuration(POLL_TIMEOUT_CONFIG).orElse(DEFAULT_POLL_TIMEOUT)).closeTimeout(kafkaConfig.loadDuration(CLOSE_TIMEOUT_CONFIG).orElse(DEFAULT_CLOSE_TIMEOUT)).addAssignListener(partitions -> partitions.forEach(it -> it.seek(recordRange.minInclusive())));
        return KafkaReceiver.create((ReceiverOptions)receiverOptions).receive().map(Function.identity()).takeWhile(it -> it.offset() <= recordRange.maxInclusive()).takeUntil(it -> it.offset() == recordRange.maxInclusive());
    }

    private static Flux<RecordRange> listRecordRanges(KafkaConfig config, Collection<String> topics, OffsetRangeProvider rangeProvider) {
        return Flux.using(() -> ReactiveAdmin.create(config.nativeProperties()), it -> RecordRange.list(it, topics, rangeProvider), ReactiveAdmin::close);
    }
}

