/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.internals.CompletedFetch;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.clients.consumer.internals.FetchBuffer;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.FetchUtils;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

public class FetchCollector<K, V> {
    /** Logger created from the {@link LogContext} handed to the constructor. */
    private final Logger log;

    /** Consumer metadata; used when requesting metadata updates and when computing the preferred-replica expiry. */
    private final ConsumerMetadata metadata;

    /** Subscription state consulted for assignment, pause state, positions and offset bookkeeping. */
    private final SubscriptionState subscriptions;

    /** Fetch settings; this class reads {@code maxPollRecords}, {@code isolationLevel} and {@code fetchSize} from it. */
    private final FetchConfig fetchConfig;

    /** Deserializers passed through to {@link CompletedFetch} when records are read. */
    private final Deserializers<K, V> deserializers;

    /** Sink for the per-partition lag and lead metrics recorded while collecting. */
    private final FetchMetricsManager metricsManager;

    /** Clock used to compute the preferred-read-replica expiry time. */
    private final Time time;

    /**
     * Creates a collector that works against the supplied collaborators. The arguments are stored as-is;
     * the only work done here is obtaining a logger from {@code logContext}.
     *
     * @param logContext     source of the logger for this class
     * @param metadata       consumer metadata
     * @param subscriptions  subscription state
     * @param fetchConfig    fetch configuration
     * @param deserializers  key/value deserializers
     * @param metricsManager fetch metrics manager
     * @param time           clock
     */
    public FetchCollector(LogContext logContext,
                          ConsumerMetadata metadata,
                          SubscriptionState subscriptions,
                          FetchConfig fetchConfig,
                          Deserializers<K, V> deserializers,
                          FetchMetricsManager metricsManager,
                          Time time) {
        this.log = logContext.logger(FetchCollector.class);
        this.metadata = metadata;
        this.subscriptions = subscriptions;
        this.fetchConfig = fetchConfig;
        this.deserializers = deserializers;
        this.metricsManager = metricsManager;
        this.time = time;
    }

    /**
     * Gathers records from {@code fetchBuffer} into a single {@link Fetch}, stopping once
     * {@code fetchConfig.maxPollRecords} records have been collected or the buffer has nothing left to offer.
     *
     * <p>On every pass the buffer's "next in line" fetch is examined:
     * <ul>
     *   <li>if it is missing or already consumed, the head of the buffer is peeked, initialized when it has not
     *       been yet, installed as the new next-in-line fetch and then removed from the buffer; an empty buffer
     *       ends the loop;</li>
     *   <li>if its partition is paused, it is set aside and the next-in-line slot is cleared; everything set
     *       aside is handed back to the buffer when this method exits, normally or exceptionally;</li>
     *   <li>otherwise records are read from it and appended to the result.</li>
     * </ul>
     *
     * <p>A {@link KafkaException} raised during collection propagates only if nothing has been collected so
     * far; if some records were already gathered, the exception is dropped here and those records are returned.
     *
     * <p>NOTE: this source was produced by the CFR decompiler, which flagged this method with
     * "Removed try catching itself - possible behaviour change".
     *
     * @param fetchBuffer buffer of completed fetches to draw from
     * @return the records collected in this call; may be empty
     * @throws KafkaException if collection fails before any record has been collected
     */
    public Fetch<K, V> collectFetch(FetchBuffer fetchBuffer) {
        Fetch<K, V> collected = Fetch.empty();
        ArrayDeque<CompletedFetch> setAsideForPaused = new ArrayDeque<>();
        int budget = fetchConfig.maxPollRecords;

        try {
            while (budget > 0) {
                CompletedFetch current = fetchBuffer.nextInLineFetch();

                if (current == null || current.isConsumed()) {
                    // Nothing usable in line: promote the head of the buffer.
                    CompletedFetch head = fetchBuffer.peek();
                    if (head == null) {
                        break;
                    }

                    if (head.isInitialized()) {
                        fetchBuffer.setNextInLineFetch(head);
                    } else {
                        try {
                            fetchBuffer.setNextInLineFetch(initialize(head));
                        } catch (Exception e) {
                            // The failing head is removed only when nothing has been collected yet and it carries
                            // zero record bytes -- presumably so a record-less failing fetch cannot block the
                            // buffer, while a fetch that still holds data is kept for a later attempt.
                            if (collected.isEmpty()
                                    && FetchResponse.recordsOrFail(head.partitionData).sizeInBytes() == 0) {
                                fetchBuffer.poll();
                            }
                            throw e;
                        }
                    }

                    fetchBuffer.poll();
                } else if (subscriptions.isPaused(current.partition)) {
                    log.debug("Skipping fetching records for assigned partition {} because it is paused", current.partition);
                    setAsideForPaused.add(current);
                    fetchBuffer.setNextInLineFetch(null);
                } else {
                    Fetch<K, V> batch = fetchRecords(current, budget);
                    budget -= batch.numRecords();
                    collected.add(batch);
                }
            }
        } catch (KafkaException e) {
            // Only surface the failure when there is nothing to return; otherwise return what we have.
            if (collected.isEmpty()) {
                throw e;
            }
        } finally {
            // Hand the fetches of paused partitions back to the buffer.
            fetchBuffer.addAll(setAsideForPaused);
        }

        return collected;
    }

    /**
     * Reads up to {@code maxRecords} records from {@code nextInLineFetch} if its partition is still assigned
     * and fetchable and the fetch starts exactly at the partition's current position. When records are read,
     * the position is moved forward if the fetch's next offset is now beyond it, and lag/lead metrics are
     * recorded when available. In every other non-exceptional case the fetch is drained and an empty
     * {@link Fetch} is returned.
     *
     * @param nextInLineFetch the fetch to read from
     * @param maxRecords      upper bound handed to {@code CompletedFetch.fetchRecords}
     * @return the records read for the partition, or an empty fetch if the data was discarded
     * @throws IllegalStateException if the partition is fetchable but has no position
     */
    private Fetch<K, V> fetchRecords(CompletedFetch nextInLineFetch, int maxRecords) {
        TopicPartition partition = nextInLineFetch.partition;

        if (!subscriptions.isAssigned(partition)) {
            log.debug("Not returning fetched records for partition {} since it is no longer assigned", partition);
            return drainAndReturnEmpty(nextInLineFetch, partition);
        }

        if (!subscriptions.isFetchable(partition)) {
            log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partition);
            return drainAndReturnEmpty(nextInLineFetch, partition);
        }

        SubscriptionState.FetchPosition current = subscriptions.position(partition);
        if (current == null) {
            throw new IllegalStateException("Missing position for fetchable partition " + partition);
        }

        if (nextInLineFetch.nextFetchOffset() != current.offset) {
            log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", partition, nextInLineFetch.nextFetchOffset(), current);
            return drainAndReturnEmpty(nextInLineFetch, partition);
        }

        List<ConsumerRecord<K, V>> records = nextInLineFetch.fetchRecords(fetchConfig, deserializers, maxRecords);
        log.trace("Returning {} fetched records at offset {} for assigned partition {}", records.size(), current, partition);

        // Reading records may have moved the fetch's next offset past the stored position.
        boolean advanced = nextInLineFetch.nextFetchOffset() > current.offset;
        if (advanced) {
            SubscriptionState.FetchPosition updated = new SubscriptionState.FetchPosition(
                    nextInLineFetch.nextFetchOffset(), nextInLineFetch.lastEpoch(), current.currentLeader);
            log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`", current, updated, partition, records.size());
            subscriptions.position(partition, updated);
        }

        Long lag = subscriptions.partitionLag(partition, fetchConfig.isolationLevel);
        if (lag != null) {
            metricsManager.recordPartitionLag(partition, lag);
        }

        Long lead = subscriptions.partitionLead(partition);
        if (lead != null) {
            metricsManager.recordPartitionLead(partition, lead);
        }

        return Fetch.forPartition(partition, records, advanced);
    }

    /** Drains {@code fetch}, logging the fact at trace level, and returns an empty {@link Fetch}. */
    private Fetch<K, V> drainAndReturnEmpty(CompletedFetch fetch, TopicPartition partition) {
        log.trace("Draining fetched records for partition {}", partition);
        fetch.drain();
        return Fetch.empty();
    }

    /**
     * Prepares a completed fetch for record collection.
     *
     * <p>If the partition no longer has a valid position the fetch is ignored. If the response carries no
     * error, initialization is delegated to {@code handleInitializeSuccess}; otherwise the error is handed to
     * {@code handleInitializeErrors}, which may throw.
     *
     * <p>Regardless of outcome (including an exception), two follow-ups run afterwards: zeroed aggregated
     * metrics are recorded unless an initialized fetch is being returned, and the partition is moved to the
     * end of the subscription ordering when the response carried an error.
     *
     * <p>NOTE: this source was produced by the CFR decompiler, which flagged this method with
     * "Removed try catching itself - possible behaviour change".
     *
     * @param completedFetch the fetch to initialize
     * @return {@code completedFetch} once initialized, or {@code null} if it was ignored, discarded or
     *         carried a handled error
     */
    protected CompletedFetch initialize(CompletedFetch completedFetch) {
        TopicPartition partition = completedFetch.partition;
        Errors responseError = Errors.forCode(completedFetch.partitionData.errorCode());
        // Stays true unless an initialized fetch is about to be returned.
        boolean recordEmptyMetrics = true;

        try {
            if (!subscriptions.hasValidPosition(partition)) {
                log.debug("Ignoring fetched records for partition {} since it no longer has valid position", partition);
                return null;
            } else if (responseError == Errors.NONE) {
                CompletedFetch initialized = handleInitializeSuccess(completedFetch);
                recordEmptyMetrics = initialized == null;
                return initialized;
            } else {
                handleInitializeErrors(completedFetch, responseError);
                return null;
            }
        } finally {
            if (recordEmptyMetrics) {
                completedFetch.recordAggregatedMetrics(0, 0);
            }
            if (responseError != Errors.NONE) {
                subscriptions.movePartitionToEnd(partition);
            }
        }
    }

    /**
     * Initializes a fetch whose response carried no error.
     *
     * <p>The fetch is discarded when the partition has no position or the position's offset differs from the
     * fetch's next offset. A response that reports a positive record size yet yields no record batch is
     * treated as a failure. Otherwise the partition state is updated from the response and, if that succeeds,
     * the fetch is marked initialized.
     *
     * @param completedFetch the error-free fetch to initialize
     * @return {@code completedFetch} when initialized, or {@code null} when it was discarded or the partition
     *         state could not be updated
     * @throws RecordTooLargeException if no batch could be read and the request version is below 3
     * @throws KafkaException          if no batch could be read and the request version is 3 or higher
     */
    private CompletedFetch handleInitializeSuccess(CompletedFetch completedFetch) {
        TopicPartition topicPartition = completedFetch.partition;
        long expectedOffset = completedFetch.nextFetchOffset();
        SubscriptionState.FetchPosition current = subscriptions.positionOrNull(topicPartition);

        boolean stale = current == null || current.offset != expectedOffset;
        if (stale) {
            log.debug("Discarding stale fetch response for partition {} since its offset {} does not match the expected offset {} or the partition has been unassigned", topicPartition, expectedOffset, current);
            return null;
        }

        FetchResponseData.PartitionData data = completedFetch.partitionData;
        log.trace("Preparing to read {} bytes of data for partition {} with offset {}", FetchResponse.recordsSize(data), topicPartition, current);

        Iterator<? extends RecordBatch> batchIterator = FetchResponse.recordsOrFail(data).batches().iterator();
        boolean bytesButNoBatch = !batchIterator.hasNext() && FetchResponse.recordsSize(data) > 0;
        if (bytesButNoBatch) {
            if (completedFetch.requestVersion >= 3) {
                throw new KafkaException("Failed to make progress reading messages at " + topicPartition + "=" + expectedOffset + ". Received a non-empty fetch response from the server, but no complete records were found.");
            }
            Map<TopicPartition, Long> tooLarge = Collections.singletonMap(topicPartition, expectedOffset);
            throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " + tooLarge + " whose size is larger than the fetch size " + fetchConfig.fetchSize + " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or newer to avoid this issue. Alternately, increase the fetch size on the client (using " + "max.partition.fetch.bytes" + ")", tooLarge);
        }

        if (updatePartitionState(data, topicPartition)) {
            completedFetch.setInitialized();
            return completedFetch;
        }
        return null;
    }

    /**
     * Pushes the offsets reported in a fetch response into the subscription state.
     *
     * <p>The high watermark, log start offset and last stable offset are applied in that order, each only
     * when its value is non-negative. The first update the subscription state rejects ends the method with
     * {@code false}. Finally, if the response names a preferred read replica, that replica is recorded
     * together with a supplier that computes its expiry as "now plus the metadata expiry interval".
     *
     * @param partitionData partition section of the fetch response
     * @param tp            partition the data belongs to
     * @return {@code false} if any attempted update was rejected by the subscription state, {@code true}
     *         otherwise
     */
    private boolean updatePartitionState(FetchResponseData.PartitionData partitionData, TopicPartition tp) {
        long highWatermark = partitionData.highWatermark();
        if (highWatermark >= 0L) {
            log.trace("Updating high watermark for partition {} to {}", tp, highWatermark);
            if (!subscriptions.tryUpdatingHighWatermark(tp, highWatermark)) {
                return false;
            }
        }

        long logStartOffset = partitionData.logStartOffset();
        if (logStartOffset >= 0L) {
            log.trace("Updating log start offset for partition {} to {}", tp, logStartOffset);
            if (!subscriptions.tryUpdatingLogStartOffset(tp, logStartOffset)) {
                return false;
            }
        }

        long lastStableOffset = partitionData.lastStableOffset();
        if (lastStableOffset >= 0L) {
            log.trace("Updating last stable offset for partition {} to {}", tp, lastStableOffset);
            if (!subscriptions.tryUpdatingLastStableOffset(tp, lastStableOffset)) {
                return false;
            }
        }

        if (!FetchResponse.isPreferredReplica(partitionData)) {
            return true;
        }

        return subscriptions.tryUpdatingPreferredReadReplica(tp, partitionData.preferredReadReplica(), () -> {
            long expiresAtMs = time.milliseconds() + metadata.metadataExpireMs();
            log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}", tp, partitionData.preferredReadReplica(), expiresAtMs);
            return expiresAtMs;
        });
    }

    /**
     * Reacts to a fetch response that carried an error for its partition.
     *
     * <ul>
     *   <li>Leadership/replica/storage/epoch-fencing/offset-availability errors and the unknown-topic,
     *       unknown-topic-ID and inconsistent-topic-ID errors are logged and trigger a metadata update
     *       request.</li>
     *   <li>{@code OFFSET_OUT_OF_RANGE} is handled by {@code handleOffsetOutOfRange}.</li>
     *   <li>{@code TOPIC_AUTHORIZATION_FAILED} and {@code CORRUPT_MESSAGE} are raised to the caller.</li>
     *   <li>{@code UNKNOWN_LEADER_EPOCH} and {@code UNKNOWN_SERVER_ERROR} are only logged.</li>
     *   <li>Any other error is treated as unexpected.</li>
     * </ul>
     *
     * @param completedFetch the fetch whose response carried the error
     * @param error          the error reported for the partition
     * @throws OffsetOutOfRangeException   for an out-of-range position when no default reset policy applies
     * @throws TopicAuthorizationException for {@code TOPIC_AUTHORIZATION_FAILED}
     * @throws KafkaException              for {@code CORRUPT_MESSAGE}
     * @throws IllegalStateException       for any error not listed above
     */
    private void handleInitializeErrors(CompletedFetch completedFetch, Errors error) {
        TopicPartition partition = completedFetch.partition;
        long offset = completedFetch.nextFetchOffset();

        switch (error) {
            case NOT_LEADER_OR_FOLLOWER:
            case REPLICA_NOT_AVAILABLE:
            case KAFKA_STORAGE_ERROR:
            case FENCED_LEADER_EPOCH:
            case OFFSET_NOT_AVAILABLE:
                log.debug("Error in fetch for partition {}: {}", partition, error.exceptionName());
                FetchUtils.requestMetadataUpdate(metadata, subscriptions, partition);
                return;

            case UNKNOWN_TOPIC_OR_PARTITION:
                log.warn("Received unknown topic or partition error in fetch for partition {}", partition);
                FetchUtils.requestMetadataUpdate(metadata, subscriptions, partition);
                return;

            case UNKNOWN_TOPIC_ID:
                log.warn("Received unknown topic ID error in fetch for partition {}", partition);
                FetchUtils.requestMetadataUpdate(metadata, subscriptions, partition);
                return;

            case INCONSISTENT_TOPIC_ID:
                log.warn("Received inconsistent topic ID error in fetch for partition {}", partition);
                FetchUtils.requestMetadataUpdate(metadata, subscriptions, partition);
                return;

            case OFFSET_OUT_OF_RANGE:
                handleOffsetOutOfRange(partition, offset, error);
                return;

            case TOPIC_AUTHORIZATION_FAILED:
                log.warn("Not authorized to read from partition {}.", partition);
                throw new TopicAuthorizationException(Collections.singleton(partition.topic()));

            case UNKNOWN_LEADER_EPOCH:
                log.debug("Received unknown leader epoch error in fetch for partition {}", partition);
                return;

            case UNKNOWN_SERVER_ERROR:
                log.warn("Unknown server error while fetching offset {} for topic-partition {}", offset, partition);
                return;

            case CORRUPT_MESSAGE:
                throw new KafkaException("Encountered corrupt message when fetching offset " + offset + " for topic-partition " + partition);

            default:
                throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching at offset " + offset + " from topic-partition " + partition);
        }
    }

    /**
     * Handles an out-of-range fetch offset. If a preferred read replica was set it is cleared and nothing
     * else happens. Otherwise a response that no longer matches the partition's position is discarded; a
     * matching one either requests an offset reset (when a default reset policy exists) or raises
     * {@link OffsetOutOfRangeException}.
     */
    private void handleOffsetOutOfRange(TopicPartition partition, long offset, Errors error) {
        Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(partition);
        if (clearedReplicaId.isPresent()) {
            log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}", clearedReplicaId.get(), partition, error, offset);
            return;
        }

        SubscriptionState.FetchPosition current = subscriptions.positionOrNull(partition);
        if (current == null || offset != current.offset) {
            log.debug("Discarding stale fetch response for partition {} since the fetched offset {} does not match the current offset {} or the partition has been unassigned", partition, offset, current);
            return;
        }

        String errorMessage = "Fetch position " + current + " is out of range for partition " + partition;
        if (!subscriptions.hasDefaultOffsetResetPolicy()) {
            log.info("{}, raising error to the application since no reset policy is configured", errorMessage);
            throw new OffsetOutOfRangeException(errorMessage, Collections.singletonMap(partition, current.offset));
        }

        log.info("{}, resetting offset", errorMessage);
        subscriptions.requestOffsetResetIfPartitionAssigned(partition);
    }
}

