/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.kafka;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.core.ComposedAlo;
import io.atleon.core.ErrorEmitter;
import io.atleon.kafka.KafkaBoundedReceiver;
import io.atleon.kafka.KafkaConfig;
import io.atleon.kafka.KafkaConfigSource;
import io.atleon.kafka.OffsetCriteria;
import io.atleon.kafka.OffsetRange;
import io.atleon.kafka.ReactiveAdmin;
import io.atleon.kafka.RecordRange;
import io.atleon.kafka.TopicPartitionGroupOffsets;
import io.atleon.util.Consuming;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Locale;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class AloKafkaBoundedReceiver<K, V> {
    private final KafkaConfigSource configSource;

    private AloKafkaBoundedReceiver(KafkaConfigSource configSource) {
        this.configSource = configSource;
    }

    public static <K, V> AloKafkaBoundedReceiver<K, V> create(KafkaConfigSource configSource) {
        return new AloKafkaBoundedReceiver<K, V>(configSource);
    }

    public AloFlux<ConsumerRecord<K, V>> receiveAloRecordsUpTo(String topic, OffsetCriteria maxInclusive) {
        return this.receiveAloRecordsUpTo(Collections.singletonList(topic), maxInclusive);
    }

    public AloFlux<ConsumerRecord<K, V>> receiveAloRecordsUpTo(Collection<String> topics, OffsetCriteria maxInclusive) {
        return (AloFlux)((Mono)this.configSource.create()).flatMapMany(it -> this.receiveAloRecordsUpTo((KafkaConfig)it, topics, maxInclusive)).as(AloFlux::wrap);
    }

    private Flux<Alo<ConsumerRecord<K, V>>> receiveAloRecordsUpTo(KafkaConfig config, Collection<String> topics, OffsetCriteria maxInclusive) {
        return Flux.using(() -> ReactiveAdmin.create(config.nativeProperties()), it -> this.receiveAndCommitRecordsInRange((ReactiveAdmin)it, config, topics, maxInclusive), ReactiveAdmin::close);
    }

    private Flux<Alo<ConsumerRecord<K, V>>> receiveAndCommitRecordsInRange(ReactiveAdmin admin, KafkaConfig config, Collection<String> topics, OffsetCriteria maxInclusive) {
        String groupId = config.loadString("group.id").orElseThrow(() -> new IllegalArgumentException("Must provide Consumer Group ID"));
        OffsetResetStrategy resetStrategy = config.loadString("auto.offset.reset").map(it -> OffsetResetStrategy.valueOf((String)it.toUpperCase(Locale.ROOT))).orElse(OffsetResetStrategy.LATEST);
        return admin.listTopicPartitions(topics).collectList().flatMapMany(it -> admin.listTopicPartitionGroupOffsets(groupId, resetStrategy, (Collection<TopicPartition>)it)).collectMap(TopicPartitionGroupOffsets::topicPartition, it -> AloKafkaBoundedReceiver.toOffsetRange(it.groupOffset(), maxInclusive)).flatMapMany(it -> RecordRange.list(admin, it)).filter(RecordRange::hasNonNegativeLength).sort(Comparator.comparing(RecordRange::topicPartition, AloKafkaBoundedReceiver.topicPartitionComparator())).concatMap(it -> this.receiveAndCommitRecordsInRange(admin, groupId, (RecordRange)it));
    }

    private AloFlux<ConsumerRecord<K, V>> receiveAndCommitRecordsInRange(ReactiveAdmin admin, String groupId, RecordRange recordRange) {
        Map<TopicPartition, Long> offsetsToCommit = Collections.singletonMap(recordRange.topicPartition(), recordRange.maxInclusive() + 1L);
        ErrorEmitter errorEmitter = ErrorEmitter.create();
        Runnable acknowledger = () -> admin.alterRawConsumerGroupOffsets(groupId, offsetsToCommit).subscribe(Consuming.noOp(), arg_0 -> ((ErrorEmitter)errorEmitter).safelyEmit(arg_0), () -> ((ErrorEmitter)errorEmitter).safelyComplete());
        Alo[] aloArray = new Alo[1];
        aloArray[0] = new ComposedAlo((Object)recordRange, acknowledger, arg_0 -> ((ErrorEmitter)errorEmitter).safelyEmit(arg_0));
        return AloFlux.just((Alo[])aloArray).concatMap(KafkaBoundedReceiver.create(this.configSource)::receiveRecordsInRange).transform(arg_0 -> ((ErrorEmitter)errorEmitter).applyTo(arg_0));
    }

    private static OffsetRange toOffsetRange(long rawMinInclusive, OffsetCriteria maxInclusive) {
        return OffsetCriteria.raw(rawMinInclusive).to(maxInclusive);
    }

    private static Comparator<TopicPartition> topicPartitionComparator() {
        return Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition);
    }
}

