/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.KafkaUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.offsets.EncodingNotSupportedException;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.ShardManager;
import io.confluent.parallelconsumer.state.WorkContainer;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniSets;

/**
 * Keeps the per-partition processing state and the per-partition assignment epoch, and updates both in response to
 * consumer rebalance callbacks.
 * <p>
 * Two maps are maintained:
 * <ul>
 * <li>{@code partitionStates} - the {@link PartitionState} of every partition currently known to this monitor.
 * Entries are added in {@link #onPartitionsAssigned} and removed in {@link #onPartitionsRemoved}.</li>
 * <li>{@code partitionsAssignmentEpochs} - a counter per partition which is incremented on every assignment,
 * revocation and loss. Entries are never removed.</li>
 * </ul>
 * Reads and writes of {@code partitionStates} performed by this class are done while holding the monitor of the map
 * itself. Note that {@link #getPartitionStates()} hands out the live map, so callers of that method must synchronize
 * on it themselves.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class PartitionMonitor<K, V> implements ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(PartitionMonitor.class);

    /**
     * Fraction of {@code OffsetMapCodecManager.DefaultMaxMetadataSize} above which a partition is blocked from taking
     * more records, see {@link #addEncodedOffsets}. The constant-style name is retained because the public accessor
     * names are derived from it.
     */
    private double USED_PAYLOAD_THRESHOLD_MULTIPLIER = 0.75;

    private final Consumer<K, V> consumer;

    private final ShardManager<K, V> sm;

    /** State per partition. Guarded by synchronizing on the map instance. */
    private final Map<TopicPartition, PartitionState<K, V>> partitionStates = new HashMap<>();

    /** Assignment epoch per partition; starts at 0 on the first assignment and only ever grows. */
    private final Map<TopicPartition, Integer> partitionsAssignmentEpochs = new HashMap<>();

    /**
     * @param consumer consumer handed to every {@link OffsetMapCodecManager} this monitor creates
     * @param sm       shard manager which is told to drop the work of partitions that are removed
     */
    public PartitionMonitor(Consumer<K, V> consumer, ShardManager<K, V> sm) {
        this.consumer = consumer;
        this.sm = sm;
    }

    /**
     * Looks up the state of a partition.
     *
     * @param tp the partition to look up
     * @return the state stored for {@code tp}, or {@code null} when none is stored
     */
    public PartitionState<K, V> getState(TopicPartition tp) {
        synchronized (this.partitionStates) {
            return this.partitionStates.get(tp);
        }
    }

    /**
     * Bumps the epoch of every newly assigned partition, then loads their state through an
     * {@link OffsetMapCodecManager} and stores it.
     * <p>
     * A partition which already has state is only warned about; its state is replaced by whatever is loaded.
     * Any exception raised while loading is logged and rethrown.
     *
     * @param partitions the partitions that were assigned
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.debug("Partitions assigned: {}", partitions);
        synchronized (this.partitionStates) {
            for (TopicPartition partition : partitions) {
                if (this.partitionStates.containsKey(partition)) {
                    log.warn("New assignment of partition {} which already exists in partition state. Could be a state bug.", partition);
                }
            }

            this.incrementPartitionAssignmentEpoch(partitions);

            try {
                Set<TopicPartition> partitionsSet = UniSets.copyOf(partitions);
                OffsetMapCodecManager<K, V> om = new OffsetMapCodecManager<>(this.consumer);
                Map<TopicPartition, PartitionState<K, V>> loadedStates = om.loadOffsetMapForPartition(partitionsSet);
                this.partitionStates.putAll(loadedStates);
            } catch (Exception e) {
                log.error("Error in onPartitionsAssigned", e);
                throw e;
            }
        }
    }

    /**
     * Drops the state of revoked partitions, see {@link #onPartitionsRemoved}. Failures are logged and rethrown.
     *
     * @param partitions the partitions that were revoked
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.info("Partitions revoked: {}", partitions);
        try {
            this.onPartitionsRemoved(partitions);
        } catch (Exception e) {
            log.error("Error in onPartitionsRevoked", e);
            throw e;
        }
    }

    /**
     * Common handling for revoked and lost partitions: bumps their epoch so that in-flight work becomes stale, then
     * removes their state and the work registered with the shard manager.
     *
     * @param partitions the partitions that are no longer owned
     */
    void onPartitionsRemoved(Collection<TopicPartition> partitions) {
        synchronized (this.partitionStates) {
            this.incrementPartitionAssignmentEpoch(partitions);
            this.resetOffsetMapAndRemoveWork(partitions);
        }
    }

    /**
     * Drops the state of lost partitions, see {@link #onPartitionsRemoved}. Failures are logged and rethrown.
     *
     * @param partitions the partitions that were lost
     */
    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        try {
            log.info("Lost partitions: {}", partitions);
            this.onPartitionsRemoved(partitions);
        } catch (Exception e) {
            log.error("Error in onPartitionsLost", e);
            throw e;
        }
    }

    /**
     * Forwards a successful commit to the state of every committed partition.
     * <p>
     * Partitions which have no state any more (e.g. removed before the commit was acknowledged) are skipped instead
     * of failing with a {@link NullPointerException}.
     *
     * @param offsetsToSend the offsets that were committed, keyed by partition
     */
    public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
        offsetsToSend.forEach((tp, meta) -> {
            PartitionState<K, V> partition = this.getState(tp);
            if (partition == null) {
                log.warn("No state found for committed partition {}, skipping commit success notification", tp);
                return;
            }
            partition.onOffsetCommitSuccess(meta);
        });
    }

    /**
     * Removes the state of the given partitions and asks the shard manager to drop the work found in their commit
     * queues. Callers in this class hold the {@code partitionStates} monitor.
     *
     * @param partitions the partitions whose state is discarded
     */
    private void resetOffsetMapAndRemoveWork(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            PartitionState<K, V> removed = this.partitionStates.remove(tp);
            if (removed == null) {
                // nothing was ever stored for this partition, so there is no work to clean up either
                log.debug("No state found for removed partition {}, nothing to clean up", tp);
                continue;
            }

            NavigableMap<Long, WorkContainer<K, V>> oldWorkPartitionQueue = removed.getCommitQueues();
            if (oldWorkPartitionQueue == null) {
                log.trace("Removing empty commit queue");
            } else {
                this.sm.removeShardsFoundIn(oldWorkPartitionQueue);
            }
        }
    }

    /**
     * Returns the current assignment epoch of a partition.
     *
     * @param rec the record being handled; only used to build the error message
     * @param tp  the partition to look up
     * @return the current epoch of {@code tp}
     * @throws InternalRuntimeError if no epoch is recorded for {@code tp}, i.e. it has never been assigned
     */
    public int getEpoch(ConsumerRecord<K, V> rec, TopicPartition tp) {
        Integer epoch = this.partitionsAssignmentEpochs.get(tp);
        if (epoch == null) {
            throw new InternalRuntimeError(StringUtils.msg("Received message for a partition which is not assigned: {}", rec));
        }
        return epoch;
    }

    /**
     * Returns the current assignment epoch of the partition the record belongs to.
     *
     * @param rec the record whose partition is looked up
     * @return the current epoch of the record's partition
     * @throws InternalRuntimeError if no epoch is recorded for that partition
     */
    public int getEpoch(ConsumerRecord<K, V> rec) {
        return this.getEpoch(rec, KafkaUtils.toTP(rec));
    }

    /**
     * Increments the epoch of each partition; a partition seen for the first time ends up at epoch 0.
     */
    private void incrementPartitionAssignmentEpoch(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            int previousEpoch = this.partitionsAssignmentEpochs.getOrDefault(partition, -1);
            this.partitionsAssignmentEpochs.put(partition, previousEpoch + 1);
        }
    }

    /**
     * Checks whether a unit of work was created under an older assignment of its partition.
     *
     * @param workContainer the work to check
     * @return {@code true} if the epoch stored in the work differs from the partition's current epoch, or if the
     * partition has no epoch at all; {@code false} if they match
     */
    boolean checkEpochIsStale(WorkContainer<K, V> workContainer) {
        TopicPartition topicPartitionKey = workContainer.getTopicPartition();
        Integer currentPartitionEpoch = this.partitionsAssignmentEpochs.get(topicPartitionKey);
        int workEpoch = workContainer.getEpoch();

        // explicit null check: comparing a null Integer with an int would auto-unbox and throw
        boolean stale = currentPartitionEpoch == null || currentPartitionEpoch.intValue() != workEpoch;
        if (stale) {
            log.debug("Epoch mismatch {} vs {} for record {} - were partitions lost? Skipping message - it's already assigned to a different consumer (possibly me).", workEpoch, currentPartitionEpoch, workContainer);
        }
        return stale;
    }

    /**
     * Offers the offset of the given work as a new highest seen offset of its partition.
     */
    private void maybeRaiseHighestSeenOffset(WorkContainer<K, V> wc) {
        this.maybeRaiseHighestSeenOffset(wc.getTopicPartition(), wc.offset());
    }

    /**
     * Delegates to {@link PartitionState#maybeRaiseHighestSeenOffset} of the partition's state.
     *
     * @param tp        the partition; must have state
     * @param highWater the offset to offer
     */
    public void maybeRaiseHighestSeenOffset(TopicPartition tp, long highWater) {
        this.getState(tp).maybeRaiseHighestSeenOffset(highWater);
    }

    /**
     * Asks the partition's state whether the record has been processed before.
     *
     * @param rec the record to check
     * @return the answer of the partition's state, or {@code false} when the partition has no state
     * @throws InternalRuntimeError if the partition has neither state nor an epoch
     */
    boolean isRecordPreviouslyProcessed(ConsumerRecord<K, V> rec) {
        TopicPartition tp = KafkaUtils.toTP(rec);
        PartitionState<K, V> partition = this.getState(tp);
        if (partition == null) {
            int epoch = this.getEpoch(rec, tp);
            log.error("No state found for partition {}, presuming message never before been processed. Partition epoch: {}", tp, epoch);
            return false;
        }

        boolean previouslyProcessed = partition.isRecordPreviouslyProcessed(rec);
        log.trace("Record {} previously seen? {}", rec.offset(), previouslyProcessed);
        return previouslyProcessed;
    }

    /**
     * @param tp the partition; must have state
     * @return the {@code allowedMoreRecords} flag of the partition's state
     */
    public boolean isAllowedMoreRecords(TopicPartition tp) {
        return this.getState(tp).isAllowedMoreRecords();
    }

    /**
     * @return {@code true} if at least one partition state reports work in its commit queue
     */
    public boolean hasWorkInCommitQueues() {
        // iterate under the same monitor that guards mutation of the map in the rebalance callbacks
        synchronized (this.partitionStates) {
            for (PartitionState<K, V> partition : this.partitionStates.values()) {
                if (partition.hasWorkInCommitQueue()) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * @return the sum of the commit queue sizes of all partition states, 0 when there are none
     */
    public long getNumberOfEntriesInPartitionQueues() {
        // iterate under the same monitor that guards mutation of the map in the rebalance callbacks
        synchronized (this.partitionStates) {
            return this.partitionStates.values().stream()
                    .mapToLong(PartitionState::getCommitQueueSize)
                    .sum();
        }
    }

    /**
     * Sets the {@code allowedMoreRecords} flag on the partition's state; the partition must have state.
     */
    private void setPartitionMoreRecordsAllowedToProcess(TopicPartition topicPartitionKey, boolean moreMessagesAllowed) {
        PartitionState<K, V> state = this.getState(topicPartitionKey);
        state.setAllowedMoreRecords(moreMessagesAllowed);
    }

    /**
     * @param tp the partition; must have state
     * @return the highest seen offset as reported by the partition's state
     */
    public Long getHighestSeenOffset(TopicPartition tp) {
        return this.getState(tp).getOffsetHighestSeen();
    }

    /**
     * Registers new work: offers its offset as highest seen offset, then stores it in the commit queue of its
     * partition, keyed by offset.
     *
     * @param wc the work to register; its partition must have state
     */
    public void addWorkContainer(WorkContainer<K, V> wc) {
        this.maybeRaiseHighestSeenOffset(wc);
        TopicPartition tp = wc.getTopicPartition();
        NavigableMap<Long, WorkContainer<K, V>> queue = this.getState(tp).getCommitQueues();
        queue.put(wc.offset(), wc);
    }

    /**
     * @param topicPartition the partition; must have state
     * @return the negation of {@link #isAllowedMoreRecords}
     */
    public boolean isBlocked(TopicPartition topicPartition) {
        return !this.isAllowedMoreRecords(topicPartition);
    }

    /**
     * Encodes the incomplete offsets of a partition into the commit metadata and decides whether the partition may
     * take more records.
     * <ul>
     * <li>No incomplete offsets: nothing is encoded, more records are allowed.</li>
     * <li>Payload longer than {@code OffsetMapCodecManager.DefaultMaxMetadataSize}: the offset is committed without
     * payload, more records are not allowed.</li>
     * <li>Payload longer than the pressure threshold (max size times the threshold multiplier): the payload is
     * committed, more records are not allowed.</li>
     * <li>Otherwise: the payload is committed, more records are allowed.</li>
     * <li>No encoding available: {@code offsetsToSend} is left untouched, more records are not allowed.</li>
     * </ul>
     *
     * @param offsetsToSend     commit data being assembled; the entry of {@code topicPartitionKey} is replaced
     * @param topicPartitionKey the partition being encoded; must have state
     * @param incompleteOffsets the incomplete offsets of the partition; its first element is used as the commit
     *                          offset when {@code offsetsToSend} has no entry for the partition yet
     */
    void addEncodedOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToSend, TopicPartition topicPartitionKey, LinkedHashSet<Long> incompleteOffsets) {
        if (incompleteOffsets.isEmpty()) {
            // nothing to encode, so no metadata pressure either
            this.setPartitionMoreRecordsAllowedToProcess(topicPartitionKey, true);
            return;
        }

        OffsetAndMetadata finalOffsetOnly = offsetsToSend.get(topicPartitionKey);
        long offsetOfNextExpectedMessage = finalOffsetOnly == null
                ? incompleteOffsets.iterator().next()
                : finalOffsetOnly.offset();

        OffsetMapCodecManager<K, V> om = new OffsetMapCodecManager<>(this.consumer);
        try {
            PartitionState<K, V> state = this.getState(topicPartitionKey);
            state.setIncompleteOffsets(incompleteOffsets);
            String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, state);
            int metaPayloadLength = offsetMapPayload.length();
            double pressureThresholdValue = (double) OffsetMapCodecManager.DefaultMaxMetadataSize * this.USED_PAYLOAD_THRESHOLD_MULTIPLIER;

            boolean moreMessagesAllowed;
            OffsetAndMetadata offsetWithExtraMap;
            if (metaPayloadLength > OffsetMapCodecManager.DefaultMaxMetadataSize) {
                // hard limit exceeded: commit the bare offset only
                moreMessagesAllowed = false;
                offsetWithExtraMap = new OffsetAndMetadata(offsetOfNextExpectedMessage);
                log.warn("Offset map data too large (size: {}) to fit in metadata payload hard limit of {} - cannot include in commit. Warning: messages might be replayed on rebalance. See kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize = {} and issue #47.", metaPayloadLength, OffsetMapCodecManager.DefaultMaxMetadataSize, OffsetMapCodecManager.DefaultMaxMetadataSize);
            } else if ((double) metaPayloadLength > pressureThresholdValue) {
                // soft limit exceeded: still commit the payload, but apply back pressure
                moreMessagesAllowed = false;
                offsetWithExtraMap = new OffsetAndMetadata(offsetOfNextExpectedMessage, offsetMapPayload);
                log.warn("Payload size {} higher than threshold {}, but still lower than max {}. Will write payload, but will not allow further messages, in order to allow the offset data to shrink (via succeeding messages).", metaPayloadLength, pressureThresholdValue, OffsetMapCodecManager.DefaultMaxMetadataSize);
            } else {
                moreMessagesAllowed = true;
                offsetWithExtraMap = new OffsetAndMetadata(offsetOfNextExpectedMessage, offsetMapPayload);
                log.debug("Payload size {} within threshold {}", metaPayloadLength, pressureThresholdValue);
            }

            this.setPartitionMoreRecordsAllowedToProcess(topicPartitionKey, moreMessagesAllowed);
            offsetsToSend.put(topicPartitionKey, offsetWithExtraMap);
        } catch (EncodingNotSupportedException e) {
            this.setPartitionMoreRecordsAllowedToProcess(topicPartitionKey, false);
            log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance.", e);
        }
    }

    /**
     * @param rec the record to check
     * @return {@code true} if state is stored for the record's partition
     */
    public boolean isPartitionAssigned(ConsumerRecord<K, V> rec) {
        return this.getState(KafkaUtils.toTP(rec)) != null;
    }

    /**
     * Forwards successfully processed work to the state of its partition.
     *
     * @param wc the completed work; its partition must have state
     */
    public void onSuccess(WorkContainer<K, V> wc) {
        this.getState(wc.getTopicPartition()).onSuccess(wc);
    }

    /**
     * @return the multiplier applied to the maximum metadata size to obtain the pressure threshold
     */
    public double getUSED_PAYLOAD_THRESHOLD_MULTIPLIER() {
        return this.USED_PAYLOAD_THRESHOLD_MULTIPLIER;
    }

    /**
     * @param USED_PAYLOAD_THRESHOLD_MULTIPLIER the new multiplier applied to the maximum metadata size
     */
    public void setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(double USED_PAYLOAD_THRESHOLD_MULTIPLIER) {
        this.USED_PAYLOAD_THRESHOLD_MULTIPLIER = USED_PAYLOAD_THRESHOLD_MULTIPLIER;
    }

    /**
     * @return the live (not copied) map of partition states
     */
    Map<TopicPartition, PartitionState<K, V>> getPartitionStates() {
        return this.partitionStates;
    }
}

