/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.kafka;

import io.atleon.kafka.OffsetCriteria;
import io.atleon.kafka.OffsetRange;
import io.atleon.kafka.OffsetRangeProvider;
import io.atleon.kafka.ReactiveAdmin;
import io.atleon.kafka.TopicPartitionGroupOffsets;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

final class RecordRange {
    private final TopicPartition topicPartition;
    private final long minInclusive;
    private final long maxInclusive;

    public RecordRange(TopicPartition topicPartition, long minInclusive, long maxExclusive) {
        this.topicPartition = topicPartition;
        this.minInclusive = minInclusive;
        this.maxInclusive = maxExclusive;
    }

    public static Flux<RecordRange> list(ReactiveAdmin admin, Collection<String> topics, OffsetRangeProvider rangeProvider) {
        return admin.listTopicPartitions(topics).collectMap(Function.identity(), rangeProvider::forTopicPartition).map(it -> RecordRange.sortPresent(it, rangeProvider.topicPartitionComparator())).flatMapMany(it -> RecordRange.list(admin, it));
    }

    public static Flux<RecordRange> list(ReactiveAdmin admin, Map<TopicPartition, OffsetRange> ranges) {
        Map<TopicPartition, OffsetCriteria> minCriteria = ranges.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, it -> ((OffsetRange)it.getValue()).minInclusive()));
        Map<TopicPartition, OffsetCriteria> maxCriteria = ranges.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, it -> ((OffsetRange)it.getValue()).maxInclusive()));
        return Mono.zip(RecordRange.calculateRawOffsets(admin, minCriteria, Extrema.MIN), RecordRange.calculateRawOffsets(admin, maxCriteria, Extrema.MAX), admin.listOffsets(minCriteria.keySet(), OffsetSpec.earliest()), admin.listOffsets(maxCriteria.keySet(), OffsetSpec.latest())).flatMapIterable(it -> RecordRange.createRecordRanges(ranges.keySet(), (Map)it.getT1(), (Map)it.getT2(), (Map)it.getT3(), (Map)it.getT4()));
    }

    public TopicPartition topicPartition() {
        return this.topicPartition;
    }

    public boolean hasNonNegativeLength() {
        return this.maxInclusive - this.minInclusive >= 0L;
    }

    public long minInclusive() {
        return this.minInclusive;
    }

    public long maxInclusive() {
        return this.maxInclusive;
    }

    private static Mono<Map<TopicPartition, Long>> calculateRawOffsets(ReactiveAdmin admin, Map<TopicPartition, OffsetCriteria> criteria, Extrema extrema) {
        Map<Class, Map<TopicPartition, OffsetCriteria>> typedCriteria = criteria.entrySet().stream().collect(Collectors.groupingBy(it -> ((OffsetCriteria)it.getValue()).getClass(), Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
        Map<TopicPartition, Long> rawOffsets = typedCriteria.getOrDefault(OffsetCriteria.Raw.class, Collections.emptyMap()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, it -> RecordRange.toRawOffset((OffsetCriteria)it.getValue(), extrema)));
        Map<TopicPartition, OffsetCriteria.ConsumerGroup> consumerGroups = typedCriteria.getOrDefault(OffsetCriteria.ConsumerGroup.class, Collections.emptyMap()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, it -> (OffsetCriteria.ConsumerGroup)it.getValue()));
        Map<TopicPartition, OffsetSpec> timestampSpecs = typedCriteria.getOrDefault(OffsetCriteria.Timestamp.class, Collections.emptyMap()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, it -> RecordRange.toTimestampSpec((OffsetCriteria)it.getValue(), extrema)));
        Map<TopicPartition, OffsetSpec> earliestSpecs = typedCriteria.getOrDefault(OffsetCriteria.Earliest.class, Collections.emptyMap()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, __ -> OffsetSpec.earliest()));
        Map<TopicPartition, OffsetSpec> latestSpecs = typedCriteria.getOrDefault(OffsetCriteria.Latest.class, Collections.emptyMap()).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, __ -> OffsetSpec.latest()));
        return Mono.just(rawOffsets).mergeWith(RecordRange.listConsumerGroupOffsets(admin, consumerGroups)).mergeWith(admin.listOffsets(timestampSpecs, offset -> offset == -1L ? Long.MAX_VALUE : offset)).mergeWith(admin.listOffsets(earliestSpecs, offset -> offset + (long)(extrema == Extrema.MAX ? 1 : 0))).mergeWith(admin.listOffsets(latestSpecs, offset -> offset - (long)(extrema == Extrema.MIN ? 1 : 0))).flatMapIterable(Map::entrySet).collectMap(Map.Entry::getKey, Map.Entry::getValue);
    }

    private static Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(ReactiveAdmin admin, Map<TopicPartition, OffsetCriteria.ConsumerGroup> groups) {
        Map topicPartitions = groups.entrySet().stream().collect(Collectors.groupingBy(it -> Tuples.of((Object)((OffsetCriteria.ConsumerGroup)it.getValue()).groupId(), (Object)((OffsetCriteria.ConsumerGroup)it.getValue()).resetStrategy()), Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
        return Flux.fromIterable(topicPartitions.entrySet()).concatMap(it -> admin.listTopicPartitionGroupOffsets((String)((Tuple2)it.getKey()).getT1(), (OffsetResetStrategy)((Tuple2)it.getKey()).getT2(), (Collection)it.getValue())).collectMap(TopicPartitionGroupOffsets::topicPartition, TopicPartitionGroupOffsets::groupOffset);
    }

    private static List<RecordRange> createRecordRanges(Collection<TopicPartition> topicPartitions, Map<TopicPartition, Long> minOffsets, Map<TopicPartition, Long> maxOffsets, Map<TopicPartition, Long> earliestOffsets, Map<TopicPartition, Long> latestOffsets) {
        return topicPartitions.stream().filter(it -> minOffsets.containsKey(it) && maxOffsets.containsKey(it)).map(topicPartition -> new RecordRange((TopicPartition)topicPartition, Math.max((Long)earliestOffsets.get(topicPartition), (Long)minOffsets.get(topicPartition)), Math.min((Long)latestOffsets.get(topicPartition), (Long)maxOffsets.get(topicPartition)) - 1L)).collect(Collectors.toList());
    }

    private static long toRawOffset(OffsetCriteria endpoint, Extrema extrema) {
        return ((OffsetCriteria.Raw)OffsetCriteria.Raw.class.cast(endpoint)).offset() + (long)(extrema == Extrema.MAX ? 1 : 0);
    }

    private static OffsetSpec toTimestampSpec(OffsetCriteria offsetCriteria, Extrema extrema) {
        OffsetCriteria.Timestamp timestampEndpoint = (OffsetCriteria.Timestamp)OffsetCriteria.Timestamp.class.cast(offsetCriteria);
        return OffsetSpec.forTimestamp((long)(timestampEndpoint.epochMillis() + (long)(extrema == Extrema.MAX ? 1 : 0)));
    }

    private static <K, V> SortedMap<K, V> sortPresent(Map<K, Optional<V>> map, Comparator<? super K> comparator) {
        return map.entrySet().stream().filter(it -> ((Optional)it.getValue()).isPresent()).collect(Collectors.toMap(Map.Entry::getKey, it -> ((Optional)it.getValue()).get(), (l, r) -> l, () -> new TreeMap(comparator)));
    }

    private static enum Extrema {
        MIN,
        MAX;

    }
}

