/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class ConsumerNetworkClientTest {
    private String topicName = "test";
    private MockTime time = new MockTime(1L);
    private Cluster cluster = TestUtils.singletonCluster(this.topicName, 1);
    private Node node = (Node)this.cluster.nodes().get(0);
    private Metadata metadata = new Metadata(100L, 50000L, new LogContext(), new ClusterResourceListeners());
    private MockClient client = new MockClient((Time)this.time, this.metadata);
    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), (KafkaClient)this.client, this.metadata, (Time)this.time, 100L, 1000, Integer.MAX_VALUE);

    @Test
    public void send() {
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount(this.node));
        Assert.assertFalse((boolean)future.isDone());
        this.consumerClient.poll(future);
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.succeeded());
        ClientResponse clientResponse = (ClientResponse)future.value();
        HeartbeatResponse response = (HeartbeatResponse)clientResponse.responseBody();
        Assert.assertEquals((Object)Errors.NONE, (Object)response.error());
    }

    @Test
    public void sendWithinBlackoutPeriodAfterAuthenticationFailure() {
        this.client.authenticationFailed(this.node, 300L);
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        this.consumerClient.poll(future);
        Assert.assertTrue((boolean)future.failed());
        Assert.assertTrue((String)"Expected only an authentication error.", (boolean)(future.exception() instanceof AuthenticationException));
        this.time.sleep(30L);
        Assert.assertTrue((boolean)this.client.connectionFailed(this.node));
        RequestFuture future2 = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        this.consumerClient.poll(future2);
        Assert.assertTrue((boolean)future2.failed());
        Assert.assertTrue((String)"Expected only an authentication error.", (boolean)(future2.exception() instanceof AuthenticationException));
    }

    @Test
    public void multiSend() {
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        RequestFuture future1 = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        RequestFuture future2 = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertEquals((long)2L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)2L, (long)this.consumerClient.pendingRequestCount(this.node));
        this.consumerClient.awaitPendingRequests(this.node, this.time.timer(Long.MAX_VALUE));
        Assert.assertTrue((boolean)future1.succeeded());
        Assert.assertTrue((boolean)future2.succeeded());
    }

    @Test
    public void testDisconnectWithUnsentRequests() {
        RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertTrue((boolean)this.consumerClient.hasPendingRequests(this.node));
        Assert.assertFalse((boolean)this.client.hasInFlightRequests(this.node.idString()));
        this.consumerClient.disconnectAsync(this.node);
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)future.failed());
        Assert.assertTrue((boolean)(future.exception() instanceof DisconnectException));
    }

    @Test
    public void testDisconnectWithInFlightRequests() {
        RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)this.consumerClient.hasPendingRequests(this.node));
        Assert.assertTrue((boolean)this.client.hasInFlightRequests(this.node.idString()));
        this.consumerClient.disconnectAsync(this.node);
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)future.failed());
        Assert.assertTrue((boolean)(future.exception() instanceof DisconnectException));
    }

    @Test
    public void testTimeoutUnsentRequest() {
        this.client.delayReady(this.node, 1000L);
        RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat(), 500);
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue((boolean)this.consumerClient.hasPendingRequests());
        Assert.assertFalse((boolean)this.client.hasInFlightRequests());
        this.time.sleep(501L);
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse((boolean)this.consumerClient.hasPendingRequests());
        Assert.assertTrue((boolean)future.failed());
        Assert.assertTrue((boolean)(future.exception() instanceof TimeoutException));
    }

    @Test
    public void doNotBlockIfPollConditionIsSatisfied() {
        NetworkClient mockNetworkClient = (NetworkClient)Mockito.mock(NetworkClient.class);
        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), (KafkaClient)mockNetworkClient, this.metadata, (Time)this.time, 100L, 1000, Integer.MAX_VALUE);
        consumerClient.poll(this.time.timer(Long.MAX_VALUE), () -> false);
        ((NetworkClient)Mockito.verify((Object)mockNetworkClient)).poll(ArgumentMatchers.eq((long)0L), ArgumentMatchers.anyLong());
    }

    @Test
    public void blockWhenPollConditionNotSatisfied() {
        long timeout = 4000L;
        NetworkClient mockNetworkClient = (NetworkClient)Mockito.mock(NetworkClient.class);
        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), (KafkaClient)mockNetworkClient, this.metadata, (Time)this.time, 100L, 1000, Integer.MAX_VALUE);
        Mockito.when((Object)mockNetworkClient.inFlightRequestCount()).thenReturn((Object)1);
        consumerClient.poll(this.time.timer(timeout), () -> true);
        ((NetworkClient)Mockito.verify((Object)mockNetworkClient)).poll(ArgumentMatchers.eq((long)timeout), ArgumentMatchers.anyLong());
    }

    @Test
    public void blockOnlyForRetryBackoffIfNoInflightRequests() {
        long retryBackoffMs = 100L;
        NetworkClient mockNetworkClient = (NetworkClient)Mockito.mock(NetworkClient.class);
        ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), (KafkaClient)mockNetworkClient, this.metadata, (Time)this.time, retryBackoffMs, 1000, Integer.MAX_VALUE);
        Mockito.when((Object)mockNetworkClient.inFlightRequestCount()).thenReturn((Object)0);
        consumerClient.poll(this.time.timer(Long.MAX_VALUE), () -> true);
        ((NetworkClient)Mockito.verify((Object)mockNetworkClient)).poll(ArgumentMatchers.eq((long)retryBackoffMs), ArgumentMatchers.anyLong());
    }

    @Test
    public void wakeup() {
        RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        this.consumerClient.wakeup();
        try {
            this.consumerClient.poll(this.time.timer(0L));
            Assert.fail();
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        this.client.respond((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        this.consumerClient.poll(future);
        Assert.assertTrue((boolean)future.isDone());
    }

    @Test
    public void testDisconnectWakesUpPoll() throws Exception {
        final RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        this.client.enableBlockingUntilWakeup(1);
        Thread t = new Thread(){

            @Override
            public void run() {
                ConsumerNetworkClientTest.this.consumerClient.poll(future);
            }
        };
        t.start();
        this.consumerClient.disconnectAsync(this.node);
        t.join();
        Assert.assertTrue((boolean)future.failed());
        Assert.assertTrue((boolean)(future.exception() instanceof DisconnectException));
    }

    @Test
    public void testAuthenticationExceptionPropagatedFromMetadata() {
        this.metadata.failedUpdate(this.time.milliseconds(), (KafkaException)((Object)new AuthenticationException("Authentication failed")));
        try {
            this.consumerClient.poll(this.time.timer(Duration.ZERO));
            Assert.fail((String)"Expected authentication error thrown");
        }
        catch (AuthenticationException e) {
            Assert.assertNull((Object)((Object)this.metadata.getAndClearMetadataException()));
        }
    }

    @Test(expected=InvalidTopicException.class)
    public void testInvalidTopicExceptionPropagatedFromMetadata() {
        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("clusterId", 1, Collections.singletonMap("topic", Errors.INVALID_TOPIC_EXCEPTION), Collections.emptyMap());
        this.metadata.update(metadataResponse, this.time.milliseconds());
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
    }

    @Test(expected=TopicAuthorizationException.class)
    public void testTopicAuthorizationExceptionPropagatedFromMetadata() {
        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("clusterId", 1, Collections.singletonMap("topic", Errors.TOPIC_AUTHORIZATION_FAILED), Collections.emptyMap());
        this.metadata.update(metadataResponse, this.time.milliseconds());
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
    }

    @Test
    public void testMetadataFailurePropagated() {
        KafkaException metadataException = new KafkaException();
        this.metadata.failedUpdate(this.time.milliseconds(), metadataException);
        try {
            this.consumerClient.poll(this.time.timer(Duration.ZERO));
            Assert.fail((String)"Expected poll to throw exception");
        }
        catch (Exception e) {
            Assert.assertEquals((Object)((Object)metadataException), (Object)e);
        }
    }

    @Test
    public void testFutureCompletionOutsidePoll() throws Exception {
        final RequestFuture future = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        this.consumerClient.pollNoWakeup();
        this.client.enableBlockingUntilWakeup(2);
        Thread t1 = new Thread(){

            @Override
            public void run() {
                ConsumerNetworkClientTest.this.consumerClient.pollNoWakeup();
            }
        };
        t1.start();
        Thread.sleep(50L);
        Thread t2 = new Thread(){

            @Override
            public void run() {
                ConsumerNetworkClientTest.this.consumerClient.poll(future);
            }
        };
        t2.start();
        Thread.sleep(50L);
        this.client.respond((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        this.client.wakeup();
        t1.join();
        t2.join();
        Assert.assertTrue((boolean)future.succeeded());
    }

    @Test
    public void testAwaitForMetadataUpdateWithTimeout() {
        Assert.assertFalse((boolean)this.consumerClient.awaitMetadataUpdate(this.time.timer(10L)));
    }

    @Test
    public void sendExpiry() {
        int requestTimeoutMs = 10;
        final AtomicBoolean isReady = new AtomicBoolean();
        final AtomicBoolean disconnected = new AtomicBoolean();
        this.client = new MockClient(this.time, this.metadata){

            @Override
            public boolean ready(Node node, long now) {
                if (isReady.get()) {
                    return super.ready(node, now);
                }
                return false;
            }

            @Override
            public boolean connectionFailed(Node node) {
                return disconnected.get();
            }
        };
        this.consumerClient = new ConsumerNetworkClient(new LogContext(), (KafkaClient)this.client, this.metadata, (Time)this.time, 100L, requestTimeoutMs, Integer.MAX_VALUE);
        RequestFuture future1 = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount(this.node));
        Assert.assertFalse((boolean)future1.isDone());
        this.time.sleep(requestTimeoutMs + 1);
        RequestFuture future2 = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertEquals((long)2L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)2L, (long)this.consumerClient.pendingRequestCount(this.node));
        Assert.assertFalse((boolean)future2.isDone());
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)future1.isDone());
        Assert.assertFalse((boolean)future1.succeeded());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount(this.node));
        Assert.assertFalse((boolean)future2.isDone());
        isReady.set(true);
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        this.consumerClient.poll(future2);
        ClientResponse clientResponse = (ClientResponse)future2.value();
        HeartbeatResponse response = (HeartbeatResponse)clientResponse.responseBody();
        Assert.assertEquals((Object)Errors.NONE, (Object)response.error());
        isReady.set(false);
        RequestFuture future3 = this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount(this.node));
        disconnected.set(true);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue((boolean)future3.isDone());
        Assert.assertFalse((boolean)future3.succeeded());
        Assert.assertEquals((long)0L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertEquals((long)0L, (long)this.consumerClient.pendingRequestCount(this.node));
    }

    @Test
    public void testTrySend() {
        final AtomicBoolean isReady = new AtomicBoolean();
        final AtomicInteger checkCount = new AtomicInteger();
        this.client = new MockClient(this.time, this.metadata){

            @Override
            public boolean ready(Node node, long now) {
                checkCount.incrementAndGet();
                if (isReady.get()) {
                    return super.ready(node, now);
                }
                return false;
            }
        };
        this.consumerClient = new ConsumerNetworkClient(new LogContext(), (KafkaClient)this.client, this.metadata, (Time)this.time, 100L, 10, Integer.MAX_VALUE);
        this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        this.consumerClient.send(this.node, (AbstractRequest.Builder)this.heartbeat());
        Assert.assertEquals((long)2L, (long)this.consumerClient.pendingRequestCount(this.node));
        Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount(this.node.idString()));
        this.consumerClient.trySend(this.time.milliseconds());
        Assert.assertEquals((long)1L, (long)checkCount.getAndSet(0));
        Assert.assertEquals((long)2L, (long)this.consumerClient.pendingRequestCount(this.node));
        Assert.assertEquals((long)0L, (long)this.client.inFlightRequestCount(this.node.idString()));
        isReady.set(true);
        this.consumerClient.trySend(this.time.milliseconds());
        Assert.assertEquals((long)2L, (long)checkCount.getAndSet(0));
        Assert.assertEquals((long)2L, (long)this.consumerClient.pendingRequestCount(this.node));
        Assert.assertEquals((long)2L, (long)this.client.inFlightRequestCount(this.node.idString()));
    }

    private HeartbeatRequest.Builder heartbeat() {
        return new HeartbeatRequest.Builder(new HeartbeatRequestData().setGroupId("group").setGenerationId(1).setMemberId("memberId"));
    }

    private HeartbeatResponse heartbeatResponse(Errors error) {
        return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code()));
    }
}

