/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.kafka;

import io.atleon.kafka.TopicPartitionGroupOffsets;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/**
 * Reactive facade over a Kafka {@link Admin} client.
 *
 * <p>Each operation invokes the wrapped client when the returned publisher is subscribed to and
 * adapts the resulting {@link KafkaFuture} into a {@link Mono} or {@link Flux}. Closing this
 * instance closes the wrapped client.
 */
public class ReactiveAdmin implements Closeable {

    /** Options used for every offset listing; requests the {@code READ_COMMITTED} isolation level. */
    private static final ListOffsetsOptions LIST_OFFSETS_OPTIONS =
        new ListOffsetsOptions(IsolationLevel.READ_COMMITTED);

    private final Admin admin;

    /**
     * Wraps an existing client. The client is closed by {@link #close()}.
     *
     * @param admin the Kafka admin client to delegate to
     */
    ReactiveAdmin(Admin admin) {
        this.admin = admin;
    }

    /**
     * Creates a new instance backed by a freshly created {@link Admin} client.
     *
     * @param config configuration passed as-is to {@link Admin#create(Map)}
     * @return a new reactive admin owning the created client
     */
    public static ReactiveAdmin create(Map<String, Object> config) {
        return new ReactiveAdmin(Admin.create(config));
    }

    /**
     * Overwrites the committed offsets of a consumer group with the provided raw offsets.
     *
     * @param groupId the consumer group whose offsets are altered
     * @param offsets the offset to commit for each topic partition; values must not be null
     * @return a Mono that completes once all alterations have completed, or errors on failure
     */
    public Mono<Void> alterRawConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
        Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = offsets.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue())));
        return execute(client -> client.alterConsumerGroupOffsets(groupId, offsetsAndMetadata).all());
    }

    /**
     * Lists committed and latest offsets for every topic partition the group has an offset for.
     *
     * @param groupId the consumer group to inspect
     * @return one element per topic partition with a committed offset in the group
     */
    public Flux<TopicPartitionGroupOffsets> listTopicPartitionGroupOffsets(String groupId) {
        return listTopicPartitionGroupOffsets(Collections.singletonList(groupId));
    }

    /**
     * Lists committed and latest offsets for every topic partition that any of the given groups
     * has an offset for.
     *
     * @param groupIds the consumer groups to inspect; must not contain duplicates
     * @return one element per (group, topic partition) pair with a committed offset
     */
    public Flux<TopicPartitionGroupOffsets> listTopicPartitionGroupOffsets(Collection<String> groupIds) {
        Map<String, ListConsumerGroupOffsetsSpec> offsetSpecs = groupIds.stream()
            .collect(Collectors.toMap(Function.identity(), groupId -> new ListConsumerGroupOffsetsSpec()));
        return execute(client -> client.listConsumerGroupOffsets(offsetSpecs).all())
            .flatMapMany(this::listTopicPartitionGroupOffsets);
    }

    /**
     * Lists group offsets for the given topic partitions, substituting a reset-strategy-derived
     * offset where the group has no committed offset.
     *
     * @param groupId the consumer group to inspect
     * @param resetStrategy the strategy used to derive an offset when none is committed
     * @param topicPartitions the topic partitions to inspect
     * @return one element per topic partition for which an offset could be determined
     */
    public Flux<TopicPartitionGroupOffsets> listTopicPartitionGroupOffsets(
        String groupId,
        OffsetResetStrategy resetStrategy,
        Collection<TopicPartition> topicPartitions
    ) {
        return listTopicPartitionGroupOffsets(Collections.singletonMap(groupId, resetStrategy), topicPartitions);
    }

    /**
     * Lists group offsets for the given topic partitions across several groups. For each group
     * and partition the committed offset is used when present; otherwise the earliest or latest
     * partition offset is used according to the group's reset strategy. Pairs for which neither
     * applies are omitted.
     *
     * @param groupIds reset strategy to apply per consumer group
     * @param topicPartitions the topic partitions to inspect
     * @return one element per (group, topic partition) pair for which an offset was determined
     */
    public Flux<TopicPartitionGroupOffsets> listTopicPartitionGroupOffsets(
        Map<String, OffsetResetStrategy> groupIds,
        Collection<TopicPartition> topicPartitions
    ) {
        // The same spec instance is shared by all groups since they query identical partitions.
        ListConsumerGroupOffsetsSpec offsetsSpec = new ListConsumerGroupOffsetsSpec().topicPartitions(topicPartitions);
        Map<String, ListConsumerGroupOffsetsSpec> offsetSpecs = groupIds.keySet().stream()
            .collect(Collectors.toMap(Function.identity(), groupId -> offsetsSpec));

        Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>> committedOffsets =
            execute(client -> client.listConsumerGroupOffsets(offsetSpecs).all());
        Mono<Map<TopicPartition, Long>> earliestOffsets = listOffsets(topicPartitions, OffsetSpec.earliest());
        Mono<Map<TopicPartition, Long>> latestOffsets = listOffsets(topicPartitions, OffsetSpec.latest());

        return Mono.zip(committedOffsets, earliestOffsets, latestOffsets)
            .flatMapIterable(it -> createTopicPartitionGroupOffsets(groupIds, it.getT1(), it.getT2(), it.getT3()));
    }

    /**
     * Lists offsets for the given topic partitions using one spec for all of them.
     *
     * @param topicPartitions the topic partitions to query; must not contain duplicates
     * @param offsetSpec the offset spec applied to every partition
     * @return the resolved offset per topic partition; completes empty if no partitions are given
     */
    public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> topicPartitions, OffsetSpec offsetSpec) {
        Map<TopicPartition, OffsetSpec> offsetSpecs = topicPartitions.stream()
            .collect(Collectors.toMap(Function.identity(), topicPartition -> offsetSpec));
        return listOffsets(offsetSpecs);
    }

    /**
     * Lists offsets for the given per-partition specs without adjusting the results.
     *
     * @param offsetSpecs the offset spec to resolve for each topic partition
     * @return the resolved offset per topic partition; completes empty if {@code offsetSpecs} is empty
     */
    public Mono<Map<TopicPartition, Long>> listOffsets(Map<TopicPartition, OffsetSpec> offsetSpecs) {
        return listOffsets(offsetSpecs, LongUnaryOperator.identity());
    }

    /**
     * Lists offsets for the given per-partition specs and applies an adjustment to each result.
     *
     * @param offsetSpecs the offset spec to resolve for each topic partition
     * @param adjustment function applied to every resolved offset before it is returned
     * @return the adjusted offset per topic partition; completes empty (emits no map) if
     *         {@code offsetSpecs} is empty, in which case the admin client is not invoked
     */
    public Mono<Map<TopicPartition, Long>> listOffsets(
        Map<TopicPartition, OffsetSpec> offsetSpecs,
        LongUnaryOperator adjustment
    ) {
        if (offsetSpecs.isEmpty()) {
            return Mono.empty();
        }
        return execute(client -> client.listOffsets(offsetSpecs, LIST_OFFSETS_OPTIONS).all())
            .flatMapIterable(Map::entrySet)
            .collectMap(Map.Entry::getKey, entry -> adjustment.applyAsLong(entry.getValue().offset()));
    }

    /**
     * Lists all partitions of a single topic.
     *
     * @param topic the topic to describe
     * @return every topic partition of the described topic
     */
    public Flux<TopicPartition> listTopicPartitions(String topic) {
        return listTopicPartitions(Collections.singletonList(topic));
    }

    /**
     * Lists all partitions of the given topics.
     *
     * @param topics the topics to describe
     * @return every topic partition of every described topic
     */
    public Flux<TopicPartition> listTopicPartitions(Collection<String> topics) {
        return execute(client -> client.describeTopics(topics).allTopicNames())
            .flatMapIterable(Map::values)
            .flatMapIterable(ReactiveAdmin::extractTopicPartitions);
    }

    /** Closes the wrapped admin client. */
    @Override
    public void close() {
        admin.close();
    }

    /**
     * Combines already-fetched committed offsets with the latest offsets of all partitions that
     * appear in any group's committed offsets.
     */
    private Flux<TopicPartitionGroupOffsets> listTopicPartitionGroupOffsets(
        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsByGroup
    ) {
        Set<TopicPartition> topicPartitions = offsetsByGroup.values().stream()
            .flatMap(groupOffsets -> groupOffsets.keySet().stream())
            .collect(Collectors.toSet());
        return listOffsets(topicPartitions, OffsetSpec.latest())
            .flatMapIterable(latestOffsets -> createTopicPartitionGroupOffsets(offsetsByGroup, latestOffsets));
    }

    /**
     * Adapts an admin operation into a Mono. The operation is invoked on each subscription, and
     * the future's outcome is forwarded to the subscriber's sink.
     */
    private <T> Mono<T> execute(Function<Admin, KafkaFuture<T>> method) {
        return Mono.create(sink -> method.apply(admin).whenComplete(new SinkKafkaBiConsumer<>(sink)));
    }

    /** Expands a topic description into one {@link TopicPartition} per described partition. */
    private static List<TopicPartition> extractTopicPartitions(TopicDescription topicDescription) {
        return topicDescription.partitions().stream()
            .map(partitionInfo -> new TopicPartition(topicDescription.name(), partitionInfo.partition()))
            .collect(Collectors.toList());
    }

    /** Builds group offsets for every group by pairing its committed offsets with the latest offsets. */
    private static List<TopicPartitionGroupOffsets> createTopicPartitionGroupOffsets(
        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsByGroup,
        Map<TopicPartition, Long> latestOffsets
    ) {
        return offsetsByGroup.entrySet().stream()
            .flatMap(group -> createTopicPartitionGroupOffsets(group.getKey(), group.getValue(), latestOffsets).stream())
            .collect(Collectors.toList());
    }

    /**
     * Builds group offsets for a single group, covering only the partitions for which the group
     * has a non-null committed offset.
     */
    private static List<TopicPartitionGroupOffsets> createTopicPartitionGroupOffsets(
        String groupId,
        Map<TopicPartition, OffsetAndMetadata> groupOffsets,
        Map<TopicPartition, Long> latestOffsets
    ) {
        List<TopicPartitionGroupOffsets> result = new ArrayList<>();
        for (Map.Entry<TopicPartition, Long> latest : latestOffsets.entrySet()) {
            OffsetAndMetadata committed = groupOffsets.get(latest.getKey());
            // Partitions without a committed offset for this group are skipped.
            if (committed != null) {
                result.add(new TopicPartitionGroupOffsets(latest.getKey(), latest.getValue(), groupId, committed.offset()));
            }
        }
        return result;
    }

    /**
     * Builds group offsets for every group and every partition with a known latest offset,
     * falling back to the group's reset strategy when no offset is committed.
     */
    private static List<TopicPartitionGroupOffsets> createTopicPartitionGroupOffsets(
        Map<String, OffsetResetStrategy> groupIds,
        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsByGroup,
        Map<TopicPartition, Long> earliestOffsets,
        Map<TopicPartition, Long> latestOffsets
    ) {
        List<TopicPartitionGroupOffsets> result = new ArrayList<>();
        for (Map.Entry<String, OffsetResetStrategy> group : groupIds.entrySet()) {
            String groupId = group.getKey();
            OffsetResetStrategy resetStrategy = group.getValue();
            // A group absent from the response is treated as having no committed offsets.
            Map<TopicPartition, OffsetAndMetadata> committedOffsets =
                offsetsByGroup.getOrDefault(groupId, Collections.emptyMap());
            for (Map.Entry<TopicPartition, Long> latest : latestOffsets.entrySet()) {
                TopicPartition topicPartition = latest.getKey();
                long latestOffset = latest.getValue();
                long earliestOffset = earliestOffsets.get(topicPartition);
                calculateGroupOffset(topicPartition, committedOffsets, resetStrategy, earliestOffset, latestOffset)
                    .map(groupOffset -> new TopicPartitionGroupOffsets(topicPartition, latestOffset, groupId, (long) groupOffset))
                    .ifPresent(result::add);
            }
        }
        return result;
    }

    /**
     * Determines the effective group offset for a partition: the committed offset when present,
     * otherwise the earliest or latest offset for the {@code EARLIEST} or {@code LATEST} reset
     * strategy respectively, otherwise empty.
     */
    private static Optional<Long> calculateGroupOffset(
        TopicPartition topicPartition,
        Map<TopicPartition, OffsetAndMetadata> committedOffsets,
        OffsetResetStrategy resetStrategy,
        long earliestOffset,
        long latestOffset
    ) {
        OffsetAndMetadata committed = committedOffsets.get(topicPartition);
        if (committed != null) {
            return Optional.of(committed.offset());
        }
        if (resetStrategy == OffsetResetStrategy.EARLIEST) {
            return Optional.of(earliestOffset);
        }
        if (resetStrategy == OffsetResetStrategy.LATEST) {
            return Optional.of(latestOffset);
        }
        return Optional.empty();
    }

    /**
     * Bridges the completion of a {@link KafkaFuture} to a {@link MonoSink}, emitting exactly one
     * terminal signal: an error, a value, or an empty completion.
     */
    private static final class SinkKafkaBiConsumer<T> implements KafkaFuture.BiConsumer<T, Throwable> {

        private final MonoSink<T> sink;

        public SinkKafkaBiConsumer(MonoSink<T> sink) {
            this.sink = sink;
        }

        /**
         * Forwards the future's outcome to the sink.
         *
         * @param t the result of the future; may be null (e.g. for {@code Void} futures)
         * @param throwable the failure of the future, or null if it completed successfully
         */
        @Override
        public void accept(T t, Throwable throwable) {
            if (throwable != null) {
                sink.error(throwable);
            } else if (t != null) {
                sink.success(t);
            } else {
                // Successful completion without a value, e.g. KafkaFuture<Void>.
                sink.success();
            }
        }
    }
}

