/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.parallelconsumer.CountingCRLinkedList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkMailBoxManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(WorkMailBoxManager.class);
    private int sharedBoxNestedRecordCount;
    private final LinkedBlockingQueue<ConsumerRecords<K, V>> workInbox = new LinkedBlockingQueue();
    private final CountingCRLinkedList<K, V> internalBatchMailQueue = new CountingCRLinkedList();
    private final Queue<ConsumerRecord<K, V>> internalFlattenedMailQueue = new LinkedList<ConsumerRecord<K, V>>();

    Integer getWorkQueuedInMailboxCount() {
        return this.sharedBoxNestedRecordCount + this.internalBatchMailQueue.getNestedCount() + this.internalFlattenedMailQueue.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerWork(ConsumerRecords<K, V> records) {
        LinkedBlockingQueue<ConsumerRecords<K, V>> linkedBlockingQueue = this.workInbox;
        synchronized (linkedBlockingQueue) {
            this.sharedBoxNestedRecordCount += records.count();
            this.workInbox.add(records);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void drainSharedMailbox() {
        LinkedBlockingQueue<ConsumerRecords<K, V>> linkedBlockingQueue = this.workInbox;
        synchronized (linkedBlockingQueue) {
            this.workInbox.drainTo(this.internalBatchMailQueue);
            this.sharedBoxNestedRecordCount = 0;
        }
    }

    public synchronized void processInbox() {
        this.drainSharedMailbox();
        while (!this.internalBatchMailQueue.isEmpty()) {
            ConsumerRecords<K, V> consumerRecords = this.internalBatchMailQueue.poll();
            log.debug("Flattening {} records", (Object)consumerRecords.count());
            for (ConsumerRecord consumerRecord : consumerRecords) {
                this.internalFlattenedMailQueue.add(consumerRecord);
            }
        }
    }

    public synchronized void onPartitionsRemoved(Collection<TopicPartition> removedPartitions) {
        log.debug("Removing stale work from inbox queues");
        this.processInbox();
        this.internalFlattenedMailQueue.removeIf(x -> {
            TopicPartition topicPartition = new TopicPartition(x.topic(), x.partition());
            boolean recordShouldBeRemoved = removedPartitions.contains(topicPartition);
            return recordShouldBeRemoved;
        });
    }

    public synchronized boolean internalFlattenedMailQueueIsEmpty() {
        return this.internalFlattenedMailQueue.isEmpty();
    }

    public synchronized ConsumerRecord<K, V> internalFlattenedMailQueuePoll() {
        return this.internalFlattenedMailQueue.poll();
    }

    public int internalFlattenedMailQueueSize() {
        return this.internalFlattenedMailQueue.size();
    }
}

