/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.state;

import io.confluent.parallelconsumer.offsets.NoEncodingPossibleException;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.state.PartitionStateManager;
import io.confluent.parallelconsumer.state.ShardManager;
import io.confluent.parallelconsumer.state.WorkContainer;
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the offset bookkeeping for a single {@link TopicPartition}: the set of offsets that have
 * been added but not yet completed, the highest offset seen, the highest offset that has
 * succeeded, a dirty flag signalling that commit data should be produced, and a queue of
 * {@link WorkContainer}s keyed by offset.
 *
 * <p>The collections used are concurrent skip-list structures, but the scalar fields
 * ({@code dirty}, {@code offsetHighestSeen}, {@code offsetHighestSucceeded},
 * {@code allowedMoreRecords}) are plain fields with no synchronisation in this class.
 * NOTE(review): confirm the threading model with the callers before relying on cross-thread
 * visibility of those fields.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class PartitionState<K, V> {
    private static final Logger log = LoggerFactory.getLogger(PartitionState.class);

    /** Sentinel used when no offset is known. */
    public static final long KAFKA_OFFSET_ABSENCE = -1L;

    private final TopicPartition tp;

    /** Offsets that were added via {@link #addWorkContainer} (or loaded at construction) and have not succeeded yet. */
    private final ConcurrentSkipListSet<Long> incompleteOffsets;

    /** True once work has succeeded since the flag was last cleared by {@link #onOffsetCommitSuccess}. */
    private boolean dirty;

    private long offsetHighestSeen;

    private long offsetHighestSucceeded = KAFKA_OFFSET_ABSENCE;

    /** Cleared when the encoded offset payload is too large, above the pressure threshold, or cannot be encoded. */
    private boolean allowedMoreRecords = true;

    /** Work containers keyed by their offset; entries are removed in {@link #onSuccess}. */
    private final NavigableMap<Long, WorkContainer<K, V>> commitQueue = new ConcurrentSkipListMap<>();

    /**
     * @return a read-only view of the commit queue
     */
    private NavigableMap<Long, WorkContainer<K, V>> getCommitQueue() {
        return Collections.unmodifiableNavigableMap(this.commitQueue);
    }

    /**
     * Creates the state for a partition from previously loaded offset data.
     *
     * <p>Both the highest-seen and highest-succeeded offsets are initialised from the loaded
     * highest seen offset, falling back to {@link #KAFKA_OFFSET_ABSENCE} when none is present.
     *
     * @param tp         the partition this state belongs to
     * @param offsetData the loaded highest seen offset and incomplete offsets; the incompletes are copied
     */
    public PartitionState(TopicPartition tp, OffsetMapCodecManager.HighestOffsetAndIncompletes offsetData) {
        this.tp = tp;
        this.offsetHighestSeen = offsetData.getHighestSeenOffset().orElse(KAFKA_OFFSET_ABSENCE);
        this.incompleteOffsets = new ConcurrentSkipListSet<>(offsetData.getIncompleteOffsets());
        this.offsetHighestSucceeded = this.offsetHighestSeen;
    }

    /**
     * Raises the highest seen offset when {@code offset} is at or above the current value.
     *
     * @param offset the offset that has just been observed
     */
    private void maybeRaiseHighestSeenOffset(long offset) {
        if (offset >= this.offsetHighestSeen) {
            log.trace("Updating highest seen - was: {} now: {}", this.offsetHighestSeen, offset);
            this.offsetHighestSeen = offset;
        }
    }

    /**
     * Clears the dirty flag after a successful commit.
     *
     * @param committed the committed data; not inspected by this implementation
     */
    public void onOffsetCommitSuccess(OffsetAndMetadata committed) {
        this.setClean();
    }

    private void setClean() {
        this.setDirty(false);
    }

    private void setDirty() {
        this.setDirty(true);
    }

    /**
     * Tells whether a record counts as already completed: its offset is not in the incomplete
     * set and is not above the highest seen offset.
     *
     * @param rec the record to check
     * @return {@code true} if the record's offset is tracked as complete, {@code false} otherwise
     */
    public boolean isRecordPreviouslyCompleted(ConsumerRecord<K, V> rec) {
        long recOffset = rec.offset();
        boolean stillIncomplete = this.incompleteOffsets.contains(recOffset);
        return !stillIncomplete && recOffset <= this.offsetHighestSeen;
    }

    /**
     * @return {@code true} if the commit queue holds at least one work container
     */
    public boolean hasWorkInCommitQueue() {
        return !this.commitQueue.isEmpty();
    }

    /**
     * @return the number of work containers currently in the commit queue
     */
    public int getCommitQueueSize() {
        return this.commitQueue.size();
    }

    /**
     * Records a successful piece of work: removes it from the commit queue and the incomplete
     * set, raises the highest succeeded offset if needed, and marks the state dirty.
     *
     * @param work the work container that succeeded
     */
    public void onSuccess(WorkContainer<K, V> work) {
        long offset = work.offset();
        // Removals are performed outside the assert expressions so they also run with assertions disabled.
        WorkContainer<K, V> removedFromQueue = this.commitQueue.remove(offset);
        assert (removedFromQueue != null);
        boolean removedFromIncompletes = this.incompleteOffsets.remove(offset);
        assert (removedFromIncompletes);
        this.updateHighestSucceededOffsetSoFar(work);
        this.setDirty();
    }

    /**
     * Failure hook; this implementation changes no state.
     *
     * @param work the work container that failed
     */
    public void onFailure(WorkContainer<K, V> work) {
    }

    /**
     * Raises the highest succeeded offset when the given work's offset is strictly greater.
     *
     * @param work the work container that succeeded
     */
    private void updateHighestSucceededOffsetSoFar(WorkContainer<K, V> work) {
        long highestSucceeded = this.getOffsetHighestSucceeded();
        long thisOffset = work.offset();
        if (thisOffset > highestSucceeded) {
            log.trace("Updating highest completed - was: {} now: {}", highestSucceeded, thisOffset);
            this.offsetHighestSucceeded = thisOffset;
        }
    }

    /**
     * Registers new work: raises the highest seen offset, queues the container under its offset
     * and adds the offset to the incomplete set.
     *
     * @param wc the work container to track
     */
    public void addWorkContainer(WorkContainer<K, V> wc) {
        long offset = wc.offset();
        this.maybeRaiseHighestSeenOffset(offset);
        this.commitQueue.put(offset, wc);
        this.incompleteOffsets.add(offset);
    }

    /**
     * @return always {@code false} in this class
     *         (NOTE(review): presumably overridden by a type representing removed partitions - confirm)
     */
    public boolean isRemoved() {
        return false;
    }

    /**
     * Builds the commit data when the state is dirty.
     *
     * <p>Building the data attempts to encode the incomplete offsets, which may also update
     * whether more records are allowed (see {@link #isBlocked()}).
     *
     * @return the offset and metadata to commit, or empty when the state is not dirty
     */
    public Optional<OffsetAndMetadata> getCommitDataIfDirty() {
        if (this.isDirty()) {
            return Optional.of(this.createOffsetAndMetadata());
        }
        return Optional.empty();
    }

    /**
     * Creates the commit data for the next expected offset, attaching the encoded offset payload
     * as metadata when one is available.
     */
    private OffsetAndMetadata createOffsetAndMetadata() {
        Optional<String> payloadOpt = this.tryToEncodeOffsets();
        long nextOffset = this.getNextExpectedPolledOffset();
        return payloadOpt
                .map(payload -> new OffsetAndMetadata(nextOffset, payload))
                .orElseGet(() -> new OffsetAndMetadata(nextOffset));
    }

    /**
     * @return the offset immediately after the highest sequentially succeeded offset
     */
    private long getNextExpectedPolledOffset() {
        return this.getOffsetHighestSequentialSucceeded() + 1L;
    }

    /**
     * @return an unmodifiable copy of all incomplete offsets; the copy has no defined iteration order
     */
    public Set<Long> getAllIncompleteOffsets() {
        // A sequential stream is sufficient for copying the set; the result is the same.
        return Collections.unmodifiableSet(this.incompleteOffsets.stream().collect(Collectors.toSet()));
    }

    /**
     * @return an unmodifiable copy of the incomplete offsets that are strictly below the highest
     *         succeeded offset; the copy has no defined iteration order
     */
    public Set<Long> getIncompleteOffsetsBelowHighestSucceeded() {
        long highestSucceeded = this.getOffsetHighestSucceeded();
        return Collections.unmodifiableSet(
                this.incompleteOffsets.stream()
                        .filter(offset -> offset < highestSucceeded)
                        .collect(Collectors.toSet()));
    }

    /**
     * @return the highest seen offset when nothing is incomplete, otherwise the offset just below
     *         the lowest incomplete offset
     */
    public long getOffsetHighestSequentialSucceeded() {
        if (this.incompleteOffsets.isEmpty()) {
            return this.offsetHighestSeen;
        }
        return this.incompleteOffsets.first() - 1L;
    }

    /**
     * Attempts to encode the incomplete offsets into a metadata payload.
     *
     * <p>Side effect: updates {@code allowedMoreRecords} - set to {@code true} when there is
     * nothing to encode, {@code false} when no encoding is possible, and otherwise according to
     * {@link #updateBlockFromEncodingResult(String)}.
     *
     * @return the payload, or empty when there is nothing to encode, encoding failed, or the
     *         payload exceeds the maximum metadata size
     */
    private Optional<String> tryToEncodeOffsets() {
        if (this.incompleteOffsets.isEmpty()) {
            this.setAllowedMoreRecords(true);
            return Optional.empty();
        }
        try {
            // NOTE(review): raw type kept as in the original; the declaration of OffsetMapCodecManager
            // is not visible here, so its type parameters (if any) need confirming before parameterising.
            OffsetMapCodecManager om = new OffsetMapCodecManager(null);
            long offsetOfNextExpectedMessage = this.getNextExpectedPolledOffset();
            String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, this);
            boolean mustStrip = this.updateBlockFromEncodingResult(offsetMapPayload);
            if (mustStrip) {
                return Optional.empty();
            }
            return Optional.of(offsetMapPayload);
        }
        catch (NoEncodingPossibleException e) {
            this.setAllowedMoreRecords(false);
            log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance.", e);
            return Optional.empty();
        }
    }

    /**
     * Updates {@code allowedMoreRecords} from the size of the encoded payload.
     *
     * <ul>
     *   <li>Larger than the maximum metadata size: block further records and report that the payload must be dropped.</li>
     *   <li>Larger than the pressure threshold: block further records but keep the payload.</li>
     *   <li>Otherwise: allow further records.</li>
     * </ul>
     *
     * @param offsetMapPayload the encoded payload
     * @return {@code true} if the payload is too large and must not be included in the commit
     */
    private boolean updateBlockFromEncodingResult(String offsetMapPayload) {
        int metaPayloadLength = offsetMapPayload.length();
        if (metaPayloadLength > OffsetMapCodecManager.DefaultMaxMetadataSize) {
            this.setAllowedMoreRecords(false);
            log.warn("Offset map data too large (size: {}) to fit in metadata payload hard limit of {} - cannot include in commit. Warning: messages might be replayed on rebalance. See kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize = {} and issue #47.", metaPayloadLength, OffsetMapCodecManager.DefaultMaxMetadataSize, OffsetMapCodecManager.DefaultMaxMetadataSize);
            return true;
        }

        // Read the threshold once so the comparison and the log message report the same value.
        double pressureThreshold = this.getPressureThresholdValue();
        if (metaPayloadLength > pressureThreshold) {
            this.setAllowedMoreRecords(false);
            log.warn("Payload size {} higher than threshold {}, but still lower than max {}. Will write payload, but will not allow further messages, in order to allow the offset data to shrink (via succeeding messages).", metaPayloadLength, pressureThreshold, OffsetMapCodecManager.DefaultMaxMetadataSize);
        } else {
            this.setAllowedMoreRecords(true);
            log.debug("Payload size {} within threshold {}", metaPayloadLength, pressureThreshold);
        }
        return false;
    }

    /**
     * @return the maximum metadata size scaled by the used-payload threshold multiplier
     */
    private double getPressureThresholdValue() {
        return (double)OffsetMapCodecManager.DefaultMaxMetadataSize * PartitionStateManager.getUSED_PAYLOAD_THRESHOLD_MULTIPLIER();
    }

    /**
     * Asks the shard manager to remove any shards referenced by this partition's commit queue.
     *
     * @param sm the shard manager to notify; receives a read-only view of the commit queue
     */
    public void onPartitionsRemoved(ShardManager<K, V> sm) {
        sm.removeAnyShardsReferencedBy(this.getCommitQueue());
    }

    /**
     * @return {@code true} when more records are currently not allowed
     */
    public boolean isBlocked() {
        return !this.isAllowedMoreRecords();
    }

    @Override
    public String toString() {
        return "PartitionState(tp=" + this.getTp() + ", incompleteOffsets=" + this.incompleteOffsets + ", dirty=" + this.isDirty() + ", offsetHighestSeen=" + this.getOffsetHighestSeen() + ", offsetHighestSucceeded=" + this.getOffsetHighestSucceeded() + ", allowedMoreRecords=" + this.isAllowedMoreRecords() + ")";
    }

    /**
     * @return the partition this state belongs to
     */
    public TopicPartition getTp() {
        return this.tp;
    }

    private void setDirty(boolean dirty) {
        this.dirty = dirty;
    }

    private boolean isDirty() {
        return this.dirty;
    }

    /**
     * @return the highest offset seen so far, or {@link #KAFKA_OFFSET_ABSENCE} if none was loaded or added
     */
    public long getOffsetHighestSeen() {
        return this.offsetHighestSeen;
    }

    /**
     * @return the highest offset recorded as succeeded; initialised from the loaded highest seen offset
     */
    public long getOffsetHighestSucceeded() {
        return this.offsetHighestSucceeded;
    }

    boolean isAllowedMoreRecords() {
        return this.allowedMoreRecords;
    }

    private void setAllowedMoreRecords(boolean allowedMoreRecords) {
        this.allowedMoreRecords = allowedMoreRecords;
    }
}

