/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.JavaUtils;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.metrics.PCMetrics;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
import io.confluent.parallelconsumer.offsets.NoEncodingPossibleException;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.state.PartitionStateManager;
import io.confluent.parallelconsumer.state.ShardManager;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tag;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionState<K, V> {
    private static final Logger log = LoggerFactory.getLogger(PartitionState.class);
    public static final long KAFKA_OFFSET_ABSENCE = -1L;
    private final PCModule<K, V> module;
    @NonNull
    private final TopicPartition tp;
    @NonNull
    private ConcurrentSkipListMap<Long, Optional<ConsumerRecord<K, V>>> incompleteOffsets;
    private boolean bootstrapPhase = true;
    private boolean dirty;
    private long offsetHighestSeen;
    private long offsetHighestSucceeded = -1L;
    private boolean allowedMoreRecords = true;
    private final long partitionsAssignmentEpoch;
    private long lastCommittedOffset;
    private Gauge lastCommittedOffsetGauge;
    private Gauge highestSeenOffsetGauge;
    private Gauge highestCompletedOffsetGauge;
    private Gauge highestSequentialSucceededOffsetGauge;
    private Gauge numberOfIncompletesGauge;
    private Gauge ephochGauge;
    private DistributionSummary ratioPayloadUsedDistributionSummary;
    private DistributionSummary ratioMetadataSpaceUsedDistributionSummary;

    public PartitionState(long newEpoch, PCModule<K, V> pcModule, TopicPartition topicPartition, OffsetMapCodecManager.HighestOffsetAndIncompletes offsetData) {
        this.module = pcModule;
        this.tp = topicPartition;
        this.partitionsAssignmentEpoch = newEpoch;
        this.initStateFromOffsetData(offsetData);
        this.initMetrics();
    }

    private void initStateFromOffsetData(OffsetMapCodecManager.HighestOffsetAndIncompletes offsetData) {
        this.offsetHighestSeen = offsetData.getHighestSeenOffset().orElse(-1L);
        this.incompleteOffsets = new ConcurrentSkipListMap();
        offsetData.getIncompleteOffsets().forEach(offset -> this.incompleteOffsets.put((Long)offset, Optional.empty()));
        this.offsetHighestSucceeded = this.offsetHighestSeen;
    }

    private void maybeRaiseHighestSeenOffset(long offset) {
        if (offset >= this.offsetHighestSeen) {
            log.trace("Updating highest seen - was: {} now: {}", (Object)this.offsetHighestSeen, (Object)offset);
            this.offsetHighestSeen = offset;
        }
    }

    public void onOffsetCommitSuccess(OffsetAndMetadata committed) {
        this.lastCommittedOffset = committed.offset();
        this.setClean();
    }

    private void setClean() {
        this.setDirty(false);
    }

    private void setDirty() {
        this.setDirty(true);
    }

    public boolean isRecordPreviouslyCompleted(ConsumerRecord<K, V> rec) {
        long recOffset = rec.offset();
        if (this.incompleteOffsets.containsKey(recOffset)) {
            return false;
        }
        return recOffset <= this.offsetHighestSeen;
    }

    public boolean hasIncompleteOffsets() {
        return !this.incompleteOffsets.isEmpty();
    }

    public int getNumberOfIncompleteOffsets() {
        return this.incompleteOffsets.size();
    }

    public void onSuccess(long offset) {
        boolean removedFromIncompletes;
        boolean bl = removedFromIncompletes = this.incompleteOffsets.remove(offset) != null;
        assert (removedFromIncompletes);
        this.updateHighestSucceededOffsetSoFar(offset);
        this.setDirty();
    }

    public void onFailure(WorkContainer<K, V> work) {
    }

    private void updateHighestSucceededOffsetSoFar(long thisOffset) {
        long highestSucceeded = this.getOffsetHighestSucceeded();
        if (thisOffset > highestSucceeded) {
            log.trace("Updating highest completed - was: {} now: {}", (Object)highestSucceeded, (Object)thisOffset);
            this.offsetHighestSucceeded = thisOffset;
        }
    }

    private boolean epochIsStale(EpochAndRecordsMap.RecordsAndEpoch recordsAndEpoch) {
        long currentPartitionEpoch = this.getPartitionsAssignmentEpoch();
        Long epochOfInboundRecords = recordsAndEpoch.getEpochOfPartitionAtPoll();
        return !Objects.equals(epochOfInboundRecords, currentPartitionEpoch);
    }

    public void maybeRegisterNewPollBatchAsWork(@NonNull EpochAndRecordsMap.RecordsAndEpoch recordsAndEpoch) {
        if (recordsAndEpoch == null) {
            throw new NullPointerException("recordsAndEpoch is marked non-null but is null");
        }
        if (this.epochIsStale(recordsAndEpoch)) {
            log.debug("Inbound record of work has epoch ({}) not matching currently assigned epoch for the applicable partition ({}), skipping", (Object)recordsAndEpoch.getEpochOfPartitionAtPoll(), (Object)this.getPartitionsAssignmentEpoch());
            return;
        }
        this.maybeTruncateOrPruneTrackedOffsets(recordsAndEpoch);
        long epochOfInboundRecords = recordsAndEpoch.getEpochOfPartitionAtPoll();
        List recordPollBatch = recordsAndEpoch.getRecords();
        for (ConsumerRecord aRecord : recordPollBatch) {
            if (this.isRecordPreviouslyCompleted(aRecord)) {
                log.trace("Record previously completed, skipping. offset: {}", (Object)aRecord.offset());
                continue;
            }
            this.getShardManager().addWorkContainer(epochOfInboundRecords, aRecord);
            this.addNewIncompleteRecord(aRecord);
        }
    }

    private ShardManager<K, V> getShardManager() {
        return this.module.workManager().getSm();
    }

    public boolean isPartitionRemovedOrNeverAssigned() {
        return false;
    }

    public void addNewIncompleteRecord(ConsumerRecord<K, V> record) {
        long offset = record.offset();
        this.maybeRaiseHighestSeenOffset(offset);
        this.incompleteOffsets.put(offset, Optional.of(record));
    }

    private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {
        boolean pollBelowExpected;
        if (!this.bootstrapPhase) {
            return;
        }
        this.bootstrapPhase = false;
        long expectedBootstrapRecordOffset = this.getOffsetToCommit();
        boolean pollAboveExpected = bootstrapPolledOffset > expectedBootstrapRecordOffset;
        boolean bl = pollBelowExpected = bootstrapPolledOffset < expectedBootstrapRecordOffset;
        if (pollAboveExpected) {
            log.warn("Truncating state - removing records lower than {} from partition {} of topic {}. Offsets have been removed from the partition by the broker or committed offset has been raised. Bootstrap polled {} but expected {} from loaded commit data. Could be caused by record retention or compaction and offset reset policy LATEST.", new Object[]{bootstrapPolledOffset, this.tp.partition(), this.tp.topic(), bootstrapPolledOffset, expectedBootstrapRecordOffset});
            NavigableSet<Long> incompletesToPrune = this.incompleteOffsets.keySet().headSet(bootstrapPolledOffset, false);
            incompletesToPrune.forEach(this.incompleteOffsets::remove);
        } else if (pollBelowExpected) {
            log.warn("Bootstrap polled offset has been reset to an earlier offset ({}) for partition {} of topic {} - truncating state - all records above (including this) will be replayed. Was expecting {} but bootstrap poll was {}. Could be caused by record retention or compaction and offset reset policy EARLIEST.", new Object[]{bootstrapPolledOffset, this.tp.partition(), this.tp.topic(), expectedBootstrapRecordOffset, bootstrapPolledOffset});
            OffsetMapCodecManager.HighestOffsetAndIncompletes offsetData = OffsetMapCodecManager.HighestOffsetAndIncompletes.of();
            this.initStateFromOffsetData(offsetData);
        }
    }

    public boolean isRemoved() {
        return false;
    }

    public Optional<OffsetAndMetadata> getCommitDataIfDirty() {
        return this.isDirty() ? Optional.of(this.createOffsetAndMetadata()) : Optional.empty();
    }

    protected OffsetAndMetadata createOffsetAndMetadata() {
        Optional<String> payloadOpt = this.tryToEncodeOffsets();
        long nextOffset = this.getOffsetToCommit();
        return payloadOpt.map(encodedOffsets -> new OffsetAndMetadata(nextOffset, encodedOffsets)).orElseGet(() -> new OffsetAndMetadata(nextOffset));
    }

    protected long getOffsetToCommit() {
        return this.getOffsetHighestSequentialSucceeded() + 1L;
    }

    public List<Long> getAllIncompleteOffsets() {
        return Collections.unmodifiableList(this.incompleteOffsets.keySet().parallelStream().collect(Collectors.toList()));
    }

    public SortedSet<Long> getIncompleteOffsetsBelowHighestSucceeded() {
        long highestSucceeded = this.getOffsetHighestSucceeded();
        return this.incompleteOffsets.keySet().parallelStream().filter(x -> x < highestSucceeded).collect(JavaUtils.toTreeSet());
    }

    public long getOffsetHighestSequentialSucceeded() {
        boolean incompleteOffsetsWasEmpty;
        long currentOffsetHighestSeen = this.offsetHighestSeen;
        Long firstIncompleteOffset = this.incompleteOffsets.keySet().ceiling(-1L);
        boolean bl = incompleteOffsetsWasEmpty = firstIncompleteOffset == null;
        if (incompleteOffsetsWasEmpty) {
            return currentOffsetHighestSeen;
        }
        return firstIncompleteOffset - 1L;
    }

    private Optional<String> tryToEncodeOffsets() {
        if (this.incompleteOffsets.isEmpty()) {
            this.setAllowedMoreRecords(true);
            return Optional.empty();
        }
        try {
            OffsetMapCodecManager<K, V> om = new OffsetMapCodecManager<K, V>(this.module);
            long offsetOfNextExpectedMessage = this.getOffsetToCommit();
            long offsetRange = this.getOffsetHighestSucceeded() - offsetOfNextExpectedMessage;
            String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, this);
            this.ratioPayloadUsedDistributionSummary.record((double)offsetMapPayload.length() / (double)offsetRange);
            this.ratioMetadataSpaceUsedDistributionSummary.record((double)offsetMapPayload.length() / (double)OffsetMapCodecManager.DefaultMaxMetadataSize);
            boolean mustStrip = this.updateBlockFromEncodingResult(offsetMapPayload);
            if (mustStrip) {
                return Optional.empty();
            }
            return Optional.of(offsetMapPayload);
        }
        catch (NoEncodingPossibleException e) {
            this.setAllowedMoreRecords(false);
            log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance.", (Throwable)e);
            return Optional.empty();
        }
    }

    private boolean updateBlockFromEncodingResult(String offsetMapPayload) {
        int metaPayloadLength = offsetMapPayload.length();
        boolean mustStrip = false;
        if (metaPayloadLength > OffsetMapCodecManager.DefaultMaxMetadataSize) {
            mustStrip = true;
            this.setAllowedMoreRecords(false);
            log.warn("Offset map data too large (size: {}) to fit in metadata payload hard limit of {} - cannot include in commit. Warning: messages might be replayed on rebalance. See kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize = {} and issue #47.", new Object[]{metaPayloadLength, OffsetMapCodecManager.DefaultMaxMetadataSize, OffsetMapCodecManager.DefaultMaxMetadataSize});
        } else if ((double)metaPayloadLength > this.getPressureThresholdValue()) {
            this.setAllowedMoreRecords(false);
            log.warn("Payload size {} higher than threshold {}, but still lower than max {}. Will write payload, but will not allow further messages, in order to allow the offset data to shrink (via succeeding messages).", new Object[]{metaPayloadLength, this.getPressureThresholdValue(), OffsetMapCodecManager.DefaultMaxMetadataSize});
        } else {
            if (!this.allowedMoreRecords) {
                this.setAllowedMoreRecords(true);
            }
            log.debug("Payload size {} within threshold {}", (Object)metaPayloadLength, (Object)this.getPressureThresholdValue());
        }
        return mustStrip;
    }

    private double getPressureThresholdValue() {
        return (double)OffsetMapCodecManager.DefaultMaxMetadataSize * PartitionStateManager.getUSED_PAYLOAD_THRESHOLD_MULTIPLIER();
    }

    public void onPartitionsRemoved(ShardManager<K, V> sm) {
        sm.removeAnyShardEntriesReferencedFrom(this.incompleteOffsets.values());
        this.deregisterMetrics();
    }

    public boolean isBlocked() {
        return !this.isAllowedMoreRecords();
    }

    private void maybeTruncateOrPruneTrackedOffsets(EpochAndRecordsMap.RecordsAndEpoch polledRecordBatch) {
        List records = polledRecordBatch.getRecords();
        if (records.isEmpty()) {
            log.warn("Polled an empty batch of records? {}", (Object)polledRecordBatch);
            return;
        }
        long offsetOfLowestRecord = JavaUtils.getFirst(records).get().offset();
        this.maybeTruncateBelowOrAbove(offsetOfLowestRecord);
        Set polledOffsets = records.stream().map(ConsumerRecord::offset).collect(Collectors.toSet());
        long offsetOfHighestRecord = JavaUtils.getLast(records).get().offset();
        ArrayList<Long> offsetsToRemoveFromTracking = new ArrayList<Long>();
        NavigableSet<Long> trackedIncompletesWithinPolledBatch = this.incompleteOffsets.keySet().subSet(offsetOfLowestRecord, true, offsetOfHighestRecord, true);
        for (long trackedIncomplete : trackedIncompletesWithinPolledBatch) {
            boolean incompleteMissingFromPolledRecords = !polledOffsets.contains(trackedIncomplete);
            if (!incompleteMissingFromPolledRecords) continue;
            offsetsToRemoveFromTracking.add(trackedIncomplete);
        }
        if (!offsetsToRemoveFromTracking.isEmpty()) {
            log.warn("Offsets {} have been removed from partition {} (as they were not been returned within a polled batch which should have contained them - batch offset range is {} to {}), so they be removed from tracking state, as they will never be sent again to be retried. This can be caused by PC rebalancing across a partition which has been compacted on offsets above the committed base offset, after initial load and before a rebalance.", new Object[]{offsetsToRemoveFromTracking, this.getTp(), offsetOfLowestRecord, offsetOfHighestRecord});
            offsetsToRemoveFromTracking.forEach(this.incompleteOffsets::remove);
        }
    }

    private boolean isBlockingProgress(WorkContainer<?, ?> workContainer) {
        return workContainer.offset() < this.getOffsetHighestSucceeded();
    }

    public boolean couldBeTakenAsWork(WorkContainer<K, V> workContainer) {
        if (this.checkIfWorkIsStale(workContainer)) {
            log.debug("Work is in queue with stale epoch or no longer assigned. Skipping. Shard it came from will/was removed during partition revocation. WC: {}", workContainer);
            return false;
        }
        if (this.isAllowedMoreRecords()) {
            log.debug("Partition is allowed more records. Taking work. WC: {}", workContainer);
            return true;
        }
        if (this.isBlockingProgress(workContainer)) {
            log.debug("Partition is blocked, but this record is blocking progress. Taking work. WC: {}", workContainer);
            return true;
        }
        log.debug("Not allowed more records for the partition ({}) as set from previous encode run (blocked), that this record ({}) belongs to, due to offset encoding back pressure, is within the encoded payload already (offset lower than highest succeeded, not in flight ({}), continuing on to next container in shardEntry.", new Object[]{workContainer.getTopicPartition(), workContainer.offset(), workContainer.isNotInFlight()});
        return false;
    }

    boolean checkIfWorkIsStale(WorkContainer<?, ?> workContainer) {
        boolean epochMissMatch;
        Long currentPartitionEpoch = this.getPartitionsAssignmentEpoch();
        long workEpoch = workContainer.getEpoch();
        boolean partitionNotAssigned = this.isPartitionRemovedOrNeverAssigned();
        boolean bl = epochMissMatch = currentPartitionEpoch != workEpoch;
        if (epochMissMatch || partitionNotAssigned) {
            log.debug("Epoch mismatch {} vs {} for record {}. Skipping message - it's partition has already assigned to a different consumer.", new Object[]{workEpoch, currentPartitionEpoch, workContainer});
            return true;
        }
        return false;
    }

    private void initMetrics() {
        TopicPartition topicPartition = this.getTp();
        if (topicPartition == null) {
            return;
        }
        Tag[] partitionStateTags = new Tag[]{Tag.of((String)"topic", (String)topicPartition.topic()), Tag.of((String)"partition", (String)String.valueOf(topicPartition.partition()))};
        this.lastCommittedOffsetGauge = PCMetrics.getInstance().gaugeFromMetricDef(PCMetricsDef.PARTITION_LAST_COMMITTED_OFFSET, this, partitionState -> partitionState.lastCommittedOffset, partitionStateTags);
        this.highestSeenOffsetGauge = PCMetrics.getInstance().gaugeFromMetricDef(PCMetricsDef.PARTITION_HIGHEST_SEEN_OFFSET, this, PartitionState::getOffsetHighestSeen, partitionStateTags);
        this.highestCompletedOffsetGauge = PCMetrics.getInstance().gaugeFromMetricDef(PCMetricsDef.PARTITION_HIGHEST_COMPLETED_OFFSET, this, PartitionState::getOffsetHighestSucceeded, partitionStateTags);
        this.highestSequentialSucceededOffsetGauge = PCMetrics.getInstance().gaugeFromMetricDef(PCMetricsDef.PARTITION_HIGHEST_SEQUENTIAL_SUCCEEDED_OFFSET, this, PartitionState::getOffsetHighestSequentialSucceeded, partitionStateTags);
        this.numberOfIncompletesGauge = PCMetrics.getInstance().gaugeFromMetricDef(PCMetricsDef.PARTITION_INCOMPLETE_OFFSETS, this, partitionState -> partitionState.incompleteOffsets.size(), partitionStateTags);
        this.ephochGauge = PCMetrics.getInstance().gaugeFromMetricDef(PCMetricsDef.PARTITION_ASSIGNMENT_EPOCH, this, PartitionState::getPartitionsAssignmentEpoch, partitionStateTags);
        this.ratioMetadataSpaceUsedDistributionSummary = PCMetrics.getInstance().getDistributionSummaryFromMetricDef(PCMetricsDef.METADATA_SPACE_USED, partitionStateTags);
        this.ratioPayloadUsedDistributionSummary = PCMetrics.getInstance().getDistributionSummaryFromMetricDef(PCMetricsDef.PAYLOAD_RATIO_USED, partitionStateTags);
    }

    private void deregisterMetrics() {
        PCMetrics.removeMeter((Meter)this.lastCommittedOffsetGauge);
        PCMetrics.removeMeter((Meter)this.highestSeenOffsetGauge);
        PCMetrics.removeMeter((Meter)this.highestCompletedOffsetGauge);
        PCMetrics.removeMeter((Meter)this.highestSequentialSucceededOffsetGauge);
        PCMetrics.removeMeter((Meter)this.numberOfIncompletesGauge);
        PCMetrics.removeMeter((Meter)this.ephochGauge);
        PCMetrics.removeMeter((Meter)this.ratioMetadataSpaceUsedDistributionSummary);
        PCMetrics.removeMeter((Meter)this.ratioPayloadUsedDistributionSummary);
    }

    public String toString() {
        return "PartitionState(module=" + this.module + ", tp=" + this.getTp() + ", incompleteOffsets=" + this.incompleteOffsets + ", bootstrapPhase=" + this.bootstrapPhase + ", dirty=" + this.isDirty() + ", offsetHighestSeen=" + this.getOffsetHighestSeen() + ", offsetHighestSucceeded=" + this.getOffsetHighestSucceeded() + ", allowedMoreRecords=" + this.isAllowedMoreRecords() + ", partitionsAssignmentEpoch=" + this.getPartitionsAssignmentEpoch() + ", lastCommittedOffset=" + this.lastCommittedOffset + ", lastCommittedOffsetGauge=" + this.lastCommittedOffsetGauge + ", highestSeenOffsetGauge=" + this.highestSeenOffsetGauge + ", highestCompletedOffsetGauge=" + this.highestCompletedOffsetGauge + ", highestSequentialSucceededOffsetGauge=" + this.highestSequentialSucceededOffsetGauge + ", numberOfIncompletesGauge=" + this.numberOfIncompletesGauge + ", ephochGauge=" + this.ephochGauge + ", ratioPayloadUsedDistributionSummary=" + this.ratioPayloadUsedDistributionSummary + ", ratioMetadataSpaceUsedDistributionSummary=" + this.ratioMetadataSpaceUsedDistributionSummary + ")";
    }

    @NonNull
    public TopicPartition getTp() {
        return this.tp;
    }

    private void setDirty(boolean dirty) {
        this.dirty = dirty;
    }

    boolean isDirty() {
        return this.dirty;
    }

    public long getOffsetHighestSeen() {
        return this.offsetHighestSeen;
    }

    public long getOffsetHighestSucceeded() {
        return this.offsetHighestSucceeded;
    }

    boolean isAllowedMoreRecords() {
        return this.allowedMoreRecords;
    }

    private void setAllowedMoreRecords(boolean allowedMoreRecords) {
        this.allowedMoreRecords = allowedMoreRecords;
    }

    public long getPartitionsAssignmentEpoch() {
        return this.partitionsAssignmentEpoch;
    }
}

