/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout.internal;

import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NavigableSet;
import java.util.TreeSet;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OffsetManager {
    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
    private static final Logger LOG = LoggerFactory.getLogger(OffsetManager.class);
    private final TopicPartition tp;
    private final NavigableSet<Long> emittedOffsets = new TreeSet<Long>();
    private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<KafkaSpoutMessageId>(OFFSET_COMPARATOR);
    private long committedOffset;
    private boolean committed;
    private long latestEmittedOffset;

    public OffsetManager(TopicPartition tp, long initialFetchOffset) {
        this.tp = tp;
        this.committedOffset = initialFetchOffset;
        LOG.debug("Instantiated {}", (Object)this.toString());
    }

    public void addToAckMsgs(KafkaSpoutMessageId msgId) {
        this.ackedMsgs.add(msgId);
    }

    public void addToEmitMsgs(long offset) {
        this.emittedOffsets.add(offset);
        this.latestEmittedOffset = Math.max(this.latestEmittedOffset, offset);
    }

    public int getNumUncommittedOffsets() {
        return this.emittedOffsets.size();
    }

    public long getNthUncommittedOffsetAfterCommittedOffset(int index) {
        Iterator<Long> offsetIter = this.emittedOffsets.iterator();
        for (int i = 0; i < index - 1; ++i) {
            offsetIter.next();
        }
        return offsetIter.next();
    }

    public OffsetAndMetadata findNextCommitOffset(String commitMetadata) {
        boolean found = false;
        long nextCommitOffset = this.committedOffset;
        for (KafkaSpoutMessageId currAckedMsg : this.ackedMsgs) {
            long currOffset = currAckedMsg.offset();
            if (currOffset == nextCommitOffset) {
                found = true;
                nextCommitOffset = currOffset + 1L;
                continue;
            }
            if (currOffset > nextCommitOffset) {
                if (this.emittedOffsets.contains(nextCommitOffset)) {
                    LOG.debug("topic-partition [{}] has non-sequential offset [{}]. It will be processed in a subsequent batch.", (Object)this.tp, (Object)currOffset);
                    break;
                }
                LOG.debug("Processed non-sequential offset. The earliest uncommitted offset is no longer part of the topic. Missing offset: [{}], Processed: [{}]", (Object)nextCommitOffset, (Object)currOffset);
                Long nextEmittedOffset = this.emittedOffsets.ceiling(nextCommitOffset);
                if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
                    LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset", (Object)currOffset, (Object)nextCommitOffset);
                    found = true;
                    nextCommitOffset = currOffset + 1L;
                    continue;
                }
                LOG.debug("Topic-partition [{}] has non-sequential offset [{}]. Next offset to commit should be [{}]", new Object[]{this.tp, currOffset, nextCommitOffset});
                break;
            }
            throw new IllegalStateException("The offset [" + currOffset + "] is below the current nextCommitOffset [" + nextCommitOffset + "] for [" + String.valueOf(this.tp) + "]. This should not be possible, and likely indicates a bug in the spout's acking or emit logic.");
        }
        OffsetAndMetadata nextCommitOffsetAndMetadata = null;
        if (found) {
            nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, commitMetadata);
            LOG.debug("Topic-partition [{}] has offsets [{}-{}] ready to be committed. Processing will resume at offset [{}] upon spout restart", new Object[]{this.tp, this.committedOffset, nextCommitOffsetAndMetadata.offset() - 1L, nextCommitOffsetAndMetadata.offset()});
        } else {
            LOG.debug("Topic-partition [{}] has no offsets ready to be committed", (Object)this.tp);
        }
        LOG.trace("{}", (Object)this);
        return nextCommitOffsetAndMetadata;
    }

    public long commit(OffsetAndMetadata committedOffsetAndMeta) {
        this.committed = true;
        long preCommitCommittedOffset = this.committedOffset;
        long numCommittedOffsets = 0L;
        this.committedOffset = committedOffsetAndMeta.offset();
        Iterator<Serializable> iterator = this.ackedMsgs.iterator();
        while (iterator.hasNext() && iterator.next().offset() < committedOffsetAndMeta.offset()) {
            iterator.remove();
            ++numCommittedOffsets;
        }
        iterator = this.emittedOffsets.iterator();
        while (iterator.hasNext() && (Long)iterator.next() < committedOffsetAndMeta.offset()) {
            iterator.remove();
        }
        LOG.trace("{}", (Object)this);
        LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]. Processing will resume at [{}] upon spout restart", new Object[]{numCommittedOffsets, preCommitCommittedOffset, this.committedOffset - 1L, this.tp, this.committedOffset});
        return numCommittedOffsets;
    }

    public boolean hasCommitted() {
        return this.committed;
    }

    public boolean contains(KafkaSpoutMessageId msgId) {
        return this.ackedMsgs.contains(msgId);
    }

    @VisibleForTesting
    boolean containsEmitted(long offset) {
        return this.emittedOffsets.contains(offset);
    }

    public long getLatestEmittedOffset() {
        return this.latestEmittedOffset;
    }

    public long getCommittedOffset() {
        return this.committedOffset;
    }

    public final String toString() {
        return "OffsetManager{topic-partition=" + String.valueOf(this.tp) + ", committedOffset=" + this.committedOffset + ", emittedOffsets=" + String.valueOf(this.emittedOffsets) + ", ackedMsgs=" + String.valueOf(this.ackedMsgs) + ", latestEmittedOffset=" + this.latestEmittedOffset + "}";
    }

    private static class OffsetComparator
    implements Comparator<KafkaSpoutMessageId> {
        private OffsetComparator() {
        }

        @Override
        public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
            return m1.offset() < m2.offset() ? -1 : (m1.offset() == m2.offset() ? 0 : 1);
        }
    }
}

