/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.KafkaUtils;
import io.confluent.csid.utils.LoopingResumingIterator;
import io.confluent.csid.utils.WallClock;
import io.confluent.parallelconsumer.OffsetMapCodecManager;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.WorkContainer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

public class WorkManager<K, V>
implements ConsumerRebalanceListener {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(WorkManager.class);
    /** Configuration supplied at construction; read for the ordering mode and the queueing limits. */
    private final ParallelConsumerOptions options;
    /**
     * Work grouped by shard key (the value returned by computeShardKey), each shard sorted by record offset.
     * Filled by processInbox(ConsumerRecords); entries are removed in success().
     */
    private final Map<Object, NavigableMap<Long, WorkContainer<K, V>>> processingShards = new ConcurrentHashMap<Object, NavigableMap<Long, WorkContainer<K, V>>>();
    /** Record batches handed to registerWork; drained by processInbox() at the start of maybeGetWork(int). */
    private final LinkedBlockingQueue<ConsumerRecords<K, V>> workInbox = new LinkedBlockingQueue();
    /**
     * Per-partition work sorted by offset. Scanned by findCompletedEligibleOffsetsAndRemove(boolean) to work out
     * which offsets can be committed; a partition's queue is dropped when the partition is revoked or lost.
     */
    private final Map<TopicPartition, NavigableMap<Long, WorkContainer<K, V>>> partitionCommitQueues = new ConcurrentHashMap<TopicPartition, NavigableMap<Long, WorkContainer<K, V>>>();
    /** Shard key at which the previous maybeGetWork(int) stopped; handed to LoopingResumingIterator on the next call. */
    private Optional<Object> iterationResumePoint = Optional.empty();
    /**
     * Number of work containers handed out by maybeGetWork(int) and not yet returned through success() or putBack().
     * Plain int with no synchronization in this class - NOTE(review): confirm it is only touched from one thread.
     */
    private int inFlightCount = 0;
    /** Presumably the multiplier for the "enough work loaded" check in isSufficientlyLoaded() - TODO confirm. */
    private final int loadingFactor = 3;
    /** Callbacks invoked with the work container at the end of success(). Exposed via getSuccessfulWorkListeners(). */
    private final List<java.util.function.Consumer<WorkContainer<K, V>>> successfulWorkListeners = new ArrayList<java.util.function.Consumer<WorkContainer<K, V>>>();
    /** Clock passed to WorkContainer#hasDelayPassed and WorkContainer#fail; replaceable through setClock. */
    private WallClock clock = new WallClock();
    /** Kafka consumer passed to every OffsetMapCodecManager this class creates. */
    Consumer<K, V> consumer;
    /**
     * Per-partition offsets known to be incomplete. Read by isRecordPreviouslyProcessed and pruned by
     * onOffsetCommitSuccess; nothing in this class adds to it - presumably populated by OffsetMapCodecManager.
     */
    Map<TopicPartition, TreeSet<Long>> partitionIncompleteOffsets = new HashMap<TopicPartition, TreeSet<Long>>();
    /** Per-partition highest offset recorded through raisePartitionHighWaterMark. */
    Map<TopicPartition, Long> partitionOffsetHighWaterMarks = new HashMap<TopicPartition, Long>();
    /** Default used when a partition has no entry in partitionOffsetHighWaterMarks. */
    long MISSING_HIGH_WATER_MARK = -1L;

    /**
     * Creates a work manager bound to the given options and Kafka consumer.
     *
     * @param options  ordering mode and queueing limits consulted when work is handed out
     * @param consumer consumer passed on to {@link OffsetMapCodecManager} instances created by this class
     */
    public WorkManager(ParallelConsumerOptions options, Consumer<K, V> consumer) {
        this.consumer = consumer;
        this.options = options;
    }

    /**
     * Rebalance callback for newly assigned partitions. Copies the partitions into a set and delegates to
     * {@link OffsetMapCodecManager#loadOffsetMapForPartition}.
     *
     * <p>Any exception is logged and then rethrown unchanged.
     *
     * @param partitions the partitions that were just assigned
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        try {
            log.debug("onPartitionsAssigned: {}", partitions);
            Set<TopicPartition> assigned = UniSets.copyOf(partitions);
            OffsetMapCodecManager<K, V> codecManager = new OffsetMapCodecManager<K, V>(this, this.consumer);
            codecManager.loadOffsetMapForPartition(assigned);
        } catch (Exception failure) {
            log.error("Error in onPartitionsAssigned", failure);
            throw failure;
        }
    }

    /**
     * Rebalance callback for revoked partitions: drops all state tracked for them via
     * {@code resetOffsetMapAndRemoveWork}.
     *
     * <p>Any exception is logged and then rethrown unchanged.
     *
     * @param partitions the partitions that were revoked
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        try {
            log.debug("Partitions revoked: {}", partitions);
            resetOffsetMapAndRemoveWork(partitions);
        } catch (Exception failure) {
            log.error("Error in onPartitionsRevoked", failure);
            throw failure;
        }
    }

    /**
     * Rebalance callback for lost partitions: logs a warning and drops all state tracked for them via
     * {@code resetOffsetMapAndRemoveWork}.
     *
     * <p>Any exception is logged and then rethrown unchanged.
     *
     * @param partitions the partitions that were lost
     */
    // NOTE(review): no @Override here because ConsumerRebalanceListener only declares this method from
    // kafka-clients 2.4 onwards - add it once the minimum client version is confirmed.
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        try {
            log.warn("Partitions have been lost");
            log.debug("Lost partitions: {}", partitions);
            resetOffsetMapAndRemoveWork(partitions);
        } catch (Exception failure) {
            log.error("Error in onPartitionsLost", failure);
            throw failure;
        }
    }

    /**
     * Forgets everything tracked for the given partitions: incomplete offsets, high-water mark, commit queue,
     * and the processing shards referenced by that commit queue.
     *
     * @param partitions partitions whose state should be discarded
     */
    private void resetOffsetMapAndRemoveWork(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            this.partitionIncompleteOffsets.remove(tp);
            this.partitionOffsetHighWaterMarks.remove(tp);
            NavigableMap<Long, WorkContainer<K, V>> commitQueue = this.partitionCommitQueues.remove(tp);
            if (commitQueue == null) {
                // Nothing was ever registered for this partition.
                log.trace("Removing empty commit queue");
            } else {
                this.removeShardsFoundIn(commitQueue);
            }
        }
    }

    /**
     * Removes from {@code processingShards} every shard that holds work from the given (already detached)
     * partition commit queue.
     *
     * <p>Shards are registered under the value returned by {@code computeShardKey}, which is the record key only
     * for KEY ordering and a {@link TopicPartition} otherwise. The same function must therefore be used to find
     * them again; looking them up by raw record key would leave PARTITION/UNORDERED shards behind forever.
     *
     * @param oldWorkPartitionQueue the commit queue of a partition that is no longer assigned
     */
    private void removeShardsFoundIn(NavigableMap<Long, WorkContainer<K, V>> oldWorkPartitionQueue) {
        for (WorkContainer<K, V> work : oldWorkPartitionQueue.values()) {
            Object shardKey = this.computeShardKey(work.getCr());
            this.processingShards.remove(shardKey);
        }
    }

    /**
     * Queues several polled batches for later processing, one {@code registerWork} call per batch.
     *
     * @param records batches to enqueue, in order
     */
    public void registerWork(List<ConsumerRecords<K, V>> records) {
        records.forEach(batch -> this.registerWork(batch));
    }

    /**
     * Queues one polled batch in the work inbox. The batch is not turned into work containers until the
     * inbox is drained by the next {@code maybeGetWork} call.
     *
     * @param records the batch to enqueue
     */
    public void registerWork(ConsumerRecords<K, V> records) {
        workInbox.add(records);
    }

    /**
     * Drains every batch currently waiting in the work inbox and registers its records as work.
     */
    private void processInbox() {
        // Typed list: a raw list would hand back Object elements and lose the <K, V> binding.
        List<ConsumerRecords<K, V>> mail = new ArrayList<>();
        this.workInbox.drainTo(mail);
        for (ConsumerRecords<K, V> batch : mail) {
            this.processInbox(batch);
        }
    }

    /**
     * Turns each record of the batch into a {@link WorkContainer} and files it both in its processing shard
     * and in its partition's commit queue, keyed by offset. Records reported as already processed by
     * {@code isRecordPreviouslyProcessed} are skipped.
     *
     * @param records the batch to register
     */
    private void processInbox(ConsumerRecords<K, V> records) {
        log.debug("Registering {} records of work", records.count());
        for (ConsumerRecord<K, V> rec : records) {
            if (!this.isRecordPreviouslyProcessed(rec)) {
                Object shardKey = this.computeShardKey(rec);
                long offset = rec.offset();
                WorkContainer<K, V> container = new WorkContainer<>(rec);
                TopicPartition tp = KafkaUtils.toTP(rec);
                this.raisePartitionHighWaterMark(offset, tp);
                this.processingShards
                        .computeIfAbsent(shardKey, unused -> new ConcurrentSkipListMap<>())
                        .put(offset, container);
                this.partitionCommitQueues
                        .computeIfAbsent(tp, unused -> new ConcurrentSkipListMap<>())
                        .put(offset, container);
            } else {
                log.trace("Record previously processed, skipping. offset: {}", rec.offset());
            }
        }
    }

    /**
     * Records {@code highWater} as the partition's high-water mark unless a strictly higher mark is already
     * stored. A value equal to {@code MISSING_HIGH_WATER_MARK} is always stored.
     *
     * @param highWater candidate offset
     * @param tp        partition the offset belongs to
     */
    void raisePartitionHighWaterMark(long highWater, TopicPartition tp) {
        long current = this.partitionOffsetHighWaterMarks.getOrDefault(tp, this.MISSING_HIGH_WATER_MARK);
        boolean lowerThanCurrent = highWater < current;
        if (lowerThanCurrent && highWater != this.MISSING_HIGH_WATER_MARK) {
            return;
        }
        this.partitionOffsetHighWaterMarks.put(tp, highWater);
    }

    /**
     * Decides whether a record was already handled and must not be registered again.
     *
     * <p>A record counts as previously processed when its offset is not listed as incomplete for its
     * partition and lies at or below the partition's high-water mark. The comparison is inclusive because
     * the high-water mark is itself an offset that has been registered (it is raised to the record's own
     * offset during registration), so a record sitting exactly on the mark has been seen before.
     *
     * @param rec the record to check
     * @return {@code true} if the record should be skipped
     */
    private boolean isRecordPreviouslyProcessed(ConsumerRecord<K, V> rec) {
        long offset = rec.offset();
        TopicPartition tp = new TopicPartition(rec.topic(), rec.partition());
        Set<Long> incompleteOffsets = this.partitionIncompleteOffsets.get(tp);
        if (incompleteOffsets != null && incompleteOffsets.contains(offset)) {
            // Explicitly tracked as not yet completed: must be processed (again).
            return false;
        }
        long highWaterMark = this.partitionOffsetHighWaterMarks.getOrDefault(tp, this.MISSING_HIGH_WATER_MARK);
        return offset <= highWaterMark;
    }

    /**
     * Returns the key of the processing shard a record belongs to: the record key for KEY ordering, the
     * record's {@link TopicPartition} for PARTITION and UNORDERED.
     *
     * @param rec the record to classify
     * @return the shard key (may be {@code null} for KEY ordering when the record has no key)
     * @throws IncompatibleClassChangeError if the ordering is none of the handled constants
     */
    private Object computeShardKey(ConsumerRecord<K, V> rec) {
        switch (this.options.getOrdering()) {
            case KEY:
                return rec.key();
            case PARTITION:
            case UNORDERED:
                return new TopicPartition(rec.topic(), rec.partition());
            default:
                throw new IncompatibleClassChangeError();
        }
    }

    /**
     * Fetches work using the configured max-messages-to-queue as the requested maximum.
     *
     * @return the work containers taken; possibly empty
     */
    public <R> List<WorkContainer<K, V>> maybeGetWork() {
        int requestedMax = this.options.getMaxMessagesToQueue();
        return this.maybeGetWork(requestedMax);
    }

    /**
     * Drains the inbox and then takes as much eligible work as the current budget allows.
     *
     * <p>The budget is the smallest of the requested maximum, the configured max-messages-to-queue and the
     * configured max-messages-beyond-base-commit-offset, minus the work already in flight. Shards are
     * visited through a {@link LoopingResumingIterator} starting from the saved resume point. Within a
     * shard, a container is taken when its delay has passed, it is not in flight and it has not already
     * succeeded. Unless ordering is UNORDERED, only the first container of each shard is examined.
     *
     * @param requestedMaxWorkToRetrieve upper bound requested by the caller
     * @return the containers taken (each marked via {@code takingAsWork()}); empty when there is no budget
     */
    public List<WorkContainer<K, V>> maybeGetWork(int requestedMaxWorkToRetrieve) {
        this.processInbox();

        int ceiling = Math.min(requestedMaxWorkToRetrieve, this.options.getMaxMessagesToQueue());
        ceiling = Math.min(ceiling, this.options.getMaxNumberMessagesBeyondBaseCommitOffset());
        int workToGetDelta = ceiling - this.getInFlightCount();
        if (workToGetDelta < 1) {
            return UniLists.of();
        }

        List<WorkContainer<K, V>> work = new ArrayList<>();
        LoopingResumingIterator<Object, NavigableMap<Long, WorkContainer<K, V>>> shards =
                new LoopingResumingIterator<Object, NavigableMap<Long, WorkContainer<K, V>>>(this.iterationResumePoint, this.processingShards);
        for (Map.Entry<Object, NavigableMap<Long, WorkContainer<K, V>>> shard : shards) {
            Object shardKey = shard.getKey();
            log.trace("Looking for work on shard: {}", shardKey);
            if (work.size() >= workToGetDelta) {
                // Remember where we stopped so the next call starts with this shard.
                this.iterationResumePoint = Optional.of(shardKey);
                log.debug("Work taken is now over max, stopping (saving iteration resume point {})", this.iterationResumePoint);
                break;
            }

            List<WorkContainer<K, V>> shardWork = new ArrayList<>();
            for (WorkContainer<K, V> wc : shard.getValue().values()) {
                int taken = work.size() + shardWork.size();
                if (taken >= workToGetDelta) {
                    log.trace("Work taken ({}) exceeds max ({})", taken, workToGetDelta);
                    break;
                }

                boolean notYetSucceeded = !wc.isUserFunctionSucceeded();
                if (wc.hasDelayPassed(this.clock) && wc.isNotInFlight() && notYetSucceeded) {
                    log.trace("Taking {} as work", wc);
                    wc.takingAsWork();
                    shardWork.add(wc);
                } else {
                    log.trace("Work ({}) still delayed or is in flight, can't take...", wc);
                }

                if (this.options.getOrdering() != ParallelConsumerOptions.ProcessingOrder.UNORDERED) {
                    // Ordered modes: nothing further in this shard may be taken until its head is done.
                    log.trace("Processing by {}, so have cannot get more messages on this ({}) shard.", this.options.getOrdering(), shardKey);
                    break;
                }
            }
            work.addAll(shardWork);
        }

        log.debug("Got {} records of work", work.size());
        this.inFlightCount += work.size();
        return work;
    }

    /**
     * Marks a work container as succeeded, removes it from its processing shard, notifies the registered
     * success listeners and decrements the in-flight counter.
     *
     * <p>With KEY ordering an emptied shard is removed from the shard map as well.
     *
     * @param wc the container whose user function completed successfully
     */
    public void success(WorkContainer<K, V> wc) {
        ConsumerRecord<K, V> record = wc.getCr();
        log.trace("Work success ({}), removing from processing shard queue", wc);
        wc.succeed();

        Object shardKey = this.computeShardKey(record);
        // NOTE(review): the shard is null if it was removed in the meantime (e.g. by a partition revocation),
        // which makes the next line throw - confirm whether callers can reach this state.
        NavigableMap<Long, WorkContainer<K, V>> shard = this.processingShards.get(shardKey);
        shard.remove(record.offset());

        boolean keyOrdering = this.options.getOrdering() == ParallelConsumerOptions.ProcessingOrder.KEY;
        if (keyOrdering && shard.isEmpty()) {
            log.debug("Removing empty shard (key: {})", shardKey);
            this.processingShards.remove(shardKey);
        }

        this.successfulWorkListeners.forEach(listener -> listener.accept(wc));
        this.inFlightCount--;
    }

    /**
     * Marks a work container as failed (using the current clock) and returns it to its shard.
     *
     * @param wc the container whose user function failed
     */
    public void failed(WorkContainer<K, V> wc) {
        wc.fail(clock);
        putBack(wc);
    }

    /**
     * Puts a failed work container back into its processing shard under its record offset and decrements
     * the in-flight counter.
     *
     * @param wc the failed container
     */
    private void putBack(WorkContainer<K, V> wc) {
        log.debug("Work FAILED, returning to shard");
        ConsumerRecord<K, V> cr = wc.getCr();
        Object key = this.computeShardKey(cr);
        // Must match the value type of processingShards exactly, otherwise put(offset, wc) does not type-check.
        NavigableMap<Long, WorkContainer<K, V>> shard = this.processingShards.get(key);
        long offset = cr.offset();
        shard.put(offset, wc);
        --this.inFlightCount;
    }

    /**
     * Counts the work containers held across all partition commit queues.
     *
     * @return sum of the sizes of every partition's commit queue
     */
    public int getPartitionWorkRemainingCount() {
        int total = 0;
        for (NavigableMap<Long, WorkContainer<K, V>> commitQueue : this.partitionCommitQueues.values()) {
            total += commitQueue.size();
        }
        return total;
    }

    /**
     * Counts all work not yet completed: containers in the processing shards plus records still waiting
     * in the inbox.
     *
     * @return shard work count plus the record count of every queued batch
     */
    public int getWorkRemainingCount() {
        int inShards = this.getMappedShardWorkRemainingCount();
        int inInbox = this.workInbox.stream().mapToInt(ConsumerRecords::count).sum();
        return inShards + inInbox;
    }

    /**
     * Counts the work containers held across all processing shards.
     *
     * @return sum of the sizes of every shard
     */
    public int getMappedShardWorkRemainingCount() {
        int total = 0;
        for (NavigableMap<Long, WorkContainer<K, V>> shard : this.processingShards.values()) {
            total += shard.size();
        }
        return total;
    }

    /**
     * Reports whether any work is still waiting to be processed, either in a shard or in the inbox.
     *
     * @return {@code true} if a shard holds work or the inbox is non-empty
     */
    boolean isRecordsAwaitingProcessing() {
        if (this.getMappedShardWorkRemainingCount() > 0) {
            return true;
        }
        return !this.workInbox.isEmpty();
    }

    /**
     * Reports whether any partition commit queue still holds work.
     *
     * @return {@code true} if at least one container remains in a commit queue
     */
    boolean isRecordsAwaitingToBeCommitted() {
        return this.getPartitionWorkRemainingCount() > 0;
    }

    /**
     * Looks up the work container registered for a record in its processing shard.
     *
     * @param rec the record to look up
     * @return the container stored under the record's offset, or {@code null} if the shard has none
     * @throws NullPointerException if no shard exists for the record's shard key
     */
    public WorkContainer<K, V> getWorkContainerForRecord(ConsumerRecord<K, V> rec) {
        Object shardKey = this.computeShardKey(rec);
        NavigableMap<Long, WorkContainer<K, V>> shard = this.processingShards.get(shardKey);
        return shard.get(rec.offset());
    }

    /**
     * Computes the offsets that can be committed and removes the corresponding completed work from the
     * partition commit queues.
     *
     * @return offsets to commit, keyed by partition
     */
    Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove() {
        final boolean removeCompletedWork = true;
        return this.findCompletedEligibleOffsetsAndRemove(removeCompletedWork);
    }

    /**
     * Reports whether a commit scan would currently produce any offsets, without removing any work.
     *
     * @return {@code true} if at least one partition has an offset to commit
     */
    boolean hasComittableOffsets() {
        Map<TopicPartition, OffsetAndMetadata> committable = this.findCompletedEligibleOffsetsAndRemove(false);
        return committable.size() != 0;
    }

    /**
     * Scans every partition commit queue in offset order and works out what can be committed.
     *
     * <p>For each partition, the leading run of completed-and-succeeded containers determines the offset to
     * commit (last such offset + 1). The first incomplete or failed container ends that run; from there on,
     * incomplete and failed offsets are collected. If any were collected, they are encoded through
     * {@link OffsetMapCodecManager#makeOffsetMetadataPayload} and attached as metadata to the partition's
     * commit entry; when the partition has no committable run, the first collected offset is used as the
     * commit offset. Finally the metadata is passed through {@code maybeStripOffsetPayload}.
     *
     * @param remove when {@code true}, containers of the committable run are removed from the commit queue
     * @return offsets (and optional metadata) to commit, keyed by partition
     */
    <R> Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove(boolean remove) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
        int count = 0;
        int removed = 0;
        log.trace("Scanning for in order in-flight work that has completed...");
        int totalOffsetMetaSize = 0;

        for (Map.Entry<TopicPartition, NavigableMap<Long, WorkContainer<K, V>>> partitionQueueEntry : this.partitionCommitQueues.entrySet()) {
            TopicPartition topicPartitionKey = partitionQueueEntry.getKey();
            log.trace("Starting scan of partition: {}", topicPartitionKey);
            NavigableMap<Long, WorkContainer<K, V>> partitionQueue = partitionQueueEntry.getValue();
            count += partitionQueue.size();

            List<WorkContainer<K, V>> workToRemove = new ArrayList<>();
            LinkedHashSet<Long> incompleteOffsets = new LinkedHashSet<>();
            // Becomes true at the first incomplete/failed offset; nothing after it can raise the commit offset.
            boolean iteratedBeyondLowWaterMark = false;

            for (WorkContainer<K, V> container : partitionQueue.values()) {
                long offset = container.getCr().offset();

                if (!container.isUserFunctionComplete()) {
                    iteratedBeyondLowWaterMark = true;
                    log.trace("Offset ({}) is incomplete, holding up the queue ({}) of size {}.", offset, topicPartitionKey, partitionQueue.size());
                    incompleteOffsets.add(offset);
                    continue;
                }

                boolean succeeded = container.getUserFunctionSucceeded().get();
                if (!succeeded) {
                    log.trace("Offset {} is complete, but failed processing. Will track in offset map as not complete. Can't do normal offset commit past this point.", offset);
                    iteratedBeyondLowWaterMark = true;
                    incompleteOffsets.add(offset);
                } else if (iteratedBeyondLowWaterMark) {
                    log.trace("Offset {} is complete and succeeded, but we've iterated past the lowest committable offset. Will mark as complete in the offset map.", offset);
                } else {
                    log.trace("Found offset candidate ({}) to add to offset commit map", container);
                    workToRemove.add(container);
                    // Kafka commits the offset of the NEXT record expected, hence + 1.
                    long offsetOfNextExpectedMessageToBeCommitted = offset + 1L;
                    offsetsToSend.put(topicPartitionKey, new OffsetAndMetadata(offsetOfNextExpectedMessageToBeCommitted));
                }
            }

            if (!incompleteOffsets.isEmpty()) {
                OffsetAndMetadata finalOffsetOnly = offsetsToSend.get(topicPartitionKey);
                // No committable run: fall back to the first incomplete offset as the commit position.
                long offsetOfNextExpectedMessage = finalOffsetOnly == null
                        ? incompleteOffsets.iterator().next()
                        : finalOffsetOnly.offset();
                OffsetMapCodecManager<K, V> om = new OffsetMapCodecManager<K, V>(this, this.consumer);
                String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, topicPartitionKey, incompleteOffsets);
                totalOffsetMetaSize += offsetMapPayload.length();
                offsetsToSend.put(topicPartitionKey, new OffsetAndMetadata(offsetOfNextExpectedMessage, offsetMapPayload));
            }

            if (remove) {
                removed += workToRemove.size();
                for (WorkContainer<K, V> done : workToRemove) {
                    partitionQueue.remove(done.getCr().offset());
                }
            }
        }

        this.maybeStripOffsetPayload(offsetsToSend, totalOffsetMetaSize);
        log.debug("Scan finished, {} were in flight, {} completed offsets removed, coalesced to {} offset(s) ({}) to be committed", count, removed, offsetsToSend.size(), offsetsToSend);
        return offsetsToSend;
    }

    /**
     * Drops the metadata payload from every commit entry when the combined payload size exceeds the
     * 4096 limit quoted in the warning message below; only the bare offsets are kept.
     *
     * <p>The replacement is built from the offset alone. Passing {@code v.toString()} as metadata would
     * embed the original payload again and defeat the purpose of stripping.
     *
     * @param offsetsToSend       commit entries to adjust in place
     * @param totalOffsetMetaSize combined length of all metadata payloads in {@code offsetsToSend}
     */
    private void maybeStripOffsetPayload(Map<TopicPartition, OffsetAndMetadata> offsetsToSend, int totalOffsetMetaSize) {
        final int maxMetadataSize = 4096;
        if (totalOffsetMetaSize <= maxMetadataSize) {
            return;
        }
        log.warn("Offset map data too large (size: {}) to fit in metadata payload - stripping offset map out. See kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize = 4096", totalOffsetMetaSize);
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsToSend.entrySet()) {
            OffsetAndMetadata withPayload = entry.getValue();
            OffsetAndMetadata stripped = new OffsetAndMetadata(withPayload.offset());
            offsetsToSend.replace(entry.getKey(), stripped);
        }
    }

    /**
     * After a successful commit, forgets every tracked incomplete offset that lies below the committed
     * offset of its partition. Partitions without tracked offsets are ignored.
     *
     * @param offsetsToSend the offsets that were committed, keyed by partition
     */
    public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : offsetsToSend.entrySet()) {
            // Typed as Set<Long> so the predicate below compares numbers rather than raw Objects.
            Set<Long> trackedOffsets = this.partitionIncompleteOffsets.get(committed.getKey());
            if (trackedOffsets == null) {
                continue;
            }
            long newLowWaterMark = committed.getValue().offset();
            trackedOffsets.removeIf(offset -> offset < newLowWaterMark);
        }
    }

    /**
     * Reports whether enough work is already loaded.
     *
     * @return the result of {@code isSufficientlyLoaded()}
     */
    public boolean shouldThrottle() {
        return isSufficientlyLoaded();
    }

    /**
     * Reports whether the commit queues hold enough work: either more than max-messages-to-queue times the
     * loading factor, or more than the configured max-messages-beyond-base-commit-offset.
     *
     * @return {@code true} if either threshold is exceeded
     */
    boolean isSufficientlyLoaded() {
        int remaining = this.getPartitionWorkRemainingCount();
        boolean loadedEnoughInPipeline = remaining > this.options.getMaxMessagesToQueue() * this.loadingFactor;
        boolean overMaxUncommitted = remaining > this.options.getMaxNumberMessagesBeyondBaseCommitOffset();
        if (!loadedEnoughInPipeline && !overMaxUncommitted) {
            return false;
        }
        log.debug("loadedEnoughInPipeline {} || overMaxUncommitted {}", loadedEnoughInPipeline, overMaxUncommitted);
        return true;
    }

    /**
     * Returns the number of work containers handed out and not yet reported as succeeded or failed.
     *
     * @return the current in-flight count
     */
    public int getInFlightCount() {
        return inFlightCount;
    }

    /**
     * Reports whether any processing shard still contains work.
     *
     * @return {@code true} if at least one shard is non-empty
     */
    public boolean workIsWaitingToBeCompletedSuccessfully() {
        return this.processingShards.values().stream().anyMatch(shard -> !shard.isEmpty());
    }

    /**
     * Reports whether any handed-out work has not yet been reported back.
     *
     * @return {@code true} if the in-flight count is non-zero
     */
    public boolean hasWorkInFlight() {
        int inFlight = getInFlightCount();
        return inFlight != 0;
    }

    /**
     * Returns the options this manager was constructed with.
     *
     * @return the configured options
     */
    public ParallelConsumerOptions getOptions() {
        return options;
    }

    /**
     * Returns the live list of listeners invoked by {@code success}; callers add listeners to it directly.
     *
     * @return the internal, mutable listener list
     */
    List<java.util.function.Consumer<WorkContainer<K, V>>> getSuccessfulWorkListeners() {
        return successfulWorkListeners;
    }

    /**
     * Replaces the clock used for delay checks and failure timestamps.
     *
     * @param clock the clock to use from now on
     */
    void setClock(WallClock clock) {
        this.clock = clock;
    }
}

