/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.internal;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.internal.BrokerPollSystem;
import io.confluent.parallelconsumer.internal.ConsumerManager;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import io.confluent.parallelconsumer.internal.DynamicLoadFactor;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.internal.OffsetCommitter;
import io.confluent.parallelconsumer.internal.ProducerManager;
import io.confluent.parallelconsumer.internal.RateLimiter;
import io.confluent.parallelconsumer.internal.State;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import java.io.Closeable;
import java.lang.reflect.Field;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public abstract class AbstractParallelEoSStreamProcessor<K, V>
implements ParallelConsumer<K, V>,
ConsumerRebalanceListener,
Closeable {
    private static final Logger log = LoggerFactory.getLogger(AbstractParallelEoSStreamProcessor.class);
    public static final String MDC_INSTANCE_ID = "pcId";
    public static final String MDC_OFFSET_MARKER = "offset";
    private static final String MDC_WORK_CONTAINER_DESCRIPTOR = "offset";
    protected final ParallelConsumerOptions options;
    private Clock clock = TimeUtils.getClock();
    private static final int KAFKA_DEFAULT_AUTO_COMMIT_FREQUENCY = 5000;
    private Duration timeBetweenCommits = Duration.ofMillis(5000L);
    private Instant lastCommitCheckTime = Instant.now();
    private final Optional<ProducerManager<K, V>> producerManager;
    private final Consumer<K, V> consumer;
    protected final ThreadPoolExecutor workerThreadPool;
    private Optional<Future<Boolean>> controlThreadFuture = Optional.empty();
    protected final WorkManager<K, V> wm;
    private final BlockingQueue<ControllerEventMessage<K, V>> workMailBox = new LinkedBlockingQueue<ControllerEventMessage<K, V>>();
    private final BrokerPollSystem<K, V> brokerPollSubsystem;
    private final List<Runnable> controlLoopHooks = new ArrayList<Runnable>();
    private Thread blockableControlThread;
    private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean();
    private final OffsetCommitter committer;
    private final AtomicBoolean commitCommand = new AtomicBoolean(false);
    protected final DynamicLoadFactor dynamicExtraLoadFactor;
    private Exception failureReason;
    private Instant lastCommitTime;
    private State state = State.unused;
    private Optional<ConsumerRebalanceListener> usersConsumerRebalanceListener = Optional.empty();
    private int numberOfAssignedPartitions;
    private final RateLimiter queueStatsLimiter = new RateLimiter();
    private boolean lastWorkRequestWasFulfilled = false;
    private Optional<String> myId = Optional.empty();

    public boolean isClosedOrFailed() {
        boolean closed = this.state == State.closed;
        boolean doneOrCancelled = false;
        if (this.controlThreadFuture.isPresent()) {
            Future<Boolean> threadFuture = this.controlThreadFuture.get();
            doneOrCancelled = threadFuture.isDone() || threadFuture.isCancelled();
        }
        return closed || doneOrCancelled;
    }

    public Exception getFailureCause() {
        return this.failureReason;
    }

    public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions) {
        Objects.requireNonNull(newOptions, "Options must be supplied");
        log.info("Confluent Parallel Consumer initialise... Options: {}", newOptions);
        this.options = newOptions;
        this.options.validate();
        this.dynamicExtraLoadFactor = new DynamicLoadFactor();
        this.consumer = this.options.getConsumer();
        this.checkGroupIdConfigured(this.consumer);
        this.checkNotSubscribed(this.consumer);
        this.checkAutoCommitIsDisabled(this.consumer);
        this.workerThreadPool = this.setupWorkerPool(newOptions.getMaxConcurrency());
        this.wm = new WorkManager<K, V>(newOptions, this.consumer, this.dynamicExtraLoadFactor, TimeUtils.getClock());
        ConsumerManager<K, V> consumerMgr = new ConsumerManager<K, V>(this.consumer);
        this.brokerPollSubsystem = new BrokerPollSystem<K, V>(consumerMgr, this.wm, this, newOptions);
        if (this.options.isProducerSupplied()) {
            this.producerManager = Optional.of(new ProducerManager(this.options.getProducer(), consumerMgr, this.wm, this.options));
            this.committer = this.options.isUsingTransactionalProducer() ? (OffsetCommitter)this.producerManager.get() : this.brokerPollSubsystem;
        } else {
            this.producerManager = Optional.empty();
            this.committer = this.brokerPollSubsystem;
        }
    }

    private void checkGroupIdConfigured(Consumer<K, V> consumer) {
        try {
            consumer.groupMetadata();
        }
        catch (RuntimeException e) {
            throw new IllegalArgumentException("Error validating Consumer configuration - no group metadata - missing a configured GroupId on your Consumer?", e);
        }
    }

    protected ThreadPoolExecutor setupWorkerPool(int poolSize) {
        ThreadFactory defaultFactory;
        try {
            defaultFactory = (ThreadFactory)InitialContext.doLookup(this.options.getManagedThreadFactory());
        }
        catch (NamingException e) {
            log.debug("Using Java SE Thread", (Throwable)e);
            defaultFactory = Executors.defaultThreadFactory();
        }
        ThreadFactory finalDefaultFactory = defaultFactory;
        ThreadFactory namingThreadFactory = r -> {
            Thread thread = finalDefaultFactory.newThread(r);
            String name = thread.getName();
            thread.setName("pc-" + name);
            return thread;
        };
        ThreadPoolExecutor.AbortPolicy rejectionHandler = new ThreadPoolExecutor.AbortPolicy();
        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
        return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, workQueue, namingThreadFactory, rejectionHandler);
    }

    private void checkNotSubscribed(Consumer<K, V> consumerToCheck) {
        if (consumerToCheck instanceof MockConsumer) {
            return;
        }
        Set subscription = consumerToCheck.subscription();
        Set assignment = consumerToCheck.assignment();
        if (!subscription.isEmpty() || !assignment.isEmpty()) {
            throw new IllegalStateException("Consumer subscription must be managed by this class. Use " + this.getClass().getName() + "#subcribe methods instead.");
        }
    }

    @Override
    public void subscribe(Collection<String> topics) {
        log.debug("Subscribing to {}", topics);
        this.consumer.subscribe(topics, (ConsumerRebalanceListener)this);
    }

    @Override
    public void subscribe(Pattern pattern) {
        log.debug("Subscribing to {}", (Object)pattern);
        this.consumer.subscribe(pattern, (ConsumerRebalanceListener)this);
    }

    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
        log.debug("Subscribing to {}", topics);
        this.usersConsumerRebalanceListener = Optional.of(callback);
        this.consumer.subscribe(topics, (ConsumerRebalanceListener)this);
    }

    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
        log.debug("Subscribing to {}", (Object)pattern);
        this.usersConsumerRebalanceListener = Optional.of(callback);
        this.consumer.subscribe(pattern, (ConsumerRebalanceListener)this);
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.debug("Partitions revoked {}, state: {}", partitions, (Object)this.state);
        this.numberOfAssignedPartitions -= partitions.size();
        try {
            this.wm.onPartitionsRevoked(partitions);
            this.commitOffsetsThatAreReady();
            this.usersConsumerRebalanceListener.ifPresent(x -> x.onPartitionsRevoked(partitions));
        }
        catch (Exception e) {
            throw new InternalRuntimeError("onPartitionsRevoked event error", e);
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        this.numberOfAssignedPartitions += partitions.size();
        log.info("Assigned {} total ({} new) partition(s) {}", new Object[]{this.numberOfAssignedPartitions, partitions.size(), partitions});
        this.wm.onPartitionsAssigned(partitions);
        this.usersConsumerRebalanceListener.ifPresent(x -> x.onPartitionsAssigned(partitions));
        this.notifySomethingToDo();
    }

    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        this.numberOfAssignedPartitions -= partitions.size();
        this.wm.onPartitionsLost(partitions);
        this.usersConsumerRebalanceListener.ifPresent(x -> x.onPartitionsLost(partitions));
    }

    private void checkAutoCommitIsDisabled(Consumer<K, V> consumer) {
        if (consumer instanceof KafkaConsumer) {
            Field coordinatorField = consumer.getClass().getDeclaredField("coordinator");
            coordinatorField.setAccessible(true);
            ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer);
            if (coordinator == null) {
                throw new IllegalStateException("Coordinator for Consumer is null - missing GroupId? Reflection broken?");
            }
            Field autoCommitEnabledField = coordinator.getClass().getDeclaredField("autoCommitEnabled");
            autoCommitEnabledField.setAccessible(true);
            Boolean isAutoCommitEnabled = (Boolean)autoCommitEnabledField.get(coordinator);
            if (isAutoCommitEnabled.booleanValue()) {
                throw new IllegalArgumentException("Consumer auto commit must be disabled, as commits are handled by the library.");
            }
        }
    }

    @Override
    public void close() {
        Duration timeout = DrainingCloseable.DEFAULT_TIMEOUT.multipliedBy(2L);
        this.closeDontDrainFirst(timeout);
    }

    @Override
    public void close(Duration timeout, DrainingCloseable.DrainingMode drainMode) {
        if (this.state == State.closed) {
            log.info("Already closed, checking end state..");
        } else {
            log.info("Signaling to close...");
            switch (drainMode) {
                case DRAIN: {
                    log.info("Will wait for all in flight to complete before");
                    this.transitionToDraining();
                    break;
                }
                case DONT_DRAIN: {
                    log.info("Not waiting for remaining queued to complete, will finish in flight, then close...");
                    this.transitionToClosing();
                }
            }
            this.waitForClose(timeout);
        }
        if (this.controlThreadFuture.isPresent()) {
            log.debug("Checking for control thread exception...");
            Future<Boolean> future = this.controlThreadFuture.get();
            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        log.info("Close complete.");
    }

    private void waitForClose(Duration timeout) throws TimeoutException, ExecutionException {
        log.info("Waiting on closed state...");
        while (!this.state.equals((Object)State.closed)) {
            try {
                Future<Boolean> booleanFuture = this.controlThreadFuture.get();
                log.debug("Blocking on control future");
                boolean signaled = booleanFuture.get(BackportUtils.toSeconds(timeout), TimeUnit.SECONDS);
                if (!signaled) {
                    throw new TimeoutException("Timeout waiting for system to close (" + timeout + ")");
                }
            }
            catch (InterruptedException e) {
                log.trace("Interrupted", (Throwable)e);
            }
            catch (ExecutionException | TimeoutException e) {
                log.error("Execution or timeout exception while waiting for the control thread to close cleanly (state was {}). Try increasing your time-out to allow the system to drain, or close without draining.", (Object)this.state, (Object)e);
                throw e;
            }
            log.trace("Still waiting for system to close...");
        }
    }

    private void doClose(Duration timeout) throws TimeoutException, ExecutionException {
        log.debug("Starting close process (state: {})...", (Object)this.state);
        log.debug("Shutting down execution pool...");
        List<Runnable> unfinished = this.workerThreadPool.shutdownNow();
        if (!unfinished.isEmpty()) {
            log.warn("Threads not done count: {}", (Object)unfinished.size());
        }
        log.debug("Awaiting worker pool termination...");
        boolean interrupted = true;
        while (interrupted) {
            log.debug("Still interrupted");
            try {
                boolean terminationFinishedWithoutTimeout = this.workerThreadPool.awaitTermination(BackportUtils.toSeconds(timeout), TimeUnit.SECONDS);
                interrupted = false;
                if (terminationFinishedWithoutTimeout) continue;
                log.warn("Thread execution pool termination await timeout ({})! Were any processing jobs dead locked or otherwise stuck?", (Object)timeout);
                boolean shutdown = this.workerThreadPool.isShutdown();
                boolean bl = this.workerThreadPool.isTerminated();
            }
            catch (InterruptedException e) {
                log.error("InterruptedException", (Throwable)e);
                interrupted = true;
            }
        }
        log.debug("Worker pool terminated.");
        this.processWorkCompleteMailBox();
        this.commitOffsetsThatAreReady();
        log.debug("Closing and waiting for broker poll system...");
        this.brokerPollSubsystem.closeAndWait();
        this.maybeCloseConsumer();
        this.producerManager.ifPresent(x -> x.close(timeout));
        log.debug("Close complete.");
        this.state = State.closed;
    }

    private void maybeCloseConsumer() {
        if (this.isResponsibleForCommits()) {
            this.consumer.close();
        }
    }

    private boolean isResponsibleForCommits() {
        return this.committer instanceof ProducerManager;
    }

    @Deprecated
    public void waitForProcessedNotCommitted(Duration timeout) {
        log.debug("Waiting processed but not committed...");
        Timer timer = Time.SYSTEM.timer(timeout);
        while (this.wm.isRecordsAwaitingToBeCommitted()) {
            log.trace("Waiting for no in processing...");
            Thread.sleep(100L);
            timer.update();
            if (!timer.isExpired()) continue;
            throw new TimeoutException("Waiting for no more records in processing");
        }
        log.debug("No longer anything in flight.");
    }

    private boolean isRecordsAwaitingProcessing() {
        boolean isRecordsAwaitingProcessing = this.wm.isRecordsAwaitingProcessing();
        boolean threadsDone = this.areMyThreadsDone();
        log.trace("isRecordsAwaitingProcessing {} || threadsDone {}", (Object)isRecordsAwaitingProcessing, (Object)threadsDone);
        return isRecordsAwaitingProcessing || threadsDone;
    }

    private void transitionToDraining() {
        log.debug("Transitioning to draining...");
        this.state = State.draining;
        this.notifySomethingToDo();
    }

    private void interruptControlThread() {
        if (this.blockableControlThread != null) {
            log.debug("Interrupting {} thread in case it's waiting for work", (Object)this.blockableControlThread.getName());
            this.blockableControlThread.interrupt();
        }
    }

    private boolean areMyThreadsDone() {
        if (BackportUtils.isEmpty(this.controlThreadFuture)) {
            return false;
        }
        return this.controlThreadFuture.get().isDone();
    }

    protected <R> void supervisorLoop(Function<PollContextInternal<K, V>, List<R>> userFunctionWrapped, java.util.function.Consumer<R> callback) {
        ExecutorService executorService;
        if (this.state != State.unused) {
            throw new IllegalStateException(StringUtils.msg("Invalid state - the consumer cannot be used more than once (current state is {})", new Object[]{this.state}));
        }
        this.state = State.running;
        this.brokerPollSubsystem.start(this.options.getManagedExecutorService());
        try {
            executorService = (ExecutorService)InitialContext.doLookup(this.options.getManagedExecutorService());
        }
        catch (NamingException e) {
            log.debug("Using Java SE Thread", (Throwable)e);
            executorService = Executors.newSingleThreadExecutor();
        }
        Callable<Boolean> controlTask = () -> {
            this.addInstanceMDC();
            log.info("Control loop starting up...");
            Thread controlThread = Thread.currentThread();
            controlThread.setName("pc-control");
            this.blockableControlThread = controlThread;
            while (this.state != State.closed) {
                try {
                    this.controlLoop(userFunctionWrapped, callback);
                }
                catch (Exception e) {
                    log.error("Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: " + e.getMessage(), (Throwable)e);
                    this.doClose(DrainingCloseable.DEFAULT_TIMEOUT);
                    this.failureReason = new RuntimeException("Error from poll control thread: " + e.getMessage(), e);
                    throw this.failureReason;
                }
            }
            log.info("Control loop ending clean (state:{})...", (Object)this.state);
            return true;
        };
        Future<Boolean> controlTaskFutureResult = executorService.submit(controlTask);
        this.controlThreadFuture = Optional.of(controlTaskFutureResult);
    }

    private void addInstanceMDC() {
        this.myId.ifPresent(id -> MDC.put((String)MDC_INSTANCE_ID, (String)id));
    }

    private <R> void controlLoop(Function<PollContextInternal<K, V>, List<R>> userFunction, java.util.function.Consumer<R> callback) throws TimeoutException, ExecutionException {
        int newWork = this.handleWork(userFunction, callback);
        if (this.state == State.running && !this.wm.isSufficientlyLoaded() & this.brokerPollSubsystem.isPaused()) {
            log.debug("Found Poller paused with not enough front loaded messages, ensuring poller is awake (mail: {} vs target: {})", (Object)this.wm.getNumberOfWorkQueuedInShardsAwaitingSelection(), (Object)this.options.getTargetAmountOfRecordsInFlight());
            this.brokerPollSubsystem.wakeupIfPaused();
        }
        log.trace("Loop: Process mailbox");
        this.processWorkCompleteMailBox();
        if (this.isIdlingOrRunning()) {
            log.trace("Loop: Maybe commit");
            this.commitOffsetsMaybe();
        }
        log.trace("Loop: Running {} loop end plugin(s)", (Object)this.controlLoopHooks.size());
        this.controlLoopHooks.forEach(Runnable::run);
        log.trace("Current state: {}", (Object)this.state);
        switch (this.state) {
            case draining: {
                this.drain();
                break;
            }
            case closing: {
                this.doClose(DrainingCloseable.DEFAULT_TIMEOUT);
            }
        }
        this.brokerPollSubsystem.supervise();
        Duration duration = Duration.ofMillis(1L);
        try {
            Thread.sleep(duration.toMillis());
        }
        catch (InterruptedException e) {
            log.trace("Woke up", (Throwable)e);
        }
        log.trace("End of control loop, waiting processing {}, remaining in partition queues: {}, out for processing: {}. In state: {}", new Object[]{this.wm.getNumberOfWorkQueuedInShardsAwaitingSelection(), this.wm.getNumberOfEntriesInPartitionQueues(), this.wm.getNumberRecordsOutForProcessing(), this.state});
    }

    private <R> int handleWork(Function<PollContextInternal<K, V>, List<R>> userFunction, java.util.function.Consumer<R> callback) {
        this.checkPipelinePressure();
        int gotWorkCount = 0;
        if (this.state == State.running || this.state == State.draining) {
            int delta = this.calculateQuantityToRequest();
            List<WorkContainer<K, V>> records = this.wm.getWorkIfAvailable(delta);
            gotWorkCount = records.size();
            this.lastWorkRequestWasFulfilled = gotWorkCount >= delta;
            log.trace("Loop: Submit to pool");
            this.submitWorkToPool(userFunction, callback, records);
        }
        this.queueStatsLimiter.performIfNotLimited(() -> {
            int queueSize = this.getNumberOfUserFunctionsQueued();
            log.debug("Stats: \n- pool active: {} queued:{} \n- queue size: {} target: {} loading factor: {}", new Object[]{this.workerThreadPool.getActiveCount(), queueSize, queueSize, this.getPoolLoadTarget(), this.dynamicExtraLoadFactor.getCurrentFactor()});
        });
        return gotWorkCount;
    }

    protected <R> void submitWorkToPool(Function<PollContextInternal<K, V>, List<R>> usersFunction, java.util.function.Consumer<R> callback, List<WorkContainer<K, V>> workToProcess) {
        if (!workToProcess.isEmpty()) {
            log.debug("New work incoming: {}, Pool stats: {}", (Object)workToProcess.size(), (Object)this.workerThreadPool);
            List<List<WorkContainer<K, V>>> batches = this.makeBatches(workToProcess);
            if (log.isDebugEnabled()) {
                List sizes = batches.stream().map(List::size).sorted().collect(Collectors.toList());
                log.debug("Number batches: {}, smallest {}, sizes {}", new Object[]{batches.size(), sizes.stream().findFirst().get(), sizes});
                List integerStream = sizes.stream().filter(x -> x < this.options.getBatchSize()).collect(Collectors.toList());
                if (integerStream.size() > 1) {
                    log.warn("More than one batch isn't target size: {}. Input number of batches: {}", integerStream, (Object)batches.size());
                }
            }
            for (List<WorkContainer<K, V>> batch : batches) {
                this.submitWorkToPoolInner(usersFunction, callback, batch);
            }
        }
    }

    private <R> void submitWorkToPoolInner(Function<PollContextInternal<K, V>, List<R>> usersFunction, java.util.function.Consumer<R> callback, List<WorkContainer<K, V>> batch) {
        log.trace("Sending work ({}) to pool", batch);
        Future<List<?>> outputRecordFuture = this.workerThreadPool.submit(() -> {
            this.addInstanceMDC();
            return this.runUserFunction(usersFunction, callback, batch);
        });
        for (WorkContainer<K, V> workContainer : batch) {
            workContainer.setFuture(outputRecordFuture);
        }
    }

    private List<List<WorkContainer<K, V>>> makeBatches(List<WorkContainer<K, V>> workToProcess) {
        int maxBatchSize = this.options.getBatchSize();
        return AbstractParallelEoSStreamProcessor.partition(workToProcess, maxBatchSize);
    }

    private static <T> List<List<T>> partition(Collection<T> sourceCollection, int maxBatchSize) {
        ArrayList<List<T>> listOfBatches = new ArrayList<List<T>>();
        ArrayList<T> batchInConstruction = new ArrayList<T>();
        for (T item : sourceCollection) {
            batchInConstruction.add(item);
            if (batchInConstruction.size() != maxBatchSize) continue;
            listOfBatches.add(batchInConstruction);
            batchInConstruction = new ArrayList();
        }
        if (!batchInConstruction.isEmpty()) {
            listOfBatches.add(batchInConstruction);
        }
        log.debug("sourceCollection.size() {}, batches: {}, batch sizes {}", new Object[]{sourceCollection.size(), listOfBatches.size(), listOfBatches.stream().map(List::size).collect(Collectors.toList())});
        return listOfBatches;
    }

    protected int calculateQuantityToRequest() {
        int batchSize;
        int modulo;
        int target = this.getTargetOutForProcessing();
        int current = this.wm.getNumberRecordsOutForProcessing();
        int delta = target - current;
        if (this.options.isUsingBatching() && (modulo = delta % (batchSize = this.options.getBatchSize().intValue())) > 0) {
            int extraToFillBatch = target - modulo;
            delta += extraToFillBatch;
        }
        log.debug("Loop: Will try to get work - target: {}, current queue size: {}, requesting: {}, loading factor: {}", new Object[]{target, current, delta, this.dynamicExtraLoadFactor.getCurrentFactor()});
        return delta;
    }

    protected int getTargetOutForProcessing() {
        return this.getQueueTargetLoaded();
    }

    protected int getQueueTargetLoaded() {
        int batch = this.options.getBatchSize();
        return this.getPoolLoadTarget() * this.dynamicExtraLoadFactor.getCurrentFactor() * batch;
    }

    protected void checkPipelinePressure() {
        if (log.isTraceEnabled()) {
            log.trace("Queue pressure check: (current size: {}, loaded target: {}, factor: {}) if (isPoolQueueLow() {} && lastWorkRequestWasFulfilled {}))", new Object[]{this.getNumberOfUserFunctionsQueued(), this.getQueueTargetLoaded(), this.dynamicExtraLoadFactor.getCurrentFactor(), this.isPoolQueueLow(), this.lastWorkRequestWasFulfilled});
        }
        if (this.isPoolQueueLow() && this.lastWorkRequestWasFulfilled) {
            boolean steppedUp = this.dynamicExtraLoadFactor.maybeStepUp();
            if (steppedUp) {
                log.debug("isPoolQueueLow(): Executor pool queue is not loaded with enough work (queue: {} vs target: {}), stepped up loading factor to {}", new Object[]{this.getNumberOfUserFunctionsQueued(), this.getPoolLoadTarget(), this.dynamicExtraLoadFactor.getCurrentFactor()});
            } else if (this.dynamicExtraLoadFactor.isMaxReached()) {
                log.warn("isPoolQueueLow(): Max loading factor steps reached: {}/{}", (Object)this.dynamicExtraLoadFactor.getCurrentFactor(), (Object)this.dynamicExtraLoadFactor.getMaxFactor());
            }
        }
    }

    private int getPoolLoadTarget() {
        return this.options.getTargetAmountOfRecordsInFlight();
    }

    private boolean isPoolQueueLow() {
        int queueTarget;
        int queueSize = this.getNumberOfUserFunctionsQueued();
        boolean workAmountBelowTarget = queueSize <= (queueTarget = this.getPoolLoadTarget());
        log.debug("isPoolQueueLow()? workAmountBelowTarget {} {} vs {};", new Object[]{workAmountBelowTarget, queueSize, queueTarget});
        return workAmountBelowTarget;
    }

    private void drain() {
        log.debug("Signaling to drain...");
        this.brokerPollSubsystem.drain();
        if (!this.isRecordsAwaitingProcessing()) {
            this.transitionToClosing();
        } else {
            log.debug("Records still waiting processing, won't transition to closing.");
        }
    }

    private void transitionToClosing() {
        log.debug("Transitioning to closing...");
        this.state = this.state == State.unused ? State.closed : State.closing;
        this.notifySomethingToDo();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processWorkCompleteMailBox() {
        log.trace("Processing mailbox (might block waiting for results)...");
        ArrayDeque<ControllerEventMessage<K, V>> results = new ArrayDeque<ControllerEventMessage<K, V>>();
        Duration timeToBlockFor = this.getTimeToBlockFor();
        if (timeToBlockFor.toMillis() > 0L) {
            this.currentlyPollingWorkCompleteMailBox.getAndSet(true);
            if (log.isDebugEnabled()) {
                log.debug("Blocking poll on work until next scheduled offset commit attempt for {}. active threads: {}, queue: {}", new Object[]{timeToBlockFor, this.workerThreadPool.getActiveCount(), this.getNumberOfUserFunctionsQueued()});
            }
            log.trace("Blocking poll {}", (Object)timeToBlockFor);
            try {
                ControllerEventMessage<K, V> firstBlockingPoll = this.workMailBox.poll(timeToBlockFor.toMillis(), TimeUnit.MILLISECONDS);
                if (firstBlockingPoll == null) {
                    log.debug("Mailbox results returned null, indicating timeToBlockFor (which was set as {})", (Object)timeToBlockFor);
                } else {
                    log.debug("Work arrived in mailbox during blocking poll. (Timeout was set as {})", (Object)timeToBlockFor);
                    results.add(firstBlockingPoll);
                }
            }
            catch (InterruptedException e) {
                log.debug("Interrupted waiting on work results");
            }
            finally {
                this.currentlyPollingWorkCompleteMailBox.getAndSet(false);
            }
            log.trace("Blocking poll finish");
        }
        int size = this.workMailBox.size();
        log.trace("Draining {} more, got {} already...", (Object)size, (Object)results.size());
        this.workMailBox.drainTo(results, size);
        log.trace("Processing drained work {}...", (Object)results.size());
        for (ControllerEventMessage controllerEventMessage : results) {
            if (controllerEventMessage.isNewConsumerRecords()) {
                this.wm.registerWork(controllerEventMessage.getConsumerRecords());
                continue;
            }
            WorkContainer work = controllerEventMessage.getWorkContainer();
            MDC.put((String)"offset", (String)work.toString());
            this.wm.handleFutureResult(work);
            MDC.remove((String)"offset");
        }
    }

    private Duration getTimeToBlockFor() {
        Optional<Duration> lowestScheduledOpt;
        if (!this.wm.isWorkInFlightMeetingTarget() && (lowestScheduledOpt = this.wm.getLowestRetryTime()).isPresent()) {
            Duration retryDelay = this.options.getDefaultMessageRetryDelay();
            Duration lowestScheduled = lowestScheduledOpt.get();
            Duration timeBetweenCommits = this.getTimeBetweenCommits();
            Duration effectiveRetryDelay = lowestScheduled.toMillis() < retryDelay.toMillis() ? retryDelay : lowestScheduled;
            Duration result = timeBetweenCommits.toMillis() < effectiveRetryDelay.toMillis() ? timeBetweenCommits : effectiveRetryDelay;
            log.debug("Not enough work in flight, while work is waiting to be retried - so will only sleep until next retry time of {}", (Object)result);
            return result;
        }
        Duration effectiveCommitAttemptDelay = this.getTimeToNextCommitCheck();
        log.debug("Blocking normally until next commit time of {}", (Object)effectiveCommitAttemptDelay);
        return effectiveCommitAttemptDelay;
    }

    private boolean isIdlingOrRunning() {
        return this.state == State.running || this.state == State.draining || this.state == State.paused;
    }

    private void commitOffsetsMaybe() {
        if (this.isShouldCommitNow()) {
            this.commitOffsetsThatAreReady();
        }
        this.updateLastCommitCheckTime();
    }

    private boolean isShouldCommitNow() {
        boolean shouldCommitNow;
        Duration elapsedSinceLastCommit = this.lastCommitTime == null ? Duration.ofDays(1L) : Duration.between(this.lastCommitTime, Instant.now());
        boolean commitFrequencyOK = elapsedSinceLastCommit.compareTo(this.timeBetweenCommits) > 0;
        boolean lingerBeneficial = this.lingeringOnCommitWouldBeBeneficial();
        boolean isCommandedToCommit = this.isCommandedToCommit();
        boolean shouldDoANormalCommit = commitFrequencyOK && !lingerBeneficial;
        boolean bl = shouldCommitNow = shouldDoANormalCommit || isCommandedToCommit;
        if (log.isDebugEnabled()) {
            log.debug("Should commit this cycle? shouldCommitNow? " + shouldCommitNow + " : shouldDoANormalCommit? " + shouldDoANormalCommit + ", commitFrequencyOK? " + commitFrequencyOK + ", lingerBeneficial? " + lingerBeneficial + ", isCommandedToCommit? " + isCommandedToCommit);
        }
        return shouldCommitNow;
    }

    private int getNumberOfUserFunctionsQueued() {
        return this.workerThreadPool.getQueue().size();
    }

    private boolean lingeringOnCommitWouldBeBeneficial() {
        boolean workIsWaitingToBeCompletedSuccessfully = this.wm.workIsWaitingToBeProcessed();
        boolean workInFlight = this.wm.hasWorkInFlight();
        boolean workWaitingInMailbox = !this.workMailBox.isEmpty();
        boolean workWaitingToCommit = this.wm.hasWorkInCommitQueues();
        log.trace("workIsWaitingToBeCompletedSuccessfully {} || workInFlight {} || workWaitingInMailbox {} || !workWaitingToCommit {};", new Object[]{workIsWaitingToBeCompletedSuccessfully, workInFlight, workWaitingInMailbox, !workWaitingToCommit});
        boolean result = workIsWaitingToBeCompletedSuccessfully || workInFlight || workWaitingInMailbox || !workWaitingToCommit;
        return false;
    }

    private Duration getTimeToNextCommitCheck() {
        if (this.isIdlingOrRunning()) {
            Duration timeSinceLastCommit = this.getTimeSinceLastCheck();
            Duration timeBetweenCommits = this.getTimeBetweenCommits();
            Duration minus = timeBetweenCommits.minus(timeSinceLastCommit);
            return minus;
        }
        log.debug("System not {} (state: {}), so don't wait to commit, only a small thread yield time", (Object)State.running, (Object)this.state);
        return Duration.ZERO;
    }

    private Duration getTimeSinceLastCheck() {
        Instant now = this.clock.instant();
        return Duration.between(this.lastCommitCheckTime, now);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void commitOffsetsThatAreReady() {
        log.debug("Committing offsets that are ready...");
        AtomicBoolean atomicBoolean = this.commitCommand;
        synchronized (atomicBoolean) {
            log.debug("Committing offsets that are ready...");
            this.committer.retrieveOffsetsAndCommit();
            this.clearCommitCommand();
            this.lastCommitTime = Instant.now();
        }
    }

    private void updateLastCommitCheckTime() {
        this.lastCommitCheckTime = Instant.now();
    }

    protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunction(Function<PollContextInternal<K, V>, List<R>> usersFunction, java.util.function.Consumer<R> callback, List<WorkContainer<K, V>> workContainerBatch) {
        try {
            if (log.isDebugEnabled()) {
                MDC.put((String)"offset", (String)(workContainerBatch.get(0).offset() + ""));
            }
            log.trace("Pool received: {}", workContainerBatch);
            boolean workIsStale = this.wm.checkIfWorkIsStale(workContainerBatch);
            if (workIsStale) {
                log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", workContainerBatch);
                return null;
            }
            PollContextInternal<K, V> context = new PollContextInternal<K, V>(workContainerBatch);
            List<R> resultsFromUserFunction = usersFunction.apply(context);
            for (WorkContainer<K, V> kvWorkContainer : workContainerBatch) {
                this.onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction);
            }
            ArrayList<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> intermediateResults = new ArrayList<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>>();
            for (R r : resultsFromUserFunction) {
                log.trace("Running users call back...");
                callback.accept(r);
            }
            for (WorkContainer workContainer : workContainerBatch) {
                this.addToMailBoxOnUserFunctionSuccess(workContainer, resultsFromUserFunction);
            }
            log.trace("User function future registered");
            return intermediateResults;
        }
        catch (Exception e) {
            log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox", (Throwable)e);
            for (WorkContainer<K, V> wc : workContainerBatch) {
                wc.onUserFunctionFailure(e);
                this.addToMailbox(wc);
            }
            throw e;
        }
    }

    protected void addToMailBoxOnUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
        this.addToMailbox(wc);
    }

    protected void onUserFunctionSuccess(WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
        log.trace("User function success");
        wc.onUserFunctionSuccess();
    }

    protected void addToMailbox(WorkContainer<K, V> wc) {
        String state = wc.isUserFunctionSucceeded() ? "succeeded" : "FAILED";
        log.trace("Adding {} {} to mailbox...", (Object)state, wc);
        this.workMailBox.add(ControllerEventMessage.of(wc));
    }

    public void registerWork(EpochAndRecordsMap<K, V> polledRecords) {
        log.debug("Adding {} to mailbox...", polledRecords);
        this.workMailBox.add(ControllerEventMessage.of(polledRecords));
    }

    public void notifySomethingToDo() {
        boolean noTransactionInProgress;
        boolean bl = noTransactionInProgress = this.producerManager.map(ProducerManager::isTransactionInProgress).orElse(false) == false;
        if (noTransactionInProgress) {
            log.trace("Interrupting control thread: Knock knock, wake up! You've got mail (tm)!");
            this.interruptControlThread();
        } else {
            log.trace("Would have interrupted control thread, but TX in progress");
        }
    }

    @Override
    public long workRemaining() {
        return this.wm.getNumberOfEntriesInPartitionQueues();
    }

    public void addLoopEndCallBack(Runnable r) {
        this.controlLoopHooks.add(r);
    }

    public void setLongPollTimeout(Duration ofMillis) {
        BrokerPollSystem.setLongPollTimeout(ofMillis);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void requestCommitAsap() {
        log.debug("Registering command to commit next chance");
        AtomicBoolean atomicBoolean = this.commitCommand;
        synchronized (atomicBoolean) {
            this.commitCommand.set(true);
        }
        this.notifySomethingToDo();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean isCommandedToCommit() {
        AtomicBoolean atomicBoolean = this.commitCommand;
        synchronized (atomicBoolean) {
            return this.commitCommand.get();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void clearCommitCommand() {
        AtomicBoolean atomicBoolean = this.commitCommand;
        synchronized (atomicBoolean) {
            if (this.commitCommand.get()) {
                log.debug("Command to commit asap received, clearing");
                this.commitCommand.set(false);
            }
        }
    }

    @Override
    public void pauseIfRunning() {
        if (this.state == State.running) {
            log.info("Transitioning parallel consumer to state paused.");
            this.state = State.paused;
        } else {
            log.debug("Skipping transition of parallel consumer to state paused. Current state is {}.", (Object)this.state);
        }
    }

    @Override
    public void resumeIfPaused() {
        if (this.state == State.paused) {
            log.info("Transitioning paarallel consumer to state running.");
            this.state = State.running;
            this.notifySomethingToDo();
        } else {
            log.debug("Skipping transition of parallel consumer to state running. Current state is {}.", (Object)this.state);
        }
    }

    protected ParallelConsumerOptions getOptions() {
        return this.options;
    }

    void setClock(Clock clock) {
        this.clock = clock;
    }

    public void setTimeBetweenCommits(Duration timeBetweenCommits) {
        this.timeBetweenCommits = timeBetweenCommits;
    }

    public Duration getTimeBetweenCommits() {
        return this.timeBetweenCommits;
    }

    protected Optional<ProducerManager<K, V>> getProducerManager() {
        return this.producerManager;
    }

    public WorkManager<K, V> getWm() {
        return this.wm;
    }

    protected BlockingQueue<ControllerEventMessage<K, V>> getWorkMailBox() {
        return this.workMailBox;
    }

    public int getNumberOfAssignedPartitions() {
        return this.numberOfAssignedPartitions;
    }

    public void setMyId(Optional<String> myId) {
        this.myId = myId;
    }

    public Optional<String> getMyId() {
        return this.myId;
    }

    private static final class ControllerEventMessage<K, V> {
        private final WorkContainer<K, V> workContainer;
        private final EpochAndRecordsMap<K, V> consumerRecords;

        private boolean isWorkResult() {
            return this.workContainer != null;
        }

        private boolean isNewConsumerRecords() {
            return !this.isWorkResult();
        }

        private static <K, V> ControllerEventMessage<K, V> of(EpochAndRecordsMap<K, V> polledRecords) {
            return new ControllerEventMessage<K, V>(null, polledRecords);
        }

        public static <K, V> ControllerEventMessage<K, V> of(WorkContainer<K, V> work) {
            return new ControllerEventMessage<K, V>(work, null);
        }

        public WorkContainer<K, V> getWorkContainer() {
            return this.workContainer;
        }

        public EpochAndRecordsMap<K, V> getConsumerRecords() {
            return this.consumerRecords;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof ControllerEventMessage)) {
                return false;
            }
            ControllerEventMessage other = (ControllerEventMessage)o;
            WorkContainer<K, V> this$workContainer = this.getWorkContainer();
            WorkContainer<K, V> other$workContainer = other.getWorkContainer();
            if (this$workContainer == null ? other$workContainer != null : !((Object)this$workContainer).equals(other$workContainer)) {
                return false;
            }
            EpochAndRecordsMap<K, V> this$consumerRecords = this.getConsumerRecords();
            EpochAndRecordsMap<K, V> other$consumerRecords = other.getConsumerRecords();
            return !(this$consumerRecords == null ? other$consumerRecords != null : !((Object)this$consumerRecords).equals(other$consumerRecords));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            WorkContainer<K, V> $workContainer = this.getWorkContainer();
            result = result * 59 + ($workContainer == null ? 43 : ((Object)$workContainer).hashCode());
            EpochAndRecordsMap<K, V> $consumerRecords = this.getConsumerRecords();
            result = result * 59 + ($consumerRecords == null ? 43 : ((Object)$consumerRecords).hashCode());
            return result;
        }

        public String toString() {
            return "AbstractParallelEoSStreamProcessor.ControllerEventMessage(workContainer=" + this.getWorkContainer() + ", consumerRecords=" + this.getConsumerRecords() + ")";
        }

        private ControllerEventMessage(WorkContainer<K, V> workContainer, EpochAndRecordsMap<K, V> consumerRecords) {
            this.workContainer = workContainer;
            this.consumerRecords = consumerRecords;
        }
    }
}

