/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.CompletedFetch;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.clients.consumer.internals.FetchBuffer;
import org.apache.kafka.clients.consumer.internals.FetchCollector;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.FetchMetricsAggregator;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Unit tests for {@link FetchCollector}.
 *
 * <p>Each test builds a fresh set of collaborators via {@code buildDependencies}, assigns (and usually seeks)
 * {@code topic-a-0}, queues one or more hand-built {@link CompletedFetch} instances on a {@link FetchBuffer}
 * and then checks what {@code collectFetch} returns or throws.
 */
public class FetchCollectorTest {
    /** Number of records a {@link CompletedFetchBuilder} writes unless a test overrides it. */
    private static final int DEFAULT_RECORD_COUNT = 10;
    /** Value used for {@code max.poll.records} unless a test asks for a different limit. */
    private static final int DEFAULT_MAX_POLL_RECORDS = 500;
    /**
     * Error codes for which {@link #testFetchWithMetadataRefreshErrors(Errors)} expects a metadata update to be
     * requested. Shared with the "other errors" source so that the two parameter lists cannot drift apart.
     */
    private static final List<Errors> METADATA_REFRESH_ERRORS = Arrays.asList(
            Errors.NOT_LEADER_OR_FOLLOWER,
            Errors.REPLICA_NOT_AVAILABLE,
            Errors.KAFKA_STORAGE_ERROR,
            Errors.FENCED_LEADER_EPOCH,
            Errors.OFFSET_NOT_AVAILABLE,
            Errors.UNKNOWN_TOPIC_OR_PARTITION,
            Errors.UNKNOWN_TOPIC_ID,
            Errors.INCONSISTENT_TOPIC_ID);

    private final Time time = new MockTime(0L, 0L, 0L);
    private final TopicPartition topicAPartition0 = new TopicPartition("topic-a", 0);
    private final TopicPartition topicAPartition1 = new TopicPartition("topic-a", 1);
    private final TopicPartition topicAPartition2 = new TopicPartition("topic-a", 2);
    private final Set<TopicPartition> allPartitions = partitions(topicAPartition0, topicAPartition1, topicAPartition2);

    // Populated by buildDependencies(); tests call it explicitly rather than relying on a lifecycle hook
    // because some of them need a non-default max.poll.records.
    private LogContext logContext;
    private SubscriptionState subscriptions;
    private FetchConfig fetchConfig;
    private FetchMetricsManager metricsManager;
    private ConsumerMetadata metadata;
    private FetchBuffer fetchBuffer;
    private Deserializers<String, String> deserializers;
    private FetchCollector<String, String> fetchCollector;
    private CompletedFetchBuilder completedFetchBuilder;

    /**
     * A completed fetch holding {@code DEFAULT_MAX_POLL_RECORDS} records is returned in full by the first
     * {@code collectFetch} call; the second call returns nothing and marks the fetch as consumed.
     */
    @Test
    public void testFetchNormal() {
        int recordCount = DEFAULT_MAX_POLL_RECORDS;
        buildDependencies();
        assignAndSeek(topicAPartition0);
        CompletedFetch completedFetch = completedFetchBuilder.recordCount(recordCount).build();

        // The buffer starts out empty and the fetch is not initialized until it is collected.
        Assertions.assertTrue(fetchBuffer.isEmpty());
        fetchBuffer.add(completedFetch);
        Assertions.assertFalse(fetchBuffer.isEmpty());
        Assertions.assertFalse(completedFetch.isInitialized());

        // First pass: every record comes back.
        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
        Assertions.assertFalse(fetch.isEmpty());
        Assertions.assertEquals(recordCount, fetch.numRecords());
        Assertions.assertTrue(completedFetch.isInitialized());
        Assertions.assertFalse(completedFetch.isConsumed());

        // The queue itself is drained, but the fetch is still parked as the "next in line" fetch.
        Assertions.assertTrue(fetchBuffer.isEmpty());
        Assertions.assertNull(fetchBuffer.peek());
        Assertions.assertNull(fetchBuffer.poll());
        Assertions.assertNotNull(fetchBuffer.nextInLineFetch());

        // The partition position now equals the number of records handed out.
        SubscriptionState.FetchPosition position = subscriptions.position(topicAPartition0);
        Assertions.assertEquals(recordCount, position.offset);

        // Second pass: nothing left, and the fetch is flagged as consumed.
        fetch = fetchCollector.collectFetch(fetchBuffer);
        Assertions.assertEquals(0, fetch.numRecords());
        Assertions.assertTrue(fetch.isEmpty());
        Assertions.assertTrue(completedFetch.isConsumed());
    }

    /** A successful fetch leaves a previously registered preferred read replica in place. */
    @Test
    public void testFetchWithReadReplica() {
        buildDependencies();
        assignAndSeek(topicAPartition0);

        int preferredReadReplicaId = 67;
        subscriptions.updatePreferredReadReplica(topicAPartition0, preferredReadReplicaId, time::milliseconds);
        Assertions.assertNotNull(subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
        Assertions.assertEquals(
                Optional.of(preferredReadReplicaId),
                subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));

        fetchBuffer.add(completedFetchBuilder.build());
        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);

        Assertions.assertEquals(DEFAULT_RECORD_COUNT, fetch.numRecords());
        Assertions.assertEquals(
                Optional.of(preferredReadReplicaId),
                subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
    }

    /** No records are returned for a partition that is assigned but has no position yet. */
    @Test
    public void testNoResultsIfInitializing() {
        buildDependencies();
        // Assign without seeking, so the partition has no fetch position.
        assign(topicAPartition0);
        Assertions.assertNull(subscriptions.position(topicAPartition0));
        Assertions.assertFalse(subscriptions.isFetchable(topicAPartition0));
        Assertions.assertFalse(subscriptions.hasValidPosition(topicAPartition0));

        fetchBuffer.add(completedFetchBuilder.build());
        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);

        Assertions.assertEquals(0, fetch.numRecords());
    }

    /**
     * An exception thrown from {@code initialize} propagates out of {@code collectFetch}; afterwards the buffer
     * is expected to be empty exactly when the failing fetch contained zero records.
     *
     * @param recordCount       number of records placed in the completed fetch
     * @param expectedException exception the overridden {@code initialize} throws
     */
    @ParameterizedTest
    @MethodSource(value = {"testErrorInInitializeSource"})
    public void testErrorInInitialize(int recordCount, final RuntimeException expectedException) {
        buildDependencies();
        assignAndSeek(topicAPartition0);

        // Replace the default collector with one whose initialize() always fails.
        fetchCollector = new FetchCollector<String, String>(
                logContext, metadata, subscriptions, fetchConfig, deserializers, metricsManager, time) {

            @Override
            protected CompletedFetch initialize(CompletedFetch completedFetch) {
                throw expectedException;
            }
        };

        fetchBuffer.add(completedFetchBuilder.recordCount(recordCount).build());
        Assertions.assertFalse(fetchBuffer.isEmpty());

        Assertions.assertThrows(expectedException.getClass(), () -> fetchCollector.collectFetch(fetchBuffer));

        // Compare boolean with boolean: the decompiled form compared an Integer with a Boolean, which can
        // never be equal and made this assertion fail unconditionally.
        Assertions.assertEquals(recordCount == 0, fetchBuffer.isEmpty());
    }

    /**
     * A next-in-line fetch for a paused partition yields no records; after the call the fetch has moved from
     * the next-in-line slot back into the buffer.
     */
    @Test
    public void testFetchingPausedPartitionsYieldsNoRecords() {
        buildDependencies();
        assignAndSeek(topicAPartition0);

        Assertions.assertFalse(subscriptions.isPaused(topicAPartition0));
        subscriptions.pause(topicAPartition0);
        Assertions.assertTrue(subscriptions.isPaused(topicAPartition0));

        CompletedFetch completedFetch = completedFetchBuilder.build();
        fetchBuffer.setNextInLineFetch(completedFetch);
        Assertions.assertSame(completedFetch, fetchBuffer.nextInLineFetch());
        Assertions.assertTrue(fetchBuffer.isEmpty());
        Assertions.assertTrue(subscriptions.isPaused(completedFetch.partition));

        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);

        Assertions.assertEquals(0, fetch.numRecords());
        Assertions.assertFalse(fetchBuffer.isEmpty());
        Assertions.assertNull(fetchBuffer.nextInLineFetch());
    }

    /**
     * For each error in {@code METADATA_REFRESH_ERRORS} the collector returns an empty fetch, requests a
     * metadata update and clears the preferred read replica.
     *
     * @param error error code carried by the completed fetch
     */
    @ParameterizedTest
    @MethodSource(value = {"testFetchWithMetadataRefreshErrorsSource"})
    public void testFetchWithMetadataRefreshErrors(Errors error) {
        buildDependencies();
        assignAndSeek(topicAPartition0);
        fetchBuffer.add(completedFetchBuilder.error(error).build());

        int preferredReadReplicaId = 5;
        subscriptions.updatePreferredReadReplica(topicAPartition0, preferredReadReplicaId, time::milliseconds);
        Assertions.assertNotNull(subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
        Assertions.assertEquals(
                Optional.of(preferredReadReplicaId),
                subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));

        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);

        Assertions.assertTrue(fetch.isEmpty());
        Assertions.assertTrue(metadata.updateRequested());
        Assertions.assertEquals(
                Optional.empty(),
                subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
    }

    /** After one successful fetch, two consecutive OFFSET_OUT_OF_RANGE responses each produce an empty fetch. */
    @Test
    public void testFetchWithOffsetOutOfRange() {
        buildDependencies();
        assignAndSeek(topicAPartition0);

        // First response is healthy and returns the default batch.
        fetchBuffer.add(completedFetchBuilder.build());
        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);
        Assertions.assertFalse(fetch.isEmpty());
        Assertions.assertEquals(DEFAULT_RECORD_COUNT, fetch.numRecords());

        // Second response reports the offset as out of range.
        fetchBuffer.add(completedFetchBuilder
                .fetchOffset(fetch.numRecords())
                .error(Errors.OFFSET_OUT_OF_RANGE)
                .build());
        fetch = fetchCollector.collectFetch(fetchBuffer);
        Assertions.assertTrue(fetch.isEmpty());

        // Third response repeats the error; the fetch offset is taken from the (empty) previous result.
        fetchBuffer.add(completedFetchBuilder
                .fetchOffset(fetch.numRecords())
                .error(Errors.OFFSET_OUT_OF_RANGE)
                .build());
        fetch = fetchCollector.collectFetch(fetchBuffer);
        Assertions.assertTrue(fetch.isEmpty());
    }

    /** An OFFSET_OUT_OF_RANGE response yields an empty fetch and clears the preferred read replica. */
    @Test
    public void testFetchWithOffsetOutOfRangeWithPreferredReadReplica() {
        int maxPollRecords = 10;
        buildDependencies(maxPollRecords);
        assignAndSeek(topicAPartition0);

        int preferredReadReplicaId = 67;
        subscriptions.updatePreferredReadReplica(topicAPartition0, preferredReadReplicaId, time::milliseconds);
        Assertions.assertNotNull(subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
        Assertions.assertEquals(
                Optional.of(preferredReadReplicaId),
                subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));

        fetchBuffer.add(completedFetchBuilder.error(Errors.OFFSET_OUT_OF_RANGE).build());
        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);

        Assertions.assertTrue(fetch.isEmpty());
        Assertions.assertEquals(
                Optional.empty(),
                subscriptions.preferredReadReplica(topicAPartition0, time.milliseconds()));
    }

    /** TOPIC_AUTHORIZATION_FAILED surfaces as a {@link TopicAuthorizationException}. */
    @Test
    public void testFetchWithTopicAuthorizationFailed() {
        buildDependencies();
        assignAndSeek(topicAPartition0);
        fetchBuffer.add(completedFetchBuilder.error(Errors.TOPIC_AUTHORIZATION_FAILED).build());

        Assertions.assertThrows(TopicAuthorizationException.class, () -> fetchCollector.collectFetch(fetchBuffer));
    }

    /** UNKNOWN_LEADER_EPOCH does not throw; the resulting fetch is empty. */
    @Test
    public void testFetchWithUnknownLeaderEpoch() {
        buildDependencies();
        assignAndSeek(topicAPartition0);
        fetchBuffer.add(completedFetchBuilder.error(Errors.UNKNOWN_LEADER_EPOCH).build());

        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);

        Assertions.assertTrue(fetch.isEmpty());
    }

    /** UNKNOWN_SERVER_ERROR does not throw; the resulting fetch is empty. */
    @Test
    public void testFetchWithUnknownServerError() {
        buildDependencies();
        assignAndSeek(topicAPartition0);
        fetchBuffer.add(completedFetchBuilder.error(Errors.UNKNOWN_SERVER_ERROR).build());

        Fetch<String, String> fetch = fetchCollector.collectFetch(fetchBuffer);

        Assertions.assertTrue(fetch.isEmpty());
    }

    /** CORRUPT_MESSAGE surfaces as a {@link KafkaException}. */
    @Test
    public void testFetchWithCorruptMessage() {
        buildDependencies();
        assignAndSeek(topicAPartition0);
        fetchBuffer.add(completedFetchBuilder.error(Errors.CORRUPT_MESSAGE).build());

        Assertions.assertThrows(KafkaException.class, () -> fetchCollector.collectFetch(fetchBuffer));
    }

    /**
     * Every error code not covered by a dedicated test is expected to raise {@link IllegalStateException}.
     *
     * @param error error code carried by the completed fetch
     */
    @ParameterizedTest
    @MethodSource(value = {"testFetchWithOtherErrorsSource"})
    public void testFetchWithOtherErrors(Errors error) {
        buildDependencies();
        assignAndSeek(topicAPartition0);
        fetchBuffer.add(completedFetchBuilder.error(error).build());

        Assertions.assertThrows(IllegalStateException.class, () -> fetchCollector.collectFetch(fetchBuffer));
    }

    /** Collects the given partitions into a mutable {@link Set}. */
    private static Set<TopicPartition> partitions(TopicPartition... partitions) {
        return new HashSet<>(Arrays.asList(partitions));
    }

    /** Builds all collaborators with {@code max.poll.records} set to {@code DEFAULT_MAX_POLL_RECORDS}. */
    private void buildDependencies() {
        buildDependencies(DEFAULT_MAX_POLL_RECORDS);
    }

    /**
     * (Re)creates every collaborator field of this test: configuration, subscription state, metrics, metadata,
     * the collector under test, the fetch buffer and the completed-fetch builder.
     *
     * @param maxPollRecords value to configure for {@code max.poll.records}
     */
    private void buildDependencies(int maxPollRecords) {
        logContext = new LogContext();

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // These keys must name Deserializer implementations, not serializers.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(maxPollRecords));
        ConsumerConfig config = new ConsumerConfig(props);

        deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());
        subscriptions = ConsumerUtils.createSubscriptionState(config, logContext);
        fetchConfig = new FetchConfig(config);

        Metrics metrics = ConsumerUtils.createMetrics(config, time);
        metricsManager = ConsumerUtils.createFetchMetricsManager(metrics);
        metadata = new ConsumerMetadata(
                0L, 1000L, 10000L, false, false, subscriptions, logContext, new ClusterResourceListeners());

        fetchCollector = new FetchCollector<>(
                logContext, metadata, subscriptions, fetchConfig, deserializers, metricsManager, time);
        fetchBuffer = new FetchBuffer(logContext);
        completedFetchBuilder = new CompletedFetchBuilder();
    }

    /** Manually assigns the given partitions without setting any position. */
    private void assign(TopicPartition... partitions) {
        subscriptions.assignFromUser(partitions(partitions));
    }

    /** Assigns {@code tp} and seeks it to offset 0 so that it has a fetch position. */
    private void assignAndSeek(TopicPartition tp) {
        assign(tp);
        subscriptions.seek(tp, 0L);
    }

    /** Parameter source for {@link #testFetchWithMetadataRefreshErrors(Errors)}. */
    private static Stream<Arguments> testFetchWithMetadataRefreshErrorsSource() {
        return METADATA_REFRESH_ERRORS.stream().map(Arguments::of);
    }

    /**
     * Parameter source for {@link #testFetchWithOtherErrors(Errors)}: all {@link Errors} values minus
     * {@code NONE}, the metadata-refresh errors and the errors that have their own dedicated tests.
     */
    private static Stream<Arguments> testFetchWithOtherErrorsSource() {
        List<Errors> errors = new ArrayList<>(Arrays.asList(Errors.values()));
        errors.removeAll(METADATA_REFRESH_ERRORS);
        errors.removeAll(Arrays.asList(
                Errors.NONE,
                Errors.OFFSET_OUT_OF_RANGE,
                Errors.TOPIC_AUTHORIZATION_FAILED,
                Errors.UNKNOWN_LEADER_EPOCH,
                Errors.UNKNOWN_SERVER_ERROR,
                Errors.CORRUPT_MESSAGE));
        return errors.stream().map(Arguments::of);
    }

    /**
     * Parameter source for {@link #testErrorInInitialize(int, RuntimeException)}: a non-empty and an empty
     * fetch, each paired with a plain {@link RuntimeException} and a {@link KafkaException}.
     */
    private static Stream<Arguments> testErrorInInitializeSource() {
        return Stream.of(
                Arguments.of(DEFAULT_RECORD_COUNT, new RuntimeException()),
                Arguments.of(0, new RuntimeException()),
                Arguments.of(DEFAULT_RECORD_COUNT, new KafkaException()),
                Arguments.of(0, new KafkaException()));
    }

    /**
     * Fluent builder for {@link CompletedFetch} instances targeting {@code topic-a-0}.
     *
     * <p>This is an inner (non-static) class because {@link #build()} reads the enclosing test's collaborators.
     * Settings are sticky: a value set through {@code fetchOffset}, {@code recordCount} or {@code error}
     * stays in effect for later {@code build()} calls on the same builder.
     */
    private class CompletedFetchBuilder {
        private long fetchOffset = 0L;
        private int recordCount = DEFAULT_RECORD_COUNT;
        /** Error to put on the partition data; {@code null} leaves the error code untouched. */
        private Errors error = null;

        private CompletedFetchBuilder() {
        }

        /** Sets the fetch offset passed to the {@link CompletedFetch} constructor. */
        private CompletedFetchBuilder fetchOffset(long fetchOffset) {
            this.fetchOffset = fetchOffset;
            return this;
        }

        /** Sets how many records are written into the fetch's record batch. */
        private CompletedFetchBuilder recordCount(int recordCount) {
            this.recordCount = recordCount;
            return this;
        }

        /** Sets the error code to report on the partition data. */
        private CompletedFetchBuilder error(Errors error) {
            this.error = error;
            return this;
        }

        /**
         * Creates a {@link CompletedFetch} whose partition data contains {@code recordCount} records with key
         * {@code "key"} and values {@code "value-0"}, {@code "value-1"}, and so on, a high watermark of 1000
         * and, if configured, the error code.
         */
        private CompletedFetch build() {
            MemoryRecords records;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            try (MemoryRecordsBuilder builder =
                         MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L)) {
                for (int i = 0; i < recordCount; i++) {
                    // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                    builder.append(
                            0L,
                            "key".getBytes(StandardCharsets.UTF_8),
                            ("value-" + i).getBytes(StandardCharsets.UTF_8));
                }
                records = builder.build();
            }

            FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
                    .setPartitionIndex(topicAPartition0.partition())
                    .setHighWatermark(1000L)
                    .setRecords(records);
            if (error != null) {
                partitionData.setErrorCode(error.code());
            }

            FetchMetricsAggregator metricsAggregator = new FetchMetricsAggregator(metricsManager, allPartitions);
            return new CompletedFetch(
                    logContext,
                    subscriptions,
                    BufferSupplier.create(),
                    topicAPartition0,
                    partitionData,
                    metricsAggregator,
                    fetchOffset,
                    ApiKeys.FETCH.latestVersion());
        }
    }
}

