/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.kafka;

import io.atleon.kafka.KafkaConfig;
import io.atleon.kafka.KafkaConfigSource;
import io.atleon.kafka.OffsetCriteria;
import io.atleon.kafka.OffsetRange;
import io.atleon.kafka.OffsetRangeProvider;
import io.atleon.kafka.ReactiveAdmin;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class KafkaBoundedReceiver<K, V> {
    public static final String CONFIG_PREFIX = "kafka.bounded.receiver";
    public static final String POLL_TIMEOUT_CONFIG = "kafka.bounded.receiverpoll.timeout";
    public static final String CLOSE_TIMEOUT_CONFIG = "kafka.bounded.receiverclose.timeout";
    private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(100L);
    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30L);
    private final KafkaConfigSource configSource;

    private KafkaBoundedReceiver(KafkaConfigSource configSource) {
        this.configSource = configSource;
    }

    public static <K, V> KafkaBoundedReceiver<K, V> create(KafkaConfigSource configSource) {
        return new KafkaBoundedReceiver<K, V>(configSource);
    }

    public Flux<ConsumerRecord<K, V>> receiveRecords(String topic, OffsetRange offsetRange) {
        return this.receiveRecords(Collections.singletonList(topic), offsetRange);
    }

    public Flux<ConsumerRecord<K, V>> receiveRecords(Collection<String> topics, OffsetRange offsetRange) {
        return this.receiveRecords(topics, OffsetRangeProvider.inOffsetRangeFromAllTopicPartitions(offsetRange));
    }

    public Flux<ConsumerRecord<K, V>> receiveRecords(String topic, OffsetRangeProvider rangeProvider) {
        return this.receiveRecords(Collections.singletonList(topic), rangeProvider);
    }

    public Flux<ConsumerRecord<K, V>> receiveRecords(Collection<String> topics, OffsetRangeProvider rangeProvider) {
        return ((Mono)this.configSource.create()).flatMapMany(it -> this.receiveRecords((KafkaConfig)it, topics, rangeProvider));
    }

    private Flux<ConsumerRecord<K, V>> receiveRecords(KafkaConfig config, Collection<String> topics, OffsetRangeProvider rangeProvider) {
        Flux recordRanges = Flux.using(() -> ReactiveAdmin.create(config.nativeProperties()), it -> this.listRecordRanges((ReactiveAdmin)it, topics, rangeProvider), ReactiveAdmin::close);
        return recordRanges.filter(RecordRange::hasNonNegativeLength).concatMap(this::receiveRecordsInRange);
    }

    private Flux<RecordRange> listRecordRanges(ReactiveAdmin admin, Collection<String> topics, OffsetRangeProvider rangeProvider) {
        return admin.listTopicPartitions(topics).collectMap(Function.identity(), rangeProvider::forTopicPartition).map(it -> KafkaBoundedReceiver.sortPresent(it, rangeProvider.topicPartitionComparator())).flatMapMany(it -> this.listRecordRanges(admin, (SortedMap<TopicPartition, OffsetRange>)it));
    }

    private Flux<RecordRange> listRecordRanges(ReactiveAdmin admin, SortedMap<TopicPartition, OffsetRange> ranges) {
        Map<TopicPartition, OffsetCriteria> minCriteria = ranges.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, it -> ((OffsetRange)it.getValue()).minInclusive()));
        Map<TopicPartition, OffsetCriteria> maxCriteria = ranges.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, it -> ((OffsetRange)it.getValue()).maxInclusive()));
        return Mono.zip(this.calculateRawOffsets(admin, minCriteria, Extrema.MIN), this.calculateRawOffsets(admin, maxCriteria, Extrema.MAX), admin.listOffsets(minCriteria.keySet(), OffsetSpec.earliest()), admin.listOffsets(maxCriteria.keySet(), OffsetSpec.latest())).flatMapIterable(it -> KafkaBoundedReceiver.createRecordRanges(ranges.keySet(), (Map)it.getT1(), (Map)it.getT2(), (Map)it.getT3(), (Map)it.getT4()));
    }

    private Mono<Map<TopicPartition, Long>> calculateRawOffsets(ReactiveAdmin admin, Map<TopicPartition, OffsetCriteria> criteria, Extrema extrema) {
        Map<Class, Map<TopicPartition, OffsetCriteria>> typedCriteria = criteria.entrySet().stream().collect(Collectors.groupingBy(it -> ((OffsetCriteria)it.getValue()).getClass(), Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
        Map<TopicPartition, Long> rawOffsets = typedCriteria.getOrDefault(OffsetCriteria.Raw.class, Collections.emptyMap()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, it -> KafkaBoundedReceiver.toRawOffset((OffsetCriteria)it.getValue(), extrema)));
        Map<TopicPartition, OffsetSpec> timestampSpecs = typedCriteria.getOrDefault(OffsetCriteria.Timestamp.class, Collections.emptyMap()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, it -> KafkaBoundedReceiver.toTimestampSpec((OffsetCriteria)it.getValue(), extrema)));
        Map<TopicPartition, OffsetSpec> earliestSpecs = typedCriteria.getOrDefault(OffsetCriteria.Earliest.class, Collections.emptyMap()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, __ -> OffsetSpec.earliest()));
        Map<TopicPartition, OffsetSpec> latestSpecs = typedCriteria.getOrDefault(OffsetCriteria.Latest.class, Collections.emptyMap()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, __ -> OffsetSpec.latest()));
        return Mono.just(rawOffsets).mergeWith(admin.listOffsets(timestampSpecs, offset -> offset == -1L ? Long.MAX_VALUE : offset)).mergeWith(admin.listOffsets(earliestSpecs, offset -> offset + (long)(extrema == Extrema.MAX ? 1 : 0))).mergeWith(admin.listOffsets(latestSpecs, offset -> offset - (long)(extrema == Extrema.MIN ? 1 : 0))).flatMapIterable(Map::entrySet).collectMap(Map.Entry::getKey, Map.Entry::getValue);
    }

    private Flux<ConsumerRecord<K, V>> receiveRecordsInRange(RecordRange recordRange) {
        return ((Mono)this.configSource.create()).flatMapMany(it -> this.receiveRecordsInRange((KafkaConfig)it, recordRange));
    }

    private Flux<ConsumerRecord<K, V>> receiveRecordsInRange(KafkaConfig kafkaConfig, RecordRange recordRange) {
        ReceiverOptions receiverOptions = ReceiverOptions.create(kafkaConfig.nativeProperties()).assignment(Collections.singletonList(recordRange.topicPartition())).pollTimeout(kafkaConfig.loadDuration(POLL_TIMEOUT_CONFIG).orElse(DEFAULT_POLL_TIMEOUT)).closeTimeout(kafkaConfig.loadDuration(CLOSE_TIMEOUT_CONFIG).orElse(DEFAULT_CLOSE_TIMEOUT)).addAssignListener(partitions -> partitions.forEach(it -> it.seek(recordRange.minInclusive())));
        return KafkaReceiver.create((ReceiverOptions)receiverOptions).receive().map(Function.identity()).takeWhile(it -> it.offset() <= recordRange.maxInclusive()).takeUntil(it -> it.offset() == recordRange.maxInclusive());
    }

    private static List<RecordRange> createRecordRanges(Collection<TopicPartition> topicPartitions, Map<TopicPartition, Long> minOffsets, Map<TopicPartition, Long> maxOffsets, Map<TopicPartition, Long> earliestOffsets, Map<TopicPartition, Long> latestOffsets) {
        return topicPartitions.stream().map(topicPartition -> new RecordRange((TopicPartition)topicPartition, Math.max((Long)earliestOffsets.get(topicPartition), (Long)minOffsets.get(topicPartition)), Math.min((Long)latestOffsets.get(topicPartition), (Long)maxOffsets.get(topicPartition)) - 1L)).collect(Collectors.toList());
    }

    private static <K, V> SortedMap<K, V> sortPresent(Map<K, Optional<V>> map, Comparator<? super K> comparator) {
        return map.entrySet().stream().filter(it -> ((Optional)it.getValue()).isPresent()).collect(Collectors.toMap(Map.Entry::getKey, it -> ((Optional)it.getValue()).get(), (l, r) -> l, () -> new TreeMap(comparator)));
    }

    private static long toRawOffset(OffsetCriteria endpoint, Extrema extrema) {
        return ((OffsetCriteria.Raw)OffsetCriteria.Raw.class.cast(endpoint)).offset() + (long)(extrema == Extrema.MAX ? 1 : 0);
    }

    private static OffsetSpec toTimestampSpec(OffsetCriteria offsetCriteria, Extrema extrema) {
        OffsetCriteria.Timestamp timestampEndpoint = (OffsetCriteria.Timestamp)OffsetCriteria.Timestamp.class.cast(offsetCriteria);
        return OffsetSpec.forTimestamp((long)(timestampEndpoint.epochMillis() + (long)(extrema == Extrema.MAX ? 1 : 0)));
    }

    private static final class RecordRange {
        private final TopicPartition topicPartition;
        private final long minInclusive;
        private final long maxInclusive;

        public RecordRange(TopicPartition topicPartition, long minInclusive, long maxExclusive) {
            this.topicPartition = topicPartition;
            this.minInclusive = minInclusive;
            this.maxInclusive = maxExclusive;
        }

        public TopicPartition topicPartition() {
            return this.topicPartition;
        }

        public boolean hasNonNegativeLength() {
            return this.maxInclusive - this.minInclusive >= 0L;
        }

        public long minInclusive() {
            return this.minInclusive;
        }

        public long maxInclusive() {
            return this.maxInclusive;
        }
    }

    private static enum Extrema {
        MIN,
        MAX;

    }
}

