/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.KafkaUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.RemovedPartitionState;
import io.confluent.parallelconsumer.state.ShardManager;
import io.confluent.parallelconsumer.state.WorkContainer;
import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionStateManager<K, V>
implements ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(PartitionStateManager.class);
    public static final double USED_PAYLOAD_THRESHOLD_MULTIPLIER_DEFAULT = 0.75;
    private static double USED_PAYLOAD_THRESHOLD_MULTIPLIER = 0.75;
    private final Consumer<K, V> consumer;
    private final ShardManager<K, V> sm;
    private final ParallelConsumerOptions<K, V> options;
    private final Map<TopicPartition, PartitionState<K, V>> partitionStates = new ConcurrentHashMap<TopicPartition, PartitionState<K, V>>();
    private final Map<TopicPartition, Long> partitionsAssignmentEpochs = new ConcurrentHashMap<TopicPartition, Long>();
    private final Clock clock;

    public PartitionState<K, V> getPartitionState(TopicPartition tp) {
        return this.partitionStates.get(tp);
    }

    public void onPartitionsAssigned(Collection<TopicPartition> assignedPartitions) {
        log.debug("Partitions assigned: {}", assignedPartitions);
        for (TopicPartition partitionAssignment : assignedPartitions) {
            boolean isAlreadyAssigned = this.partitionStates.containsKey(partitionAssignment);
            if (!isAlreadyAssigned) continue;
            PartitionState<K, V> previouslyAssignedState = this.partitionStates.get(partitionAssignment);
            if (previouslyAssignedState.isRemoved()) {
                log.trace("Reassignment of previously revoked partition {} - state: {}", (Object)partitionAssignment, previouslyAssignedState);
                continue;
            }
            log.warn("New assignment of partition which already exists and isn't recorded as removed in partition state. Could be a state bug - was the partition revocation somehow missed, or is this a race? Please file a GH issue. Partition: {}, state: {}", (Object)partitionAssignment, previouslyAssignedState);
        }
        this.incrementPartitionAssignmentEpoch(assignedPartitions);
        try {
            OffsetMapCodecManager<K, V> om = new OffsetMapCodecManager<K, V>(this.consumer);
            Map<TopicPartition, PartitionState<K, V>> partitionStates = om.loadPartitionStateForAssignment(assignedPartitions);
            this.partitionStates.putAll(partitionStates);
        }
        catch (Exception e) {
            log.error("Error in onPartitionsAssigned", (Throwable)e);
            throw e;
        }
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.info("Partitions revoked: {}", partitions);
        try {
            this.onPartitionsRemoved(partitions);
        }
        catch (Exception e) {
            log.error("Error in onPartitionsRevoked", (Throwable)e);
            throw e;
        }
    }

    void onPartitionsRemoved(Collection<TopicPartition> partitions) {
        this.incrementPartitionAssignmentEpoch(partitions);
        this.resetOffsetMapAndRemoveWork(partitions);
    }

    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        try {
            log.info("Lost partitions: {}", partitions);
            this.onPartitionsRemoved(partitions);
        }
        catch (Exception e) {
            log.error("Error in onPartitionsLost", (Throwable)e);
            throw e;
        }
    }

    public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> committed) {
        committed.forEach((tp, meta) -> {
            PartitionState<K, V> partition = this.getPartitionState((TopicPartition)tp);
            partition.onOffsetCommitSuccess((OffsetAndMetadata)meta);
        });
    }

    private void resetOffsetMapAndRemoveWork(Collection<TopicPartition> allRemovedPartitions) {
        for (TopicPartition removedPartition : allRemovedPartitions) {
            PartitionState<K, V> partition = this.partitionStates.get(removedPartition);
            this.partitionStates.put(removedPartition, RemovedPartitionState.getSingleton());
            partition.onPartitionsRemoved(this.sm);
        }
    }

    public Long getEpochOfPartitionForRecord(ConsumerRecord<K, V> rec) {
        TopicPartition tp = KafkaUtils.toTopicPartition(rec);
        Long epoch = this.partitionsAssignmentEpochs.get(tp);
        if (epoch == null) {
            throw new InternalRuntimeError(StringUtils.msg("Received message for a partition which is not assigned: {}", rec));
        }
        return epoch;
    }

    public Long getEpochOfPartition(TopicPartition partition) {
        return this.partitionsAssignmentEpochs.get(partition);
    }

    private void incrementPartitionAssignmentEpoch(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            Long epoch;
            Long l = epoch = this.partitionsAssignmentEpochs.getOrDefault(partition, -1L);
            epoch = epoch + 1L;
            this.partitionsAssignmentEpochs.put(partition, epoch);
        }
    }

    boolean checkIfWorkIsStale(WorkContainer<?, ?> workContainer) {
        boolean epochMissMatch;
        TopicPartition topicPartitionKey = workContainer.getTopicPartition();
        Long currentPartitionEpoch = this.partitionsAssignmentEpochs.get(topicPartitionKey);
        long workEpoch = workContainer.getEpoch();
        boolean partitionNotAssigned = this.isPartitionRemovedOrNeverAssigned(workContainer.getCr());
        boolean bl = epochMissMatch = currentPartitionEpoch != workEpoch;
        if (epochMissMatch || partitionNotAssigned) {
            log.debug("Epoch mismatch {} vs {} for record {}. Skipping message - it's partition has already assigned to a different consumer.", new Object[]{workEpoch, currentPartitionEpoch, workContainer});
            return true;
        }
        return false;
    }

    public boolean isRecordPreviouslyCompleted(ConsumerRecord<K, V> rec) {
        TopicPartition tp = KafkaUtils.toTopicPartition(rec);
        PartitionState<K, V> partitionState = this.getPartitionState(tp);
        boolean previouslyCompleted = partitionState.isRecordPreviouslyCompleted(rec);
        log.trace("Record {} previously completed? {}", (Object)rec.offset(), (Object)previouslyCompleted);
        return previouslyCompleted;
    }

    public boolean isAllowedMoreRecords(TopicPartition tp) {
        PartitionState<K, V> partitionState = this.getPartitionState(tp);
        return partitionState.isAllowedMoreRecords();
    }

    public boolean isAllowedMoreRecords(WorkContainer<?, ?> wc) {
        return this.isAllowedMoreRecords(wc.getTopicPartition());
    }

    public boolean hasWorkInCommitQueues() {
        for (PartitionState<K, V> partition : this.getAssignedPartitions().values()) {
            if (!partition.hasWorkInCommitQueue()) continue;
            return true;
        }
        return false;
    }

    public long getNumberOfEntriesInPartitionQueues() {
        Collection<PartitionState<K, V>> values = this.getAssignedPartitions().values();
        return values.stream().mapToLong(PartitionState::getCommitQueueSize).reduce(Long::sum).orElse(0L);
    }

    public long getHighestSeenOffset(TopicPartition tp) {
        return this.getPartitionState(tp).getOffsetHighestSeen();
    }

    public void addWorkContainer(WorkContainer<K, V> wc) {
        TopicPartition tp = wc.getTopicPartition();
        this.getPartitionState(tp).addWorkContainer(wc);
    }

    public boolean isBlocked(TopicPartition topicPartition) {
        return !this.isAllowedMoreRecords(topicPartition);
    }

    public boolean isPartitionRemovedOrNeverAssigned(ConsumerRecord<?, ?> rec) {
        TopicPartition topicPartition = KafkaUtils.toTopicPartition(rec);
        PartitionState<K, V> partitionState = this.getPartitionState(topicPartition);
        boolean hasNeverBeenAssigned = partitionState == null;
        return hasNeverBeenAssigned || partitionState.isRemoved();
    }

    public void onSuccess(WorkContainer<K, V> wc) {
        PartitionState<K, V> partitionState = this.getPartitionState(wc.getTopicPartition());
        partitionState.onSuccess(wc);
    }

    public void onFailure(WorkContainer<K, V> wc) {
        PartitionState<K, V> partitionState = this.getPartitionState(wc.getTopicPartition());
        partitionState.onFailure(wc);
    }

    void maybeRegisterNewRecordAsWork(EpochAndRecordsMap<K, V> recordsMap) {
        for (TopicPartition partition : recordsMap.partitions()) {
            EpochAndRecordsMap.RecordsAndEpoch recordsList = recordsMap.records(partition);
            Long epochOfInboundRecords = recordsList.getEpochOfPartitionAtPoll();
            for (ConsumerRecord rec : recordsList.getRecords()) {
                this.maybeRegisterNewRecordAsWork(epochOfInboundRecords, rec);
            }
        }
    }

    private void maybeRegisterNewRecordAsWork(Long epochOfInboundRecords, ConsumerRecord<K, V> rec) {
        Long currentPartitionEpoch = this.getEpochOfPartitionForRecord(rec);
        if (Objects.equals(epochOfInboundRecords, currentPartitionEpoch)) {
            if (this.isPartitionRemovedOrNeverAssigned(rec)) {
                log.debug("Record in buffer for a partition no longer assigned. Dropping. TP: {} rec: {}", (Object)KafkaUtils.toTopicPartition(rec), rec);
            }
            if (this.isRecordPreviouslyCompleted(rec)) {
                log.trace("Record previously completed, skipping. offset: {}", (Object)rec.offset());
            } else {
                WorkContainer<K, V> work = new WorkContainer<K, V>(epochOfInboundRecords, rec, this.options.getRetryDelayProvider(), this.clock);
                this.sm.addWorkContainer(work);
                this.addWorkContainer(work);
            }
        } else {
            log.debug("Inbound record of work has epoch ({}) not matching currently assigned epoch for the applicable partition ({}), skipping", (Object)epochOfInboundRecords, (Object)currentPartitionEpoch);
        }
    }

    public Map<TopicPartition, OffsetAndMetadata> collectDirtyCommitData() {
        HashMap<TopicPartition, OffsetAndMetadata> dirties = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (PartitionState state : this.getAssignedPartitions().values()) {
            Optional<OffsetAndMetadata> offsetAndMetadata = state.getCommitDataIfDirty();
            offsetAndMetadata.ifPresent(andMetadata -> dirties.put(state.getTp(), (OffsetAndMetadata)andMetadata));
        }
        return dirties;
    }

    private Map<TopicPartition, PartitionState<K, V>> getAssignedPartitions() {
        return Collections.unmodifiableMap(this.partitionStates.entrySet().stream().filter(e -> !((PartitionState)e.getValue()).isRemoved()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }

    public boolean couldBeTakenAsWork(WorkContainer<?, ?> workContainer) {
        if (this.checkIfWorkIsStale(workContainer)) {
            log.debug("Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: {}", workContainer);
            return false;
        }
        if (this.isAllowedMoreRecords(workContainer)) {
            return true;
        }
        if (this.isBlockingProgress(workContainer)) {
            return true;
        }
        log.debug("Not allowed more records for the partition ({}) as set from previous encode run (blocked), that this record ({}) belongs to, due to offset encoding back pressure, is within the encoded payload already (offset lower than highest succeeded, not in flight ({}), continuing on to next container in shardEntry.", new Object[]{workContainer.getTopicPartition(), workContainer.offset(), workContainer.isNotInFlight()});
        return false;
    }

    private boolean isBlockingProgress(WorkContainer<?, ?> workContainer) {
        PartitionState<K, V> partitionState = this.getPartitionState(workContainer.getTopicPartition());
        return workContainer.offset() < partitionState.getOffsetHighestSucceeded();
    }

    public PartitionStateManager(Consumer<K, V> consumer, ShardManager<K, V> sm, ParallelConsumerOptions<K, V> options, Clock clock) {
        this.consumer = consumer;
        this.sm = sm;
        this.options = options;
        this.clock = clock;
    }

    public static double getUSED_PAYLOAD_THRESHOLD_MULTIPLIER() {
        return USED_PAYLOAD_THRESHOLD_MULTIPLIER;
    }

    public static void setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(double USED_PAYLOAD_THRESHOLD_MULTIPLIER) {
        PartitionStateManager.USED_PAYLOAD_THRESHOLD_MULTIPLIER = USED_PAYLOAD_THRESHOLD_MULTIPLIER;
    }
}

