/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.consumer;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumerHelper;
import org.eclipse.hono.util.Pair;
import org.eclipse.hono.util.Strings;

public class AsyncHandlingAutoCommitKafkaConsumer
extends HonoKafkaConsumer {
    /**
     * Name of the config entry holding the period, in seconds, after which an offset that is
     * already committed gets committed again by the next commit run.
     */
    public static final String CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS = "hono.offsets.skip.recommit.period.seconds";
    /**
     * Name of the config entry holding the maximum time, in milliseconds, to wait for the
     * completion of record handling when partitions are revoked. A value that is not greater
     * than zero disables the waiting.
     */
    public static final String CONFIG_HONO_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT_MILLIS = "hono.offsets.commit.record.completion.timeout.millis";
    /** Commit interval used if {@code auto.commit.interval.ms} is not configured. */
    public static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5L);
    /** Recommit period used if {@link #CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS} is not configured. */
    public static final Duration DEFAULT_OFFSETS_SKIP_RECOMMIT_PERIOD = Duration.ofHours(1L);
    /** Timeout used if {@link #CONFIG_HONO_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT_MILLIS} is not configured. */
    public static final Duration DEFAULT_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT = Duration.ofMillis(300L);
    /** Limit of records in processing used if {@code max.poll.records} is not configured. */
    public static final Integer DEFAULT_MAX_RECORDS_IN_PROCESSING = 501;
    /** Percentage of the records-in-processing limit that is used to compute the resume threshold. */
    public static final Integer MAX_RECORDS_IN_PROCESSING_RESUME_THRESHOLD_PERCENT = 5;
    /** Number of records in processing at which record polling gets paused. */
    private final int maxRecordsInProcessing;
    /** Polling is resumed once the number of records in processing drops below the limit minus this value. */
    private final int maxRecordsInProcessingResumeThreshold;
    /** Period of the commit timer in milliseconds. */
    private final long commitIntervalMillis;
    /** Age in seconds of the last commit after which an unchanged offset is committed again. */
    private final long skipOffsetRecommitPeriodSeconds;
    /** Maximum time in milliseconds to wait for record handling to complete when partitions are revoked. */
    private final long offsetsCommitRecordCompletionTimeoutMillis;
    /** Offset tracking state per topic partition. */
    private final Map<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> offsetsMap = new HashMap<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets>();
    /** Offset value of the last successful commit per topic partition. */
    private final Map<org.apache.kafka.common.TopicPartition, Long> lastKnownCommittedOffsets = new HashMap<org.apache.kafka.common.TopicPartition, Long>();
    /** Guards against overlapping invocations of the periodic commit. */
    private final AtomicBoolean periodicCommitInvocationInProgress = new AtomicBoolean();
    /** Number of records whose handling has been started but not yet completed. */
    private final AtomicInteger recordsInProcessingCounter = new AtomicInteger();
    /**
     * Latch that is set while a partition revocation waits for record handling to complete;
     * the reference object itself also serves as the monitor for accessing the latch.
     */
    private final AtomicReference<UncompletedRecordsCompletionLatch> uncompletedRecordsCompletionLatchRef = new AtomicReference<>();
    /** Time at which polling was last paused because of the records-in-processing limit. */
    private Instant pauseStartTime = Instant.MAX;
    /** Id of the periodic commit timer; {@code null} as long as no timer has been started. */
    private Long periodicCommitTimerId;

    /**
     * Creates a consumer for a fixed set of topics (no topic pattern is used).
     *
     * @param vertx The Vert.x instance to use.
     * @param topics The topics handed to the superclass.
     * @param recordHandler The function handling a record; the returned future marks the end of the handling.
     * @param consumerConfig The Kafka consumer configuration; must contain a {@code group.id} entry.
     * @throws IllegalArgumentException if no {@code group.id} is configured.
     */
    public AsyncHandlingAutoCommitKafkaConsumer(Vertx vertx, Set<String> topics, Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> recordHandler, Map<String, String> consumerConfig) {
        this(
                new AtomicReference<>(),
                vertx,
                topics,
                null,
                recordHandler,
                consumerConfig);
    }

    /**
     * Creates a consumer for the topics matching a pattern (no fixed topic set is used).
     *
     * @param vertx The Vert.x instance to use.
     * @param topicPattern The topic pattern handed to the superclass.
     * @param recordHandler The function handling a record; the returned future marks the end of the handling.
     * @param consumerConfig The Kafka consumer configuration; must contain a {@code group.id} entry.
     * @throws IllegalArgumentException if no {@code group.id} is configured.
     */
    public AsyncHandlingAutoCommitKafkaConsumer(Vertx vertx, Pattern topicPattern, Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> recordHandler, Map<String, String> consumerConfig) {
        this(
                new AtomicReference<>(),
                vertx,
                null,
                topicPattern,
                recordHandler,
                consumerConfig);
    }

    /**
     * Common constructor used by the public constructors.
     * <p>
     * The record handler passed to the superclass has to invoke {@code handleRecord} on this
     * instance, which is not available before the super constructor has returned. Therefore the
     * handler resolves the instance lazily via {@code selfRef}, which is populated right after
     * the {@code super(...)} call.
     *
     * @param selfRef Holder that is set to this instance after the super constructor returned.
     * @param vertx The Vert.x instance to use.
     * @param topics The topics handed to the superclass (may be {@code null} if a pattern is given).
     * @param topicPattern The topic pattern handed to the superclass (may be {@code null} if topics are given).
     * @param recordHandler The function handling a record.
     * @param consumerConfig The Kafka consumer configuration.
     * @throws IllegalArgumentException if no {@code group.id} is configured.
     */
    private AsyncHandlingAutoCommitKafkaConsumer(AtomicReference<AsyncHandlingAutoCommitKafkaConsumer> selfRef, Vertx vertx, Set<String> topics, Pattern topicPattern, Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> recordHandler, Map<String, String> consumerConfig) {
        super(vertx, topics, topicPattern, (Handler<KafkaConsumerRecord<String, Buffer>>)((Handler)record -> ((AsyncHandlingAutoCommitKafkaConsumer)selfRef.getPlain()).handleRecord((KafkaConsumerRecord<String, Buffer>)record, recordHandler)), AsyncHandlingAutoCommitKafkaConsumer.validateAndAdaptConsumerConfig(consumerConfig));
        selfRef.setPlain(this);
        this.maxRecordsInProcessing = this.getMaxRecordsInProcessing(consumerConfig);
        // compute in long: the int product overflows for very large limits; the result never
        // exceeds the limit in magnitude, so narrowing back to int is safe
        this.maxRecordsInProcessingResumeThreshold = (int)((long)this.maxRecordsInProcessing * MAX_RECORDS_IN_PROCESSING_RESUME_THRESHOLD_PERCENT / 100L);
        this.commitIntervalMillis = AsyncHandlingAutoCommitKafkaConsumer.getCommitInterval(consumerConfig);
        this.skipOffsetRecommitPeriodSeconds = AsyncHandlingAutoCommitKafkaConsumer.getSkipOffsetRecommitPeriodSeconds(consumerConfig);
        this.offsetsCommitRecordCompletionTimeoutMillis = AsyncHandlingAutoCommitKafkaConsumer.getOffsetsCommitRecordCompletionTimeoutMillis(consumerConfig);
    }

    /**
     * Registers a received record for offset tracking and passes it to the record handler.
     * <p>
     * Record polling is paused if the number of records in processing has reached the limit.
     * The record is marked as completed once the future returned by the handler completes, or
     * immediately if invoking the handler throws an exception.
     *
     * @param record The received record.
     * @param recordHandler The function handling the record.
     */
    private void handleRecord(KafkaConsumerRecord<String, Buffer> record, Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> recordHandler) {
        final boolean limitReached = this.recordsInProcessingCounter.incrementAndGet() >= this.maxRecordsInProcessing;
        if (limitReached && this.pause()) {
            this.log.info("paused consumer record polling; max no. of records in processing exceeded (current: {}, max: {})",
                    this.recordsInProcessingCounter.get(), this.maxRecordsInProcessing);
            this.pauseStartTime = Instant.now();
        }

        final org.apache.kafka.common.TopicPartition partition =
                new org.apache.kafka.common.TopicPartition(record.topic(), record.partition());
        final OffsetsQueueEntry queueEntry = this.setRecordReceived(record.offset(), partition);
        try {
            final Future<Void> handlingResult = recordHandler.apply(record);
            handlingResult.onComplete(ar -> this.setRecordHandlingComplete(queueEntry, partition));
        } catch (Exception e) {
            // the handler failed synchronously: treat the record as handled so that it doesn't block commits
            this.log.warn("error handling record [topic: {}, partition: {}, offset: {}, headers: {}]",
                    record.topic(), record.partition(), record.offset(), record.headers(), e);
            this.setRecordHandlingComplete(queueEntry, partition);
        }
    }

    /**
     * Marks the handling of a record as completed.
     * <p>
     * Notifies a latch that may currently be waiting for outstanding records of the partition
     * and resumes record polling if the number of records in processing has dropped below the
     * resume threshold.
     *
     * @param offsetsQueueEntry The queue entry of the record.
     * @param topicPartition The partition the record was read from.
     */
    private void setRecordHandlingComplete(OffsetsQueueEntry offsetsQueueEntry, org.apache.kafka.common.TopicPartition topicPartition) {
        offsetsQueueEntry.setHandlingComplete();
        synchronized (this.uncompletedRecordsCompletionLatchRef) {
            final UncompletedRecordsCompletionLatch pendingLatch = this.uncompletedRecordsCompletionLatchRef.get();
            if (pendingLatch != null) {
                pendingLatch.onRecordHandlingCompleted(topicPartition);
            }
        }
        final int resumeBelow = this.maxRecordsInProcessing - this.maxRecordsInProcessingResumeThreshold;
        if (this.recordsInProcessingCounter.decrementAndGet() < resumeBelow && this.resume()) {
            final long pausedMillis = Duration.between(this.pauseStartTime, Instant.now()).toMillis();
            this.log.info("resumed consumer record polling after {}ms; current no. of records in processing ({}) dropped below threshold ({})",
                    pausedMillis, this.recordsInProcessingCounter.get(), resumeBelow);
        }
    }

    /**
     * Validates the given consumer configuration and returns an adapted copy of it.
     * <p>
     * The returned map has {@code enable.auto.commit} set to {@code false}. The given map is
     * left untouched, so that immutable maps are supported and the caller's configuration is
     * not modified as a side effect.
     *
     * @param consumerConfig The consumer configuration provided by the caller.
     * @return A new map containing the adapted configuration.
     * @throws IllegalArgumentException if no {@code group.id} is configured.
     * @throws NullPointerException if consumerConfig is {@code null}.
     */
    private static Map<String, String> validateAndAdaptConsumerConfig(Map<String, String> consumerConfig) {
        if (Strings.isNullOrEmpty((Object)consumerConfig.get("group.id"))) {
            throw new IllegalArgumentException("group.id config entry has to be set");
        }
        final Map<String, String> adaptedConfig = new HashMap<>(consumerConfig);
        adaptedConfig.put("enable.auto.commit", "false");
        return adaptedConfig;
    }

    /**
     * Determines the number of records in processing at which record polling gets paused.
     * <p>
     * This is the configured {@code max.poll.records} value plus one, or
     * {@link #DEFAULT_MAX_RECORDS_IN_PROCESSING} if that entry is not set. The increment
     * saturates at {@link Integer#MAX_VALUE} instead of overflowing to a negative limit.
     *
     * @param consumerConfig The consumer configuration.
     * @return The limit of records in processing.
     * @throws NumberFormatException if the configured value is not a valid integer.
     */
    private int getMaxRecordsInProcessing(Map<String, String> consumerConfig) {
        return Optional.ofNullable(consumerConfig.get("max.poll.records"))
                .map(Integer::parseInt)
                .map(maxPollRecords -> maxPollRecords == Integer.MAX_VALUE ? Integer.MAX_VALUE : maxPollRecords + 1)
                .orElse(DEFAULT_MAX_RECORDS_IN_PROCESSING);
    }

    /**
     * Gets the commit interval in milliseconds.
     *
     * @param consumerConfig The consumer configuration.
     * @return The {@code auto.commit.interval.ms} value or, if not set, the milliseconds of
     *         {@link #DEFAULT_COMMIT_INTERVAL}.
     * @throws NumberFormatException if the configured value is not a valid long.
     */
    private static long getCommitInterval(Map<String, String> consumerConfig) {
        final String configuredValue = consumerConfig.get("auto.commit.interval.ms");
        if (configuredValue == null) {
            return DEFAULT_COMMIT_INTERVAL.toMillis();
        }
        return Long.parseLong(configuredValue);
    }

    /**
     * Gets the period in seconds after which an unchanged offset is committed again.
     *
     * @param consumerConfig The consumer configuration.
     * @return The {@link #CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS} value or, if not set,
     *         the seconds of {@link #DEFAULT_OFFSETS_SKIP_RECOMMIT_PERIOD}.
     * @throws NumberFormatException if the configured value is not a valid long.
     */
    private static long getSkipOffsetRecommitPeriodSeconds(Map<String, String> consumerConfig) {
        final String configuredValue = consumerConfig.get(CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS);
        if (configuredValue == null) {
            return DEFAULT_OFFSETS_SKIP_RECOMMIT_PERIOD.toSeconds();
        }
        return Long.parseLong(configuredValue);
    }

    /**
     * Gets the maximum time in milliseconds to wait for the completion of record handling when
     * partitions are revoked.
     * <p>
     * The value is parsed as a signed long, consistent with the other config getters: zero and
     * negative values are valid and disable the waiting, and values beyond {@link Long#MAX_VALUE}
     * are rejected instead of silently wrapping to a negative number.
     *
     * @param consumerConfig The consumer configuration.
     * @return The {@link #CONFIG_HONO_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT_MILLIS} value or,
     *         if not set, the milliseconds of {@link #DEFAULT_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT}.
     * @throws NumberFormatException if the configured value is not a valid long.
     */
    private static long getOffsetsCommitRecordCompletionTimeoutMillis(Map<String, String> consumerConfig) {
        return Optional.ofNullable(consumerConfig.get(CONFIG_HONO_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT_MILLIS))
                .map(Long::parseLong)
                .orElse(DEFAULT_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT.toMillis());
    }

    /**
     * Registers the offset of a record for which the record handler was skipped and directly
     * marks it as completed, so that the offset is taken into account for commits.
     *
     * @param record The record that was skipped.
     */
    @Override
    protected void onRecordHandlerSkippedForExpiredRecord(KafkaConsumerRecord<String, Buffer> record) {
        final org.apache.kafka.common.TopicPartition partition =
                new org.apache.kafka.common.TopicPartition(record.topic(), record.partition());
        this.setRecordReceived(record.offset(), partition).setHandlingComplete();
    }

    /**
     * Starts the consumer and, once it has been started successfully, the periodic commit timer.
     * <p>
     * No timer is started if starting the consumer failed, so that a failed start doesn't leave
     * a periodic timer behind.
     *
     * @return The future returned by the superclass start.
     */
    @Override
    public Future<Void> start() {
        return super.start().onSuccess(v -> this.startPeriodicCommitTimer());
    }

    /**
     * Cancels the periodic commit timer (if one was started), stops the consumer and finally
     * discards the offset tracking state of all partitions.
     *
     * @return The future returned by the superclass stop.
     */
    @Override
    public Future<Void> stop() {
        final Long timerId = this.periodicCommitTimerId;
        if (timerId != null) {
            this.vertx.cancelTimer(timerId);
        }
        final Future<Void> stopResult = super.stop();
        // an empty "currently assigned" list makes every tracked partition obsolete
        return stopResult.onComplete(ar -> this.clearObsoleteTopicPartitionOffsets(List.of()));
    }

    /**
     * Updates the offset tracking state after partitions have been assigned.
     * <p>
     * Removes the state of partitions that are not assigned anymore, drops the last known
     * committed offsets of topics not matching the topic pattern anymore and, unless
     * {@code auto.offset.reset} is {@code earliest}, makes sure the newly assigned partitions
     * are tracked.
     *
     * @param partitionsSet The newly assigned partitions.
     */
    @Override
    protected void onPartitionsAssignedBlocking(Set<TopicPartition> partitionsSet) {
        this.clearObsoleteTopicPartitionOffsets(this.getUnderlyingConsumer().assignment());
        if (this.topicPattern != null) {
            final Set<String> matchingTopics = this.getSubscribedTopicPatternTopics();
            this.lastKnownCommittedOffsets.keySet()
                    .removeIf(committedPartition -> !matchingTopics.contains(committedPartition.topic()));
        }
        final boolean resetToEarliest = "earliest".equals(this.consumerConfig.get("auto.offset.reset"));
        if (resetToEarliest || partitionsSet.isEmpty()) {
            return;
        }
        this.ensureOffsetCommitsExistForNewlyAssignedPartitions(partitionsSet);
    }

    /**
     * Creates offset tracking state for the given partitions that are not tracked yet.
     * <p>
     * The state is initialized with the current position of the partition. The position is
     * flagged as committed only if it equals the last known committed offset of the partition;
     * otherwise the partition becomes part of the next offset commit.
     *
     * @param partitionsSet The newly assigned partitions.
     */
    private synchronized void ensureOffsetCommitsExistForNewlyAssignedPartitions(Set<TopicPartition> partitionsSet) {
        final LinkedList<org.apache.kafka.common.TopicPartition> uncommittedPartitions = new LinkedList<>();
        for (final TopicPartition assignedPartition : partitionsSet) {
            final org.apache.kafka.common.TopicPartition kafkaPartition = Helper.to(assignedPartition);
            if (this.offsetsMap.containsKey(kafkaPartition)) {
                continue;
            }
            try {
                final long position = this.getUnderlyingConsumer().position(kafkaPartition);
                final Long lastCommitted = this.lastKnownCommittedOffsets.get(kafkaPartition);
                final boolean positionCommitted = lastCommitted != null && lastCommitted.longValue() == position;
                if (!positionCommitted) {
                    uncommittedPartitions.add(kafkaPartition);
                }
                this.offsetsMap.put(kafkaPartition, new TopicPartitionOffsets(kafkaPartition, position, positionCommitted));
            } catch (Exception ex) {
                this.log.warn("error fetching position for newly assigned partition [{}]", kafkaPartition, (Object)ex);
            }
        }
        if (!uncommittedPartitions.isEmpty() && this.log.isDebugEnabled()) {
            this.log.debug("onPartitionsAssigned: partitions to be part of next offset commit: [{}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(uncommittedPartitions));
        }
    }

    /**
     * Commits offsets synchronously when partitions are revoked.
     * <p>
     * If a positive record completion timeout is configured and there are revoked partitions
     * with records whose handling has not completed yet, this method first blocks for up to that
     * timeout to let the handling finish, so that the corresponding offsets can be included in
     * the commit.
     *
     * @param partitionsSet The revoked partitions.
     */
    @Override
    protected void onPartitionsRevokedBlocking(Set<TopicPartition> partitionsSet) {
        if (!partitionsSet.isEmpty() && this.offsetsCommitRecordCompletionTimeoutMillis > 0L) {
            UncompletedRecordsCompletionLatch latch = null;
            AtomicReference<UncompletedRecordsCompletionLatch> atomicReference = this.uncompletedRecordsCompletionLatchRef;
            // same monitor as used in setRecordHandlingComplete: determining the uncompleted
            // partitions and publishing the latch happen while holding it
            synchronized (atomicReference) {
                Map<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> uncompletedRecordsPartitions = this.getUncompletedRecordsPartitions(Helper.to(partitionsSet));
                if (!uncompletedRecordsPartitions.isEmpty()) {
                    this.log.info("init latch to wait up to {}ms for the completion of record handling concerning {}", (Object)this.offsetsCommitRecordCompletionTimeoutMillis, uncompletedRecordsPartitions.size() <= 10 ? uncompletedRecordsPartitions.keySet() : uncompletedRecordsPartitions.size() + " partitions");
                    latch = new UncompletedRecordsCompletionLatch(uncompletedRecordsPartitions);
                    this.uncompletedRecordsCompletionLatchRef.set(latch);
                }
            }
            // wait outside of the monitor so that completing records can notify the latch
            if (latch != null) {
                try {
                    if (latch.await(this.offsetsCommitRecordCompletionTimeoutMillis, TimeUnit.MILLISECONDS)) {
                        this.log.trace("latch to wait for the completion of record handling was released in time");
                    } else {
                        this.log.info("timed out waiting for record handling to finish after {}ms", (Object)this.offsetsCommitRecordCompletionTimeoutMillis);
                    }
                }
                catch (InterruptedException e) {
                    // restore the interrupt flag; the commit below is still attempted
                    Thread.currentThread().interrupt();
                }
                finally {
                    // the latch is only relevant for the duration of this wait
                    this.uncompletedRecordsCompletionLatchRef.set(null);
                }
            }
        }
        this.commitOffsetsSync();
    }

    /**
     * Synchronously commits the offsets that are currently due for a commit.
     * <p>
     * A failing commit is logged and not propagated.
     *
     * @throws IllegalStateException if invoked on a thread that has a Vert.x context.
     */
    private void commitOffsetsSync() {
        if (Vertx.currentContext() != null) {
            throw new IllegalStateException("must be run on the polling thread");
        }
        final Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> pendingOffsets = this.getOffsetsToCommit();
        if (pendingOffsets.isEmpty()) {
            this.log.trace("skip commitSync - no offsets to commit");
            return;
        }
        try {
            if (this.log.isTraceEnabled()) {
                this.log.trace("commitSync; offsets: [{}]", (Object)HonoKafkaConsumerHelper.getOffsetsDebugString(pendingOffsets));
            }
            this.getUnderlyingConsumer().commitSync(pendingOffsets);
            this.setCommittedOffsets(pendingOffsets);
            this.log.trace("commitSync succeeded");
        } catch (Exception e) {
            this.log.warn("commit failed: {}", (Object)e.toString());
        }
    }

    /**
     * Starts the timer that periodically commits the offsets due for a commit.
     * <p>
     * Each timer invocation hands the actual commit over to {@code runOnKafkaWorkerThread} and
     * uses {@code commitAsync} there. An invocation is skipped if the previous one has not been
     * processed yet. The "in progress" flag is released in a {@code finally} block (and if the
     * hand-over itself fails), so that an unexpected exception cannot disable all subsequent
     * periodic commits.
     */
    private void startPeriodicCommitTimer() {
        this.periodicCommitTimerId = this.vertx.setPeriodic(this.commitIntervalMillis, tid -> {
            if (!this.periodicCommitInvocationInProgress.compareAndSet(false, true)) {
                this.log.trace("periodic commit already triggered, skipping invocation");
                return;
            }
            try {
                this.runOnKafkaWorkerThread((Handler<Void>)((Handler)v -> {
                    try {
                        Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets = this.getOffsetsToCommit();
                        if (!offsets.isEmpty()) {
                            if (this.log.isTraceEnabled()) {
                                this.log.trace("do periodic commit; offsets: [{}]", (Object)HonoKafkaConsumerHelper.getOffsetsDebugString(offsets));
                            }
                            try {
                                this.getUnderlyingConsumer().commitAsync(offsets, (committedOffsets, error) -> {
                                    if (error != null) {
                                        this.log.info("periodic commit failed: {}", (Object)error.toString());
                                    } else {
                                        this.log.trace("periodic commit succeeded");
                                        this.setCommittedOffsets(committedOffsets);
                                    }
                                });
                            }
                            catch (Exception ex) {
                                this.log.error("error doing periodic commit", (Throwable)ex);
                            }
                        } else {
                            this.log.trace("skip periodic commit - no offsets to commit");
                        }
                    }
                    finally {
                        // always allow the next timer invocation to trigger a commit again
                        this.periodicCommitInvocationInProgress.set(false);
                    }
                }));
            }
            catch (RuntimeException ex) {
                // the hand-over failed, so the handler above may never run and release the flag
                this.periodicCommitInvocationInProgress.set(false);
                throw ex;
            }
        });
    }

    /**
     * Gets the tracking state of those given partitions that still have records whose handling
     * has not completed.
     *
     * @param partitions The partitions to check.
     * @return A new, modifiable map with the matching partitions and their tracking state.
     */
    private synchronized Map<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> getUncompletedRecordsPartitions(Set<org.apache.kafka.common.TopicPartition> partitions) {
        final Map<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> uncompleted = new HashMap<>();
        for (final Map.Entry<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> tracked : this.offsetsMap.entrySet()) {
            if (partitions.contains(tracked.getKey()) && !tracked.getValue().allCompleted()) {
                uncompleted.put(tracked.getKey(), tracked.getValue());
            }
        }
        return uncompleted;
    }

    /**
     * Registers the offset of a received record in the tracking state of its partition,
     * creating that state if the partition is not tracked yet.
     *
     * @param recordOffset The offset of the record.
     * @param topicPartition The partition the record was read from.
     * @return The queue entry created for the record.
     */
    private synchronized OffsetsQueueEntry setRecordReceived(long recordOffset, org.apache.kafka.common.TopicPartition topicPartition) {
        TopicPartitionOffsets partitionOffsets = this.offsetsMap.get(topicPartition);
        if (partitionOffsets == null) {
            partitionOffsets = new TopicPartitionOffsets(topicPartition);
            this.offsetsMap.put(topicPartition, partitionOffsets);
        }
        return partitionOffsets.addOffset(recordOffset);
    }

    /**
     * Removes the tracking state of all partitions that are not contained in the given
     * collection, logging whether the removed state still had uncommitted or unfinished records.
     *
     * @param currentlyAssignedPartitions The partitions whose state shall be kept.
     * @throws NullPointerException if currentlyAssignedPartitions is {@code null}.
     */
    private synchronized void clearObsoleteTopicPartitionOffsets(Collection<org.apache.kafka.common.TopicPartition> currentlyAssignedPartitions) {
        Objects.requireNonNull(currentlyAssignedPartitions);
        this.offsetsMap.entrySet().removeIf(tracked -> {
            final org.apache.kafka.common.TopicPartition partition = tracked.getKey();
            if (currentlyAssignedPartitions.contains(partition)) {
                return false;
            }
            final TopicPartitionOffsets partitionOffsets = tracked.getValue();
            if (partitionOffsets.needsCommit()) {
                this.log.warn("partition [{}] not assigned to consumer anymore but latest handled record offset hasn't been committed yet! {}", partition, partitionOffsets.getStateInfo());
            } else if (!partitionOffsets.allCompleted()) {
                this.log.debug("partition [{}] not assigned to consumer anymore but not all read records have been fully processed yet! {}", partition, partitionOffsets.getStateInfo());
            } else {
                this.log.trace("partition [{}] not assigned to consumer anymore; no still outstanding offset commits there", (Object)partition);
            }
            return true;
        });
    }

    /**
     * Collects the offsets that are due for a commit.
     * <p>
     * For each tracked partition that reports an offset to commit, the result contains the
     * offset following the last sequentially completed record offset. Built with a plain,
     * type-safe loop; no intermediate untyped pairs are needed.
     *
     * @return A new map with the offsets to commit per partition; empty if nothing is to be committed.
     */
    private synchronized Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {
        final Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (final Map.Entry<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> tracked : this.offsetsMap.entrySet()) {
            final Optional<Long> completedOffset = tracked.getValue().getLastSequentiallyCompletedOffsetForCommit();
            if (completedOffset.isPresent()) {
                // the committed value is the offset of the next record to be read
                offsetsToCommit.put(tracked.getKey(), new OffsetAndMetadata(completedOffset.get() + 1L, ""));
            }
        }
        return offsetsToCommit;
    }

    /**
     * Records the given offsets as committed.
     * <p>
     * Updates the tracking state of each partition that is still tracked and remembers the
     * committed offset value per partition.
     *
     * @param offsets The committed offsets per partition.
     */
    private synchronized void setCommittedOffsets(Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((committedPartition, committedMetadata) -> {
            final long committedValue = committedMetadata.offset();
            final TopicPartitionOffsets tracked = this.offsetsMap.get(committedPartition);
            if (tracked != null) {
                // the tracking state works with record offsets, i.e. committed value minus one
                tracked.setLastCommittedOffset(committedValue - 1L);
            }
            this.lastKnownCommittedOffsets.put(committedPartition, committedValue);
        });
    }

    /**
     * Latch that gets released once all records of a given set of partitions have been
     * handled completely.
     */
    static class UncompletedRecordsCompletionLatch {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final Map<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> uncompletedRecordsPartitions;

        /**
         * Creates a latch for the given partitions.
         *
         * @param uncompletedRecordsPartitions The partitions to wait for, along with their
         *            tracking state. Entries get removed from this map as partitions complete.
         */
        UncompletedRecordsCompletionLatch(Map<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> uncompletedRecordsPartitions) {
            this.uncompletedRecordsPartitions = uncompletedRecordsPartitions;
        }

        /**
         * Signals that the handling of a record of the given partition has completed.
         * Releases the latch if no partition with uncompleted records is left.
         *
         * @param partition The partition of the completed record.
         */
        public void onRecordHandlingCompleted(org.apache.kafka.common.TopicPartition partition) {
            final TopicPartitionOffsets partitionOffsets = this.uncompletedRecordsPartitions.get(partition);
            if (partitionOffsets == null || !partitionOffsets.allCompleted()) {
                return;
            }
            this.uncompletedRecordsPartitions.remove(partition);
            if (this.uncompletedRecordsPartitions.isEmpty()) {
                this.latch.countDown();
            }
        }

        /**
         * Waits for the latch to be released.
         *
         * @param timeout The maximum time to wait.
         * @param unit The unit of the timeout.
         * @return {@code true} if the latch was released in time, {@code false} on timeout.
         * @throws InterruptedException if the waiting thread got interrupted.
         */
        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return this.latch.await(timeout, unit);
        }
    }

    /**
     * Holds the offset of a received record along with a flag telling whether the handling of
     * the record has completed.
     */
    static class OffsetsQueueEntry {
        private final long offset;
        private final AtomicBoolean handlingComplete = new AtomicBoolean(false);

        /**
         * @param offset The offset of the record.
         */
        OffsetsQueueEntry(long offset) {
            this.offset = offset;
        }

        /**
         * @return The offset of the record.
         */
        public long getOffset() {
            return this.offset;
        }

        /**
         * Marks the handling of the record as completed.
         */
        public void setHandlingComplete() {
            this.handlingComplete.set(true);
        }

        /**
         * @return {@code true} if the handling of the record has been marked as completed.
         */
        public boolean isHandlingComplete() {
            return this.handlingComplete.get();
        }

        @Override
        public String toString() {
            final String offsetString = Long.toString(this.offset);
            if (this.isHandlingComplete()) {
                return offsetString + " (completed)";
            }
            return offsetString;
        }
    }

    /**
     * Tracks the offsets of the records received from one topic partition.
     * <p>
     * Keeps a queue of the received record offsets in the order they were added. Completed
     * entries are removed from the head of the queue, the offset of the last removed entry being
     * the "last sequentially completed offset", which is the one used for commits.
     */
    class TopicPartitionOffsets {
        /** Marker for an offset that has not been set yet. */
        private static final long UNDEFINED_OFFSET = -2L;
        private final org.apache.kafka.common.TopicPartition topicPartition;
        private final Deque<OffsetsQueueEntry> queue = new LinkedList<>();
        private long lastSequentiallyCompletedOffset = UNDEFINED_OFFSET;
        private long lastCommittedOffset = UNDEFINED_OFFSET;
        private Instant lastCommitTime;

        /**
         * Creates tracking state without any known offsets.
         *
         * @param topicPartition The partition to track.
         * @throws NullPointerException if topicPartition is {@code null}.
         */
        TopicPartitionOffsets(org.apache.kafka.common.TopicPartition topicPartition) {
            this.topicPartition = Objects.requireNonNull(topicPartition);
        }

        /**
         * Creates tracking state starting at the given position.
         *
         * @param topicPartition The partition to track.
         * @param initialPosition The position of the partition; the offset before it is
         *            considered to be the last sequentially completed one.
         * @param initialPositionCommitted {@code true} if the position is known to be committed.
         * @throws NullPointerException if topicPartition is {@code null}.
         */
        TopicPartitionOffsets(org.apache.kafka.common.TopicPartition topicPartition, long initialPosition, boolean initialPositionCommitted) {
            this(topicPartition);
            this.lastSequentiallyCompletedOffset = initialPosition - 1L;
            if (initialPositionCommitted) {
                this.lastCommittedOffset = this.lastSequentiallyCompletedOffset;
            } else {
                this.lastCommittedOffset = UNDEFINED_OFFSET;
            }
        }

        /**
         * Appends the offset of a received record.
         *
         * @param offset The record offset.
         * @return The queue entry created for the offset.
         */
        public OffsetsQueueEntry addOffset(long offset) {
            this.cleanupAndUpdateLastCompletedOffset();
            final OffsetsQueueEntry added = new OffsetsQueueEntry(offset);
            this.queue.add(added);
            return added;
        }

        /**
         * Gets the last sequentially completed offset if it is due for a commit.
         * <p>
         * That is the case if it differs from the last committed offset, or if the last commit
         * is older than the configured recommit period.
         *
         * @return The offset, or an empty Optional if nothing is to be committed.
         */
        public Optional<Long> getLastSequentiallyCompletedOffsetForCommit() {
            this.cleanupAndUpdateLastCompletedOffset();
            final long completedOffset = this.lastSequentiallyCompletedOffset;
            if (completedOffset == UNDEFINED_OFFSET) {
                return Optional.empty();
            }
            if (!this.queue.isEmpty()) {
                AsyncHandlingAutoCommitKafkaConsumer.this.log.trace("getOffsetsToCommit: offset {} to use for commit is {} entries behind last received offset {}; partition [{}]",
                        completedOffset, this.queue.size(), this.queue.getLast().getOffset(), this.topicPartition);
            }
            if (completedOffset != this.lastCommittedOffset) {
                return Optional.of(completedOffset);
            }
            final boolean recommitDue = this.lastCommitTime != null
                    && this.lastCommitTime.isBefore(Instant.now().minusSeconds(AsyncHandlingAutoCommitKafkaConsumer.this.skipOffsetRecommitPeriodSeconds));
            if (!recommitDue) {
                return Optional.empty();
            }
            AsyncHandlingAutoCommitKafkaConsumer.this.log.trace("getOffsetsToCommit: offset {} will be recommitted (last commit {} too long ago); partition [{}]",
                    completedOffset, this.lastCommitTime, this.topicPartition);
            return Optional.of(completedOffset);
        }

        /**
         * Removes completed entries from the head of the queue, remembering the offset of the
         * last removed entry.
         */
        private void cleanupAndUpdateLastCompletedOffset() {
            OffsetsQueueEntry head = this.queue.peek();
            while (head != null && head.isHandlingComplete()) {
                this.lastSequentiallyCompletedOffset = this.queue.remove().getOffset();
                head = this.queue.peek();
            }
        }

        /**
         * Records a committed offset; offsets lower than the last committed one are ignored.
         *
         * @param offset The committed record offset.
         */
        public void setLastCommittedOffset(long offset) {
            if (offset < this.lastCommittedOffset) {
                return;
            }
            this.lastCommitTime = Instant.now();
            this.lastCommittedOffset = offset;
        }

        /**
         * @return {@code true} if there is no entry left whose handling has not completed.
         */
        public boolean allCompleted() {
            this.cleanupAndUpdateLastCompletedOffset();
            return this.queue.isEmpty();
        }

        /**
         * @return {@code true} if there is a completed offset that differs from the last committed one.
         */
        public boolean needsCommit() {
            this.cleanupAndUpdateLastCompletedOffset();
            if (this.lastSequentiallyCompletedOffset == UNDEFINED_OFFSET) {
                return false;
            }
            return this.lastSequentiallyCompletedOffset != this.lastCommittedOffset;
        }

        /**
         * @return A description of the current state for log output; queues with more than
         *         20 entries are represented by their size only.
         */
        public String getStateInfo() {
            this.cleanupAndUpdateLastCompletedOffset();
            final StringBuilder info = new StringBuilder("{lastSequentiallyCompletedOffset=")
                    .append(this.getOffsetString(this.lastSequentiallyCompletedOffset))
                    .append(", lastCommittedOffset=")
                    .append(this.getOffsetString(this.lastCommittedOffset));
            if (this.queue.size() <= 20) {
                info.append(", queue=").append(this.queue);
            } else {
                info.append(", queue.size=").append(this.queue.size());
            }
            return info.append("}").toString();
        }

        private String getOffsetString(long offset) {
            if (offset == UNDEFINED_OFFSET) {
                return "undefined";
            }
            return Long.toString(offset);
        }
    }
}

