/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.csid.utils.KafkaUtils;
import io.confluent.csid.utils.LoopingResumingIterator;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.WallClock;
import io.confluent.parallelconsumer.DynamicLoadFactor;
import io.confluent.parallelconsumer.EncodingNotSupportedException;
import io.confluent.parallelconsumer.InternalRuntimeError;
import io.confluent.parallelconsumer.OffsetMapCodecManager;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.RateLimiter;
import io.confluent.parallelconsumer.WorkContainer;
import io.confluent.parallelconsumer.WorkMailBoxManager;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

public class WorkManager<K, V>
implements ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(WorkManager.class);
    /**
     * Fraction of {@code OffsetMapCodecManager.DefaultMaxMetadataSize} above which an encoded offset
     * payload causes the partition to be blocked from taking further new records (see
     * {@code addEncodedOffsets}). Mutable via {@code setUSED_PAYLOAD_THRESHOLD_MULTIPLIER}.
     */
    public static double USED_PAYLOAD_THRESHOLD_MULTIPLIER = 0.75;
    /** Configuration; its ordering mode drives how shard keys are computed. */
    private final ParallelConsumerOptions options;
    /**
     * Work grouped by shard key (the record key or the topic partition, see {@code computeShardKey}),
     * each shard being an offset-sorted map of work containers.
     */
    private final Map<Object, NavigableMap<Long, WorkContainer<K, V>>> processingShards = new HashMap<Object, NavigableMap<Long, WorkContainer<K, V>>>();
    /**
     * Per-partition, offset-sorted work that has been registered and not yet removed by
     * {@code findCompletedEligibleOffsetsAndRemove}.
     */
    private final Map<TopicPartition, NavigableMap<Long, WorkContainer<K, V>>> partitionCommitQueues = new ConcurrentHashMap<TopicPartition, NavigableMap<Long, WorkContainer<K, V>>>();
    /** Source of the loading factor used by {@code isSufficientlyLoaded}. */
    private final DynamicLoadFactor dynamicLoadFactor;
    /** Mailbox that receives polled record batches before they are registered as work. */
    private final WorkMailBoxManager<K, V> wmbm;
    /** Shard key saved by {@code maybeGetWork} when it stops early; handed to the next shard iteration. */
    private Optional<Object> iterationResumePoint = Optional.empty();
    /** Incremented by the amount of work handed out in {@code maybeGetWork}; decremented in {@code onSuccess} and {@code putBack}. */
    private int numberRecordsOutForProcessing = 0;
    /** Callbacks invoked with each work container from {@code onSuccess}. */
    private final List<java.util.function.Consumer<WorkContainer<K, V>>> successfulWorkListeners = new ArrayList<java.util.function.Consumer<WorkContainer<K, V>>>();
    /** Clock passed to work containers for delay/failure timing; replaceable through {@code setClock}. */
    private WallClock clock = new WallClock();
    /** Kafka consumer handed to each {@code OffsetMapCodecManager} created by this class. */
    Consumer<K, V> consumer;
    /**
     * Per-partition offsets known to be incomplete. Read in {@code isRecordPreviouslyProcessed} and
     * pruned in {@code onOffsetCommitSuccess}; this class never adds entries itself.
     * NOTE(review): presumably populated by OffsetMapCodecManager when loading offset maps - confirm.
     */
    Map<TopicPartition, Set<Long>> partitionIncompleteOffsets = new HashMap<TopicPartition, Set<Long>>();
    /** Highest offset recorded per partition via {@code raisePartitionHighWaterMark}. */
    Map<TopicPartition, Long> partitionOffsetHighWaterMarks = new HashMap<TopicPartition, Long>();
    /** Sentinel used when a partition has no recorded high water mark. */
    long MISSING_HIGH_WATER_MARK = -1L;
    /**
     * Set to true in {@code onSuccess}; read by {@code isDirty}.
     * NOTE(review): nothing in this class resets it to false - confirm whether that happens elsewhere.
     */
    private AtomicBoolean workStateIsDirtyNeedsCommitting = new AtomicBoolean(false);
    /** Per-partition back-pressure flag written by {@code addEncodedOffsets}; false blocks never-attempted records in {@code maybeGetWork}. */
    Map<TopicPartition, Boolean> partitionMoreRecordsAllowedToProcess = new HashMap<TopicPartition, Boolean>();
    /** Per-partition counter bumped on every assignment and removal; stamped onto new work and compared in {@code checkEpochIsStale}. */
    private Map<TopicPartition, Integer> partitionsAssignmentEpochs = new HashMap<TopicPartition, Integer>();
    /** Time in flight beyond which un-takeable work is counted as slow in {@code maybeGetWork}. */
    private final Duration thresholdForTimeSpentInQueueWarning = Duration.ofSeconds(10L);
    /** Limits how often the slow-work warning is logged. NOTE(review): unit of the constructor argument is not visible here. */
    private final RateLimiter slowWarningRateLimit = new RateLimiter(5);

    /**
     * Creates a work manager with a freshly constructed {@link DynamicLoadFactor}.
     *
     * @param options  configuration for this manager
     * @param consumer Kafka consumer used when encoding and loading offset maps
     */
    public WorkManager(ParallelConsumerOptions options, Consumer<K, V> consumer) {
        this(options, consumer, new DynamicLoadFactor());
    }

    /**
     * Creates a work manager with an explicitly supplied load factor and a new, empty mailbox manager.
     *
     * @param newOptions             configuration for this manager
     * @param consumer               Kafka consumer used when encoding and loading offset maps
     * @param dynamicExtraLoadFactor load factor consulted when deciding whether enough work is buffered
     */
    public WorkManager(ParallelConsumerOptions newOptions, Consumer<K, V> consumer, DynamicLoadFactor dynamicExtraLoadFactor) {
        options = newOptions;
        this.consumer = consumer;
        dynamicLoadFactor = dynamicExtraLoadFactor;
        wmbm = new WorkMailBoxManager<>();
    }

    /**
     * Handles newly assigned partitions: bumps their assignment epoch, ensures each one has a
     * "more records allowed" flag (defaulting to allowed), and then loads their offset map through
     * an {@link OffsetMapCodecManager}. Any exception from the load is logged and rethrown.
     *
     * @param partitions the partitions that were assigned
     */
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.debug("Partitions assigned: {}", partitions);
        incrementPartitionAssignmentEpoch(partitions);
        partitions.forEach(assigned -> partitionMoreRecordsAllowedToProcess.putIfAbsent(assigned, true));
        try {
            Set<TopicPartition> assignedSet = UniSets.copyOf(partitions);
            OffsetMapCodecManager<K, V> codecManager = new OffsetMapCodecManager<K, V>(this, consumer);
            codecManager.loadOffsetMapForPartition(assignedSet);
        } catch (Exception failure) {
            log.error("Error in onPartitionsAssigned", (Throwable) failure);
            throw failure;
        }
    }

    /**
     * Handles revoked partitions by discarding their tracked state and queued work.
     * Any exception is logged and rethrown.
     *
     * @param partitions the partitions that were revoked
     */
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.info("Partitions revoked: {}", partitions);
        try {
            onPartitionsRemoved(partitions);
        } catch (Exception failure) {
            log.error("Error in onPartitionsRevoked", (Throwable) failure);
            throw failure;
        }
    }

    /**
     * Shared handling for revoked and lost partitions: advances the assignment epoch so that
     * outstanding work is recognised as stale, then drops the partitions' offset state and work.
     *
     * @param partitions the partitions no longer owned
     */
    private void onPartitionsRemoved(Collection<TopicPartition> partitions) {
        incrementPartitionAssignmentEpoch(partitions);
        resetOffsetMapAndRemoveWork(partitions);
    }

    /**
     * Handles lost partitions the same way as revoked ones. Any exception is logged and rethrown.
     *
     * @param partitions the partitions that were lost
     */
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        try {
            log.info("Lost partitions: {}", partitions);
            onPartitionsRemoved(partitions);
        } catch (Exception failure) {
            log.error("Error in onPartitionsLost", (Throwable) failure);
            throw failure;
        }
    }

    /**
     * Forgets the incomplete offsets, high water mark and commit queue of each partition, removes
     * the shard entries for any work found in that commit queue, and finally notifies the mailbox
     * manager that the partitions are gone.
     *
     * @param partitions the partitions whose state should be discarded
     */
    private void resetOffsetMapAndRemoveWork(Collection<TopicPartition> partitions) {
        for (TopicPartition removed : partitions) {
            partitionIncompleteOffsets.remove(removed);
            partitionOffsetHighWaterMarks.remove(removed);
            NavigableMap<Long, WorkContainer<K, V>> formerQueue = partitionCommitQueues.remove(removed);
            if (formerQueue == null) {
                log.trace("Removing empty commit queue");
            } else {
                removeShardsFoundIn(formerQueue);
            }
        }
        wmbm.onPartitionsRemoved(partitions);
    }

    /**
     * Removes from the processing shards every work container held in the given commit queue.
     *
     * @param oldWorkPartitionQueue commit queue of a partition that is being removed
     */
    private void removeShardsFoundIn(NavigableMap<Long, WorkContainer<K, V>> oldWorkPartitionQueue) {
        oldWorkPartitionQueue.values().forEach(expired -> removeWorkFromShard(expired));
    }

    /**
     * Removes a single piece of expired work from its processing shard.
     *
     * <p>Only the entry at the work's offset is removed; the shard itself is dropped only once it
     * is empty. Removing the whole shard would also discard unrelated work that shares the shard
     * key, such as records registered under a newer assignment epoch of the same partition.
     *
     * @param work the expired work container to remove
     */
    private void removeWorkFromShard(WorkContainer<K, V> work) {
        Object shardKey = this.computeShardKey(work.getCr());
        log.debug("Removing expired work {} for shard key: {}", work, shardKey);
        NavigableMap<Long, WorkContainer<K, V>> shard = this.processingShards.get(shardKey);
        if (shard == null) {
            // Shard already gone, nothing left to clean up.
            return;
        }
        shard.remove(work.getCr().offset());
        if (shard.isEmpty()) {
            this.processingShards.remove(shardKey);
        }
    }

    /**
     * Registers several polled batches, one after another, in list order.
     *
     * @param records the batches to register
     */
    public void registerWork(List<ConsumerRecords<K, V>> records) {
        for (ConsumerRecords<K, V> batch : records) {
            registerWork(batch);
        }
    }

    /**
     * Hands a polled batch to the mailbox manager.
     *
     * @param records the batch to register
     */
    public void registerWork(ConsumerRecords<K, V> records) {
        wmbm.registerWork(records);
    }

    /**
     * Asks the mailbox manager to process its inbox, then polls its flattened queue and registers
     * records as work until either the requested number have been accepted or the queue is empty.
     * Records rejected as previously processed do not count towards the requested number.
     *
     * @param requestedMaxWorkToRetrieve maximum number of records to register; values below one register nothing
     */
    private void processInbox(int requestedMaxWorkToRetrieve) {
        wmbm.processInbox();
        log.debug("Will attempt to register the requested {} - {} available in internal mailbox", (Object) requestedMaxWorkToRetrieve, (Object) wmbm.internalFlattenedMailQueueSize());
        int registered = 0;
        while (registered < requestedMaxWorkToRetrieve && !wmbm.internalFlattenedMailQueueIsEmpty()) {
            ConsumerRecord<K, V> next = wmbm.internalFlattenedMailQueuePoll();
            if (processInbox(next)) {
                registered++;
            }
        }
        log.debug("{} new records were registered.", (Object) registered);
    }

    /**
     * Registers one record as work unless it was already processed.
     *
     * <p>The new work container is stamped with the partition's current assignment epoch, the
     * partition's high water mark is raised, and the container is stored both in its processing
     * shard and in the partition's commit queue.
     *
     * @param rec the record to register
     * @return {@code true} if the record was taken as work, {@code false} if it was skipped
     * @throws InternalRuntimeError if no assignment epoch is recorded for the record's partition
     */
    private boolean processInbox(ConsumerRecord<K, V> rec) {
        if (isRecordPreviouslyProcessed(rec)) {
            log.trace("Record previously processed, skipping. offset: {}", (Object) rec.offset());
            return false;
        }
        Object shardKey = computeShardKey(rec);
        long offset = rec.offset();
        TopicPartition partition = KafkaUtils.toTP(rec);
        Integer epoch = partitionsAssignmentEpochs.get(partition);
        if (epoch == null) {
            throw new InternalRuntimeError(StringUtils.msg("Received message for a partition which is not assigned: {}", rec));
        }
        WorkContainer<K, V> container = new WorkContainer<K, V>(epoch, rec);
        raisePartitionHighWaterMark(offset, partition);
        NavigableMap<Long, WorkContainer<K, V>> shard = processingShards.computeIfAbsent(shardKey, ignore -> new TreeMap<>());
        shard.put(offset, container);
        NavigableMap<Long, WorkContainer<K, V>> commitQueue = partitionCommitQueues.computeIfAbsent(partition, ignore -> new ConcurrentSkipListMap<>());
        commitQueue.put(offset, container);
        return true;
    }

    /**
     * Records {@code highWater} as the partition's high water mark when it is at least the current
     * mark, or when it equals the "missing" sentinel.
     *
     * @param highWater candidate high water mark
     * @param tp        partition the mark belongs to
     */
    void raisePartitionHighWaterMark(long highWater, TopicPartition tp) {
        long current = partitionOffsetHighWaterMarks.getOrDefault(tp, MISSING_HIGH_WATER_MARK);
        boolean isSentinel = highWater == MISSING_HIGH_WATER_MARK;
        if (isSentinel || highWater >= current) {
            partitionOffsetHighWaterMarks.put(tp, highWater);
        }
    }

    /**
     * Decides whether a record has already been processed: it has when its offset is not listed
     * as incomplete for its partition and is at or below the partition's high water mark.
     *
     * @param rec the record to check
     * @return {@code true} if the record counts as previously processed
     */
    private boolean isRecordPreviouslyProcessed(ConsumerRecord<K, V> rec) {
        long offset = rec.offset();
        TopicPartition partition = new TopicPartition(rec.topic(), rec.partition());
        Set<Long> incompletes = partitionIncompleteOffsets.getOrDefault(partition, new TreeSet<Long>());
        boolean seenBefore;
        if (incompletes.contains(offset)) {
            seenBefore = false;
        } else {
            long highWaterMark = partitionOffsetHighWaterMarks.getOrDefault(partition, MISSING_HIGH_WATER_MARK);
            seenBefore = offset <= highWaterMark;
        }
        log.trace("Record {} previously seen? {}", (Object) rec.offset(), (Object) seenBefore);
        return seenBefore;
    }

    /**
     * Computes the shard a record belongs to: the record key under KEY ordering, otherwise the
     * record's topic partition.
     *
     * @param rec the record to shard
     * @return the shard key
     */
    private Object computeShardKey(ConsumerRecord<K, V> rec) {
        switch (options.getOrdering()) {
            case KEY:
                return rec.key();
            case PARTITION:
            case UNORDERED:
                return new TopicPartition(rec.topic(), rec.partition());
            default:
                throw new IncompatibleClassChangeError();
        }
    }

    /**
     * Retrieves as much work as is currently available, with no upper bound other than
     * {@link Integer#MAX_VALUE}.
     *
     * @return the work taken, possibly empty
     */
    public <R> List<WorkContainer<K, V>> maybeGetWork() {
        int unlimited = Integer.MAX_VALUE;
        return maybeGetWork(unlimited);
    }

    /**
     * Collects up to {@code requestedMaxWorkToRetrieve} work containers that are ready to run.
     *
     * <p>First tops up the shards from the mailbox by the difference between the request and what
     * the shards already hold, then walks the shards (starting from the saved resume point) and
     * takes containers whose retry delay has passed, that are not in flight and have not already
     * succeeded. Containers with a stale epoch are collected and removed from their shard after the
     * scan. The in-flight counter is increased by the number of containers returned.
     *
     * @param requestedMaxWorkToRetrieve maximum number of containers to return; values below one yield an empty list
     * @return the containers taken, each marked as queued for execution
     */
    public List<WorkContainer<K, V>> maybeGetWork(int requestedMaxWorkToRetrieve) {
        int workToGetDelta = requestedMaxWorkToRetrieve;
        if (workToGetDelta < 1) {
            return UniLists.of();
        }
        // Only pull from the mailbox what the shards cannot already satisfy (may be negative, which pulls nothing).
        int available = this.getWorkQueuedInShardsCount();
        int extraNeededFromInboxToSatisfy = requestedMaxWorkToRetrieve - available;
        log.debug("Requested: {}, available in shards: {}, will try to process from mailbox the delta of: {}", new Object[]{requestedMaxWorkToRetrieve, available, extraNeededFromInboxToSatisfy});
        this.processInbox(extraNeededFromInboxToSatisfy);
        ArrayList<WorkContainer<K, V>> work = new ArrayList<WorkContainer<K, V>>();
        // NOTE(review): presumably iterates all shards beginning at the resume point and wrapping around - confirm against LoopingResumingIterator.
        LoopingResumingIterator<Object, NavigableMap<Long, WorkContainer<K, V>>> it = new LoopingResumingIterator<Object, NavigableMap<Long, WorkContainer<K, V>>>(this.iterationResumePoint, this.processingShards);
        // Stale work is removed after the scan rather than during it.
        ArrayList<WorkContainer> staleWorkToRemove = new ArrayList<WorkContainer>();
        int slowWorkCount = 0;
        for (Map.Entry<Object, NavigableMap<Long, WorkContainer<K, V>>> entry : it) {
            log.trace("Looking for work on shard: {}", entry.getKey());
            if (work.size() >= workToGetDelta) {
                // Remember where we stopped so the next call starts from this shard.
                this.iterationResumePoint = Optional.of(entry.getKey());
                log.debug("Work taken is now over max, stopping (saving iteration resume point {})", this.iterationResumePoint);
                break;
            }
            ArrayList<WorkContainer> shardWork = new ArrayList<WorkContainer>();
            SortedMap shardQueue = entry.getValue();
            Set shardQueueEntries = shardQueue.entrySet();
            for (Map.Entry queueEntry : shardQueueEntries) {
                boolean recordNeverAttempted;
                int taken = work.size() + shardWork.size();
                if (taken >= workToGetDelta) {
                    log.trace("Work taken ({}) exceeds max ({})", (Object)taken, (Object)workToGetDelta);
                    break;
                }
                WorkContainer workContainer = (WorkContainer)queueEntry.getValue();
                if (this.checkEpochIsStale(workContainer)) {
                    log.debug("Work is in queue with stale epoch. Will remove now. Was it not removed properly on revoke? Or are we in a race state? {}", (Object)workContainer);
                    staleWorkToRemove.add(workContainer);
                    continue;
                }
                // Back pressure: a blocked partition only lets through records that failed before or are in flight.
                TopicPartition topicPartition = workContainer.getTopicPartition();
                boolean notAllowedMoreRecords = this.partitionMoreRecordsAllowedToProcess.getOrDefault(topicPartition, true) == false;
                boolean bl = recordNeverAttempted = !workContainer.hasPreviouslyFailed();
                if (notAllowedMoreRecords && recordNeverAttempted && workContainer.isNotInFlight()) {
                    log.debug("Not allowed more records ({}) for the partition ({}) as set from previous encode run, that this record ({}) belongs to due to offset encoding back pressure, has never been attemtped before ({}), not in flight ({}), continuing on to next container in shard.", new Object[]{notAllowedMoreRecords, topicPartition, workContainer.offset(), recordNeverAttempted, workContainer.isNotInFlight()});
                    continue;
                }
                boolean hasNotSucceededAlready = !workContainer.isUserFunctionSucceeded();
                boolean delayHasPassed = workContainer.hasDelayPassed(this.clock);
                if (delayHasPassed && workContainer.isNotInFlight() && hasNotSucceededAlready) {
                    log.trace("Taking {} as work", (Object)workContainer);
                    workContainer.queueingForExecution();
                    shardWork.add(workContainer);
                } else {
                    // Not takeable; count it as slow if it has been in flight longer than the warning threshold.
                    Duration timeInFlight = workContainer.getTimeInFlight();
                    String msg = "Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}.";
                    if (BackportUtils.toSeconds(timeInFlight) > BackportUtils.toSeconds(this.thresholdForTimeSpentInQueueWarning)) {
                        ++slowWorkCount;
                        log.trace("Work has spent over " + this.thresholdForTimeSpentInQueueWarning + " in queue! " + msg, new Object[]{workContainer, delayHasPassed, workContainer.isNotInFlight(), hasNotSucceededAlready, timeInFlight});
                    } else {
                        log.trace(msg, new Object[]{workContainer, delayHasPassed, workContainer.isNotInFlight(), hasNotSucceededAlready, timeInFlight});
                    }
                }
                // Any ordering other than UNORDERED stops at the first container examined past the skip checks above.
                ParallelConsumerOptions.ProcessingOrder ordering = this.options.getOrdering();
                if (ordering == ParallelConsumerOptions.ProcessingOrder.UNORDERED) continue;
                log.trace("Processing by {}, so have cannot get more messages on this ({}) shard.", (Object)this.options.getOrdering(), entry.getKey());
                break;
            }
            work.addAll(shardWork);
        }
        if (slowWorkCount > 0) {
            // Copy into an effectively-final local so the lambda can capture it.
            int finalSlowWorkCount = slowWorkCount;
            this.slowWarningRateLimit.performIfNotLimited(() -> log.warn("Warning: {} records in the queue have been waiting longer than {}.", (Object)finalSlowWorkCount, (Object)BackportUtils.toSeconds(this.thresholdForTimeSpentInQueueWarning)));
        }
        for (WorkContainer workContainer : staleWorkToRemove) {
            this.removeWorkFromShard(workContainer);
        }
        // The in-flight figure logged here does not yet include the work taken by this call.
        log.debug("Got {} records of work. In-flight: {}, Awaiting in commit queues: {}", new Object[]{work.size(), this.getNumberRecordsOutForProcessing(), this.getNumberOfEntriesInPartitionQueues()});
        this.numberRecordsOutForProcessing += work.size();
        return work;
    }

    /**
     * Records that a piece of work completed successfully.
     *
     * <p>Marks the state as needing a commit, marks the container as succeeded, removes it from its
     * processing shard (dropping the shard itself when empty under KEY ordering), notifies the
     * registered success listeners and decrements the in-flight counter.
     *
     * <p>A missing shard is tolerated: the shard may already have been removed by partition
     * clean-up, and failing here would skip the listeners and leave the in-flight counter
     * permanently too high.
     *
     * @param wc the work container that succeeded
     */
    public void onSuccess(WorkContainer<K, V> wc) {
        log.trace("Processing success...");
        this.workStateIsDirtyNeedsCommitting.set(true);
        ConsumerRecord<K, V> cr = wc.getCr();
        log.trace("Work success ({}), removing from processing shard queue", wc);
        wc.succeed();
        Object key = this.computeShardKey(cr);
        NavigableMap<Long, WorkContainer<K, V>> shard = this.processingShards.get(key);
        if (shard != null) {
            long offset = cr.offset();
            shard.remove(offset);
            boolean keyOrdering = this.options.getOrdering().equals((Object)ParallelConsumerOptions.ProcessingOrder.KEY);
            if (keyOrdering && shard.isEmpty()) {
                log.trace("Removing empty shard (key: {})", key);
                this.processingShards.remove(key);
            }
        } else {
            log.debug("No processing shard found for key {} when completing work {}, nothing to remove", key, wc);
        }
        this.successfulWorkListeners.forEach(c -> c.accept(wc));
        --this.numberRecordsOutForProcessing;
    }

    /**
     * Records that a piece of work failed: marks the container as failed using the current clock
     * and returns it to its shard.
     *
     * @param wc the work container that failed
     */
    public void onFailure(WorkContainer<K, V> wc) {
        wc.fail(clock);
        putBack(wc);
    }

    /**
     * Returns failed work to its processing shard and decrements the in-flight counter.
     *
     * <p>The shard is created on demand, mirroring how work is first registered, so that work is
     * not lost (and no {@link NullPointerException} is thrown) if the shard was removed while the
     * work was in flight.
     *
     * @param wc the failed work container to put back
     */
    private void putBack(WorkContainer<K, V> wc) {
        log.debug("Work FAILED, returning to shard");
        ConsumerRecord<K, V> cr = wc.getCr();
        Object key = this.computeShardKey(cr);
        NavigableMap<Long, WorkContainer<K, V>> shard = this.processingShards.computeIfAbsent(key, ignore -> new TreeMap<>());
        long offset = cr.offset();
        shard.put(offset, wc);
        --this.numberRecordsOutForProcessing;
    }

    /**
     * Counts the work containers held across all partition commit queues.
     *
     * @return total number of entries in the commit queues
     */
    public int getNumberOfEntriesInPartitionQueues() {
        int total = 0;
        for (NavigableMap<Long, WorkContainer<K, V>> queue : partitionCommitQueues.values()) {
            total += queue.size();
        }
        return total;
    }

    /**
     * Sums the work queued in the shards and the work still sitting in the mailbox.
     *
     * @return total number of records waiting to be processed
     */
    public int getTotalWorkWaitingProcessing() {
        int inShards = getWorkQueuedInShardsCount();
        int inMailbox = getWorkQueuedInMailboxCount();
        return inShards + inMailbox;
    }

    /**
     * Reports how many records the mailbox manager currently holds.
     *
     * @return the mailbox manager's queued work count
     */
    Integer getWorkQueuedInMailboxCount() {
        return wmbm.getWorkQueuedInMailboxCount();
    }

    /**
     * Counts the work containers held across all processing shards.
     *
     * @return total number of entries in the shards
     */
    public int getWorkQueuedInShardsCount() {
        int total = 0;
        for (NavigableMap<Long, WorkContainer<K, V>> shard : processingShards.values()) {
            total += shard.size();
        }
        return total;
    }

    /**
     * Tells whether any records are waiting in the shards or in the mailbox.
     *
     * @return {@code true} if there is work in the shards or the mailbox
     */
    boolean isRecordsAwaitingProcessing() {
        int queuedInShards = getWorkQueuedInShardsCount();
        boolean mailboxHasWork = hasWorkInMailboxes();
        if (queuedInShards > 0) {
            return true;
        }
        return mailboxHasWork;
    }

    /**
     * Tells whether any partition commit queue still holds entries.
     *
     * @return {@code true} if at least one entry is in a commit queue
     */
    boolean isRecordsAwaitingToBeCommitted() {
        return getNumberOfEntriesInPartitionQueues() > 0;
    }

    /**
     * Looks up the work container tracking the given record in its processing shard.
     *
     * @param rec the record to look up
     * @return the matching work container, or {@code null} if the record's shard does not exist or
     *         holds no entry at the record's offset
     */
    public WorkContainer<K, V> getWorkContainerForRecord(ConsumerRecord<K, V> rec) {
        Object key = this.computeShardKey(rec);
        NavigableMap<Long, WorkContainer<K, V>> shard = this.processingShards.get(key);
        if (shard == null) {
            // No shard for this key: report "not found" the same way as a missing offset.
            return null;
        }
        return shard.get(rec.offset());
    }

    /**
     * Finds the offsets that can be committed and removes the corresponding completed work from
     * the commit queues.
     *
     * @return offsets to commit, keyed by partition
     */
    Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove() {
        boolean removeCompleted = true;
        return findCompletedEligibleOffsetsAndRemove(removeCompleted);
    }

    /**
     * Tells whether there may be offsets to commit, which is the case whenever the state is dirty.
     *
     * @return {@code true} if the work state is dirty
     */
    boolean hasCommittableOffsets() {
        return isDirty();
    }

    /**
     * Scans every partition commit queue in offset order and works out what can be committed.
     *
     * <p>For each partition, succeeded containers found before the first incomplete or failed one
     * advance the committable offset (offset + 1). Everything incomplete or failed is collected as
     * an incomplete offset and passed to {@code addEncodedOffsets}, which may attach an encoded
     * offset map to the commit entry.
     *
     * @param remove when {@code true}, the containers covered by the committable offset are removed
     *               from their partition queue
     * @return offsets (and any encoded metadata) to commit, keyed by partition
     */
    <R> Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove(boolean remove) {
        HashMap<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<TopicPartition, OffsetAndMetadata>();
        int count = 0;
        int removed = 0;
        log.trace("Scanning for in order in-flight work that has completed...");
        for (Map.Entry<TopicPartition, NavigableMap<Long, WorkContainer<K, V>>> partitionQueueEntry : this.partitionCommitQueues.entrySet()) {
            TopicPartition topicPartitionKey = partitionQueueEntry.getKey();
            log.trace("Starting scan of partition: {}", (Object)topicPartitionKey);
            NavigableMap<Long, WorkContainer<K, V>> partitionQueue = partitionQueueEntry.getValue();
            count += partitionQueue.size();
            LinkedList<WorkContainer> workToRemove = new LinkedList<WorkContainer>();
            LinkedHashSet<Long> incompleteOffsets = new LinkedHashSet<Long>();
            // Offset of the most recent incomplete container seen; only used in trace logging.
            long lowWaterMark = -1L;
            // Becomes true at the first incomplete or failed container; nothing after it may advance the commit offset.
            boolean iteratedBeyondLowWaterMarkBeingLowestCommittableOffset = false;
            for (Map.Entry offsetAndItsWorkContainer : partitionQueue.entrySet()) {
                WorkContainer container = (WorkContainer)offsetAndItsWorkContainer.getValue();
                long offset = container.getCr().offset();
                boolean complete = container.isUserFunctionComplete();
                if (complete) {
                    if (container.getUserFunctionSucceeded().get().booleanValue() && !iteratedBeyondLowWaterMarkBeingLowestCommittableOffset) {
                        log.trace("Found offset candidate ({}) to add to offset commit map", (Object)container);
                        workToRemove.add(container);
                        // Later candidates in the same partition overwrite this entry, so the last contiguous success wins.
                        long offsetOfNextExpectedMessageToBeCommitted = offset + 1L;
                        OffsetAndMetadata offsetData = new OffsetAndMetadata(offsetOfNextExpectedMessageToBeCommitted);
                        offsetsToSend.put(topicPartitionKey, offsetData);
                        continue;
                    }
                    if (container.getUserFunctionSucceeded().get().booleanValue() && iteratedBeyondLowWaterMarkBeingLowestCommittableOffset) {
                        // Succeeded but behind a gap: neither committed nor listed as incomplete.
                        log.trace("Offset {} is complete and succeeded, but we've iterated past the lowest committable offset ({}). Will mark as complete in the offset map.", (Object)container.getCr().offset(), (Object)lowWaterMark);
                        continue;
                    }
                    log.trace("Offset {} is complete, but failed processing. Will track in offset map as not complete. Can't do normal offset commit past this point.", (Object)container.getCr().offset());
                    iteratedBeyondLowWaterMarkBeingLowestCommittableOffset = true;
                    incompleteOffsets.add(offset);
                    continue;
                }
                lowWaterMark = container.offset();
                iteratedBeyondLowWaterMarkBeingLowestCommittableOffset = true;
                log.trace("Offset ({}) is incomplete, holding up the queue ({}) of size {}.", new Object[]{container.getCr().offset(), partitionQueueEntry.getKey(), partitionQueueEntry.getValue().size()});
                incompleteOffsets.add(offset);
            }
            this.addEncodedOffsets(offsetsToSend, topicPartitionKey, incompleteOffsets);
            if (!remove) continue;
            removed += workToRemove.size();
            for (WorkContainer workContainer : workToRemove) {
                long offset = workContainer.getCr().offset();
                partitionQueue.remove(offset);
            }
        }
        log.debug("Scan finished, {} were in flight, {} completed offsets removed, coalesced to {} offset(s) ({}) to be committed", new Object[]{count, removed, offsetsToSend.size(), offsetsToSend});
        return offsetsToSend;
    }

    /**
     * Attaches an encoded map of incomplete offsets to a partition's commit entry and updates the
     * partition's back-pressure flag.
     *
     * <p>With no incomplete offsets the partition is simply marked as allowed to take more records.
     * Otherwise the payload is encoded; if it exceeds the hard size limit only the bare offset is
     * committed, and if it exceeds the pressure threshold (or the hard limit) the partition is
     * blocked from taking more records. If no encoding is available the partition is blocked and
     * the commit entry is left untouched.
     *
     * @param offsetsToSend     commit entries being built, updated in place
     * @param topicPartitionKey partition being handled
     * @param incompleteOffsets offsets of the partition that are not complete, in scan order
     */
    private void addEncodedOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToSend, TopicPartition topicPartitionKey, LinkedHashSet<Long> incompleteOffsets) {
        if (incompleteOffsets.isEmpty()) {
            partitionMoreRecordsAllowedToProcess.put(topicPartitionKey, true);
            return;
        }
        // Base offset: the committable offset if one was found, else the first incomplete offset.
        OffsetAndMetadata committableOnly = offsetsToSend.get(topicPartitionKey);
        long nextExpectedOffset;
        if (committableOnly != null) {
            nextExpectedOffset = committableOnly.offset();
        } else {
            nextExpectedOffset = incompleteOffsets.iterator().next();
        }
        OffsetMapCodecManager<K, V> codecManager = new OffsetMapCodecManager<K, V>(this, consumer);
        try {
            String payload = codecManager.makeOffsetMetadataPayload(nextExpectedOffset, topicPartitionKey, incompleteOffsets);
            int payloadLength = payload.length();
            double pressureThreshold = (double) OffsetMapCodecManager.DefaultMaxMetadataSize * USED_PAYLOAD_THRESHOLD_MULTIPLIER;
            boolean allowMore;
            OffsetAndMetadata toCommit;
            if (payloadLength > OffsetMapCodecManager.DefaultMaxMetadataSize) {
                allowMore = false;
                toCommit = new OffsetAndMetadata(nextExpectedOffset);
                log.warn("Offset map data too large (size: {}) to fit in metadata payload hard limit of {} - cannot include in commit. Warning: messages might be replayed on rebalance. See kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize = {} and issue #47.", new Object[]{payloadLength, OffsetMapCodecManager.DefaultMaxMetadataSize, OffsetMapCodecManager.DefaultMaxMetadataSize});
            } else if ((double) payloadLength > pressureThreshold) {
                allowMore = false;
                toCommit = new OffsetAndMetadata(nextExpectedOffset, payload);
                log.warn("Payload size {} higher than threshold {}, but still lower than max {}. Will write payload, but will not allow further messages, in order to allow the offset data to shrink (via succeeding messages).", new Object[]{payloadLength, pressureThreshold, OffsetMapCodecManager.DefaultMaxMetadataSize});
            } else {
                allowMore = true;
                toCommit = new OffsetAndMetadata(nextExpectedOffset, payload);
                log.debug("Payload size {} within threshold {}", (Object) payloadLength, (Object) pressureThreshold);
            }
            partitionMoreRecordsAllowedToProcess.put(topicPartitionKey, allowMore);
            offsetsToSend.put(topicPartitionKey, toCommit);
        } catch (EncodingNotSupportedException unsupported) {
            partitionMoreRecordsAllowedToProcess.put(topicPartitionKey, false);
            log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance.", (Throwable) unsupported);
        }
    }

    /**
     * After a successful commit, drops every tracked incomplete offset that lies below the newly
     * committed offset of its partition.
     *
     * @param offsetsToSend the offsets that were committed, keyed by partition
     */
    public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : offsetsToSend.entrySet()) {
            Set<Long> tracked = partitionIncompleteOffsets.get(committed.getKey());
            if (tracked == null || tracked.isEmpty()) {
                continue;
            }
            long newLowWaterMark = committed.getValue().offset();
            tracked.removeIf(offset -> offset < newLowWaterMark);
        }
    }

    /**
     * Tells whether a work container was created under an assignment epoch other than the current
     * one for its partition.
     *
     * <p>A partition with no recorded epoch has never been assigned here, so work for it is treated
     * as stale instead of failing with a {@link NullPointerException} on unboxing.
     *
     * @param workContainer the work to check
     * @return {@code true} if the work's epoch differs from, or there is no, current partition epoch
     */
    boolean checkEpochIsStale(WorkContainer<K, V> workContainer) {
        TopicPartition topicPartitionKey = workContainer.getTopicPartition();
        Integer currentPartitionEpoch = this.partitionsAssignmentEpochs.get(topicPartitionKey);
        int workEpoch = workContainer.getEpoch();
        if (currentPartitionEpoch == null || currentPartitionEpoch != workEpoch) {
            log.debug("Epoch mismatch {} vs {} for record {} - were partitions lost? Skipping message - it's already assigned to a different consumer (possibly me).", new Object[]{workEpoch, currentPartitionEpoch, workContainer});
            return true;
        }
        return false;
    }

    /**
     * Advances the assignment epoch of each partition by one; a partition seen for the first time
     * starts at epoch zero.
     *
     * @param partitions the partitions whose epoch should advance
     */
    private void incrementPartitionAssignmentEpoch(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            int previous = partitionsAssignmentEpochs.getOrDefault(tp, -1);
            partitionsAssignmentEpochs.put(tp, previous + 1);
        }
    }

    /**
     * Tells whether intake should be throttled, which is the case once enough work is buffered.
     *
     * @return {@code true} if the manager is sufficiently loaded
     */
    public boolean shouldThrottle() {
        return isSufficientlyLoaded();
    }

    /**
     * Tells whether the mailbox holds more records than the configured maximum concurrency
     * multiplied by the current loading factor.
     *
     * @return {@code true} if the mailbox count exceeds that target
     */
    boolean isSufficientlyLoaded() {
        return getWorkQueuedInMailboxCount() > options.getMaxConcurrency() * getLoadingFactor();
    }

    /**
     * Reads the current factor from the dynamic load factor.
     *
     * @return the current loading factor
     */
    private int getLoadingFactor() {
        return dynamicLoadFactor.getCurrentFactor();
    }

    /**
     * Tells whether any processing shard currently holds work.
     *
     * @return {@code true} if at least one shard is non-empty
     */
    public boolean workIsWaitingToBeProcessed() {
        boolean found = false;
        for (NavigableMap<Long, WorkContainer<K, V>> shard : processingShards.values()) {
            if (!shard.isEmpty()) {
                found = true;
                break;
            }
        }
        return found;
    }

    /**
     * Tells whether the in-flight counter is non-zero.
     *
     * @return {@code true} if any records are counted as out for processing
     */
    public boolean hasWorkInFlight() {
        int inFlight = getNumberRecordsOutForProcessing();
        return inFlight != 0;
    }

    /**
     * Tells whether the work state has nothing pending to commit.
     *
     * @return {@code true} if the state is not dirty
     */
    public boolean isClean() {
        boolean dirty = isDirty();
        return !dirty;
    }

    /**
     * Reads the dirty flag, which is raised whenever a piece of work succeeds.
     *
     * @return the current value of the dirty flag
     */
    private boolean isDirty() {
        return workStateIsDirtyNeedsCommitting.get();
    }

    /**
     * Tells whether the mailbox manager holds any queued records.
     *
     * @return {@code true} if the mailbox count is above zero
     */
    public boolean hasWorkInMailboxes() {
        int queued = getWorkQueuedInMailboxCount();
        return queued > 0;
    }

    /**
     * Tells whether any partition commit queue currently holds entries.
     *
     * @return {@code true} if at least one commit queue is non-empty
     */
    public boolean hasWorkInCommitQueues() {
        boolean found = false;
        for (NavigableMap<Long, WorkContainer<K, V>> queue : partitionCommitQueues.values()) {
            if (!queue.isEmpty()) {
                found = true;
                break;
            }
        }
        return found;
    }

    /**
     * Routes the result of a finished piece of work to {@code onSuccess} or {@code onFailure}.
     *
     * <p>Results from a stale assignment epoch are dropped. The work was still counted as in flight
     * when it was handed out, so the in-flight counter is decremented here as well; otherwise
     * {@code hasWorkInFlight()} would stay true forever after a rebalance with outstanding work.
     *
     * @param wc the work container whose user function has completed
     */
    protected void handleFutureResult(WorkContainer<K, V> wc) {
        if (this.checkEpochIsStale(wc)) {
            log.debug("Work result received, but from an old generation. Dropping work from revoked partition {}", wc);
            // Balance the increment made when this work was taken for processing.
            --this.numberRecordsOutForProcessing;
            return;
        }
        if (wc.getUserFunctionSucceeded().get().booleanValue()) {
            this.onSuccess(wc);
        } else {
            this.onFailure(wc);
        }
    }

    /**
     * Replaces the class-wide payload threshold multiplier used by {@code addEncodedOffsets}.
     *
     * @param USED_PAYLOAD_THRESHOLD_MULTIPLIER new multiplier applied to the maximum metadata size
     */
    public static void setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(double USED_PAYLOAD_THRESHOLD_MULTIPLIER) {
        // Qualified with the class name because the parameter shadows the static field.
        WorkManager.USED_PAYLOAD_THRESHOLD_MULTIPLIER = USED_PAYLOAD_THRESHOLD_MULTIPLIER;
    }

    /**
     * Returns the options this manager was constructed with.
     *
     * @return the configured options
     */
    public ParallelConsumerOptions getOptions() {
        return options;
    }

    /**
     * Returns the in-flight counter.
     *
     * @return number of records currently counted as out for processing
     */
    public int getNumberRecordsOutForProcessing() {
        return numberRecordsOutForProcessing;
    }

    /**
     * Returns the live list of success listeners; callers add to it to be notified from
     * {@code onSuccess}.
     *
     * @return the mutable listener list held by this manager
     */
    List<java.util.function.Consumer<WorkContainer<K, V>>> getSuccessfulWorkListeners() {
        return successfulWorkListeners;
    }

    /**
     * Replaces the clock used for work delay and failure timing.
     *
     * @param clock the clock to use from now on
     */
    void setClock(WallClock clock) {
        WallClock replacement = clock;
        this.clock = replacement;
    }
}

