/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.telemetry.internals;

import io.opentelemetry.proto.common.v1.KeyValue;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.PushTelemetryRequest;
import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.telemetry.ClientTelemetryState;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector;
import org.apache.kafka.common.telemetry.internals.MetricKey;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ClientTelemetryReporterTest {
    private MockTime time;
    private ClientTelemetryReporter clientTelemetryReporter;
    private Map<String, Object> configs;
    private MetricsContext metricsContext;
    private Uuid uuid;
    private ClientTelemetryReporter.ClientTelemetrySubscription subscription;

    @BeforeEach
    public void setUp() {
        this.time = new MockTime();
        this.clientTelemetryReporter = new ClientTelemetryReporter((Time)this.time);
        this.configs = new HashMap<String, Object>();
        this.metricsContext = new KafkaMetricsContext("test");
        this.uuid = Uuid.randomUuid();
        this.subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(this.uuid, 1234, 20000, Collections.emptyList(), true, null);
    }

    @Test
    public void testInitTelemetryReporter() {
        this.configs.put("client.id", "test-client");
        this.configs.put("client.rack", "rack");
        this.clientTelemetryReporter.configure(this.configs);
        this.clientTelemetryReporter.contextChange(this.metricsContext);
        Assertions.assertNotNull((Object)this.clientTelemetryReporter.metricsCollector());
        Assertions.assertNotNull((Object)this.clientTelemetryReporter.telemetryProvider().resource());
        Assertions.assertEquals((int)1, (int)this.clientTelemetryReporter.telemetryProvider().resource().getAttributesCount());
        Assertions.assertEquals((Object)"client_rack", (Object)this.clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey());
        Assertions.assertEquals((Object)"rack", (Object)this.clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue());
    }

    @Test
    public void testInitTelemetryReporterNoCollector() {
        MetricsContext metricsContext = Collections::emptyMap;
        this.clientTelemetryReporter.configure(this.configs);
        this.clientTelemetryReporter.contextChange(metricsContext);
        Assertions.assertNull((Object)this.clientTelemetryReporter.metricsCollector());
    }

    @Test
    public void testProducerLabels() {
        this.configs.put("client.id", "test-client");
        this.configs.put("group.id", "group-id");
        this.configs.put("group.instance.id", "group-instance-id");
        this.configs.put("transactional.id", "transaction-id");
        this.configs.put("client.rack", "rack");
        this.clientTelemetryReporter.configure(this.configs);
        this.clientTelemetryReporter.contextChange((MetricsContext)new KafkaMetricsContext("kafka.producer"));
        Assertions.assertNotNull((Object)this.clientTelemetryReporter.metricsCollector());
        Assertions.assertNotNull((Object)this.clientTelemetryReporter.telemetryProvider().resource());
        List attributes = this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList();
        Assertions.assertEquals((int)2, (int)attributes.size());
        attributes.forEach(attribute -> {
            if (attribute.getKey().equals("client_rack")) {
                Assertions.assertEquals((Object)"rack", (Object)attribute.getValue().getStringValue());
            } else if (attribute.getKey().equals("transactional_id")) {
                Assertions.assertEquals((Object)"transaction-id", (Object)attribute.getValue().getStringValue());
            }
        });
    }

    @Test
    public void testConsumerLabels() {
        this.configs.put("client.id", "test-client");
        this.configs.put("group.id", "group-id");
        this.configs.put("group.instance.id", "group-instance-id");
        this.configs.put("transactional.id", "transaction-id");
        this.configs.put("client.rack", "rack");
        this.clientTelemetryReporter.configure(this.configs);
        this.clientTelemetryReporter.contextChange((MetricsContext)new KafkaMetricsContext("kafka.consumer"));
        Assertions.assertNotNull((Object)this.clientTelemetryReporter.metricsCollector());
        Assertions.assertNotNull((Object)this.clientTelemetryReporter.telemetryProvider().resource());
        List attributes = this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList();
        Assertions.assertEquals((int)3, (int)attributes.size());
        attributes.forEach(attribute -> {
            if (attribute.getKey().equals("client_rack")) {
                Assertions.assertEquals((Object)"rack", (Object)attribute.getValue().getStringValue());
            } else if (attribute.getKey().equals("group_id")) {
                Assertions.assertEquals((Object)"group-id", (Object)attribute.getValue().getStringValue());
            } else if (attribute.getKey().equals("group_instance_id")) {
                Assertions.assertEquals((Object)"group-instance-id", (Object)attribute.getValue().getStringValue());
            }
        });
    }

    @Test
    public void testTelemetryReporterClose() {
        this.clientTelemetryReporter.close();
        Assertions.assertEquals((Object)ClientTelemetryState.TERMINATED, (Object)((ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender()).state());
    }

    @Test
    public void testTelemetryReporterCloseMultipleTimesNoException() {
        this.clientTelemetryReporter.close();
        this.clientTelemetryReporter.close();
        Assertions.assertEquals((Object)ClientTelemetryState.TERMINATED, (Object)((ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender()).state());
    }

    @Test
    public void testUpdateMetricsLabels() {
        this.clientTelemetryReporter.configure(this.configs);
        this.clientTelemetryReporter.contextChange(this.metricsContext);
        Assertions.assertTrue((boolean)this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().isEmpty());
        this.clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key1", "value1"));
        Assertions.assertEquals((int)1, (int)this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size());
        Assertions.assertEquals((Object)"key1", (Object)((KeyValue)this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().get(0)).getKey());
        Assertions.assertEquals((Object)"value1", (Object)((KeyValue)this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().get(0)).getValue().getStringValue());
        this.clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key2", "value2"));
        Assertions.assertEquals((int)2, (int)this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size());
        this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().forEach(attribute -> {
            if (attribute.getKey().equals("key1")) {
                Assertions.assertEquals((Object)"value1", (Object)attribute.getValue().getStringValue());
            } else {
                Assertions.assertEquals((Object)"key2", (Object)attribute.getKey());
                Assertions.assertEquals((Object)"value2", (Object)attribute.getValue().getStringValue());
            }
        });
        this.clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key2", "valueUpdated"));
        Assertions.assertEquals((int)2, (int)this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size());
        this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().forEach(attribute -> {
            if (attribute.getKey().equals("key1")) {
                Assertions.assertEquals((Object)"value1", (Object)attribute.getValue().getStringValue());
            } else {
                Assertions.assertEquals((Object)"key2", (Object)attribute.getKey());
                Assertions.assertEquals((Object)"valueUpdated", (Object)attribute.getValue().getStringValue());
            }
        });
    }

    @Test
    public void testTelemetrySenderTimeToNextUpdate() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)0L, (long)telemetrySender.timeToNextUpdate(100L));
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        Assertions.assertEquals((float)20000.0f, (float)telemetrySender.timeToNextUpdate(100L), (float)200.0f);
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertEquals((long)100L, (long)telemetrySender.timeToNextUpdate(100L));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        long time = telemetrySender.timeToNextUpdate(100L);
        Assertions.assertTrue((time > 0L && (double)time >= 0.5 * (double)time && (double)time <= 1.5 * (double)time ? 1 : 0) != 0);
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        Assertions.assertEquals((long)100L, (long)telemetrySender.timeToNextUpdate(100L));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
        Assertions.assertEquals((long)0L, (long)telemetrySender.timeToNextUpdate(100L));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS));
        Assertions.assertEquals((long)Long.MAX_VALUE, (long)telemetrySender.timeToNextUpdate(100L));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.TERMINATED));
        Assertions.assertThrows(IllegalStateException.class, () -> telemetrySender.timeToNextUpdate(100L));
    }

    @Test
    public void testCreateRequestSubscriptionNeeded() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Optional requestOptional = telemetrySender.createRequest();
        Assertions.assertNotNull((Object)requestOptional);
        Assertions.assertTrue((boolean)requestOptional.isPresent());
        Assertions.assertTrue((boolean)(((AbstractRequest.Builder)requestOptional.get()).build() instanceof GetTelemetrySubscriptionsRequest));
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)((AbstractRequest.Builder)requestOptional.get()).build();
        GetTelemetrySubscriptionsRequest expectedResult = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.ZERO_UUID), true).build();
        Assertions.assertEquals((Object)expectedResult.data(), (Object)request.data());
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS, (Object)telemetrySender.state());
    }

    @Test
    public void testCreateRequestSubscriptionNeededAfterExistingSubscription() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Optional requestOptional = telemetrySender.createRequest();
        Assertions.assertNotNull((Object)requestOptional);
        Assertions.assertTrue((boolean)requestOptional.isPresent());
        Assertions.assertTrue((boolean)(((AbstractRequest.Builder)requestOptional.get()).build() instanceof GetTelemetrySubscriptionsRequest));
        GetTelemetrySubscriptionsRequest request = (GetTelemetrySubscriptionsRequest)((AbstractRequest.Builder)requestOptional.get()).build();
        GetTelemetrySubscriptionsRequest expectedResult = (GetTelemetrySubscriptionsRequest)new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(this.subscription.clientInstanceId()), true).build();
        Assertions.assertEquals((Object)expectedResult.data(), (Object)request.data());
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS, (Object)telemetrySender.state());
    }

    @Test
    public void testCreateRequestPushNeeded() {
        this.clientTelemetryReporter.configure(this.configs);
        this.clientTelemetryReporter.contextChange(this.metricsContext);
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        telemetrySender.createRequest();
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Optional requestOptional = telemetrySender.createRequest();
        Assertions.assertNotNull((Object)requestOptional);
        Assertions.assertTrue((boolean)requestOptional.isPresent());
        Assertions.assertTrue((boolean)(((AbstractRequest.Builder)requestOptional.get()).build() instanceof PushTelemetryRequest));
        PushTelemetryRequest request = (PushTelemetryRequest)((AbstractRequest.Builder)requestOptional.get()).build();
        PushTelemetryRequest expectedResult = (PushTelemetryRequest)new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(this.subscription.clientInstanceId()).setSubscriptionId(this.subscription.subscriptionId()), true).build();
        Assertions.assertEquals((Object)expectedResult.data(), (Object)request.data());
        Assertions.assertEquals((Object)ClientTelemetryState.PUSH_IN_PROGRESS, (Object)telemetrySender.state());
    }

    @Test
    public void testCreateRequestPushNeededWithoutSubscription() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        telemetrySender.createRequest();
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Optional requestOptional = telemetrySender.createRequest();
        Assertions.assertNotNull((Object)requestOptional);
        Assertions.assertFalse((boolean)requestOptional.isPresent());
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
    }

    @Test
    public void testCreateRequestInvalidState() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertFalse((boolean)telemetrySender.createRequest().isPresent());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        Assertions.assertFalse((boolean)telemetrySender.createRequest().isPresent());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
        Assertions.assertFalse((boolean)telemetrySender.createRequest().isPresent());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS));
        Assertions.assertFalse((boolean)telemetrySender.createRequest().isPresent());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.TERMINATED));
        Assertions.assertFalse((boolean)telemetrySender.createRequest().isPresent());
    }

    @Test
    public void testCreateRequestPushNoCollector() {
        long now = this.time.milliseconds();
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        telemetrySender.createRequest();
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        telemetrySender.updateSubscriptionResult(this.subscription, now);
        long interval = telemetrySender.timeToNextUpdate(100L);
        Assertions.assertTrue((interval > 0L && interval != 2000L && (double)interval >= 0.5 * (double)interval && (double)interval <= 1.5 * (double)interval ? 1 : 0) != 0);
        this.time.sleep(1000L);
        Optional requestOptional = telemetrySender.createRequest();
        Assertions.assertFalse((boolean)requestOptional.isPresent());
        Assertions.assertEquals((long)20000L, (long)telemetrySender.timeToNextUpdate(100L));
        Assertions.assertEquals((long)(now + 1000L), (long)telemetrySender.lastRequestMs());
    }

    @Test
    public void testHandleResponseGetSubscriptions() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Uuid clientInstanceId = Uuid.randomUuid();
        GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setClientInstanceId(clientInstanceId).setSubscriptionId(5678).setAcceptedCompressionTypes(Collections.singletonList(CompressionType.GZIP.id)).setPushIntervalMs(20000).setRequestedMetrics(Collections.singletonList("*")));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.PUSH_NEEDED, (Object)telemetrySender.state());
        ClientTelemetryReporter.ClientTelemetrySubscription subscription = telemetrySender.subscription();
        Assertions.assertNotNull((Object)subscription);
        Assertions.assertEquals((Object)clientInstanceId, (Object)subscription.clientInstanceId());
        Assertions.assertEquals((int)5678, (int)subscription.subscriptionId());
        Assertions.assertEquals(Collections.singletonList(CompressionType.GZIP), (Object)subscription.acceptedCompressionTypes());
        Assertions.assertEquals((int)20000, (int)subscription.pushIntervalMs());
        Assertions.assertEquals((Object)ClientTelemetryUtils.SELECTOR_ALL_METRICS, (Object)subscription.selector());
    }

    @Test
    public void testHandleResponseGetSubscriptionsWithoutMetrics() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Uuid clientInstanceId = Uuid.randomUuid();
        GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setClientInstanceId(clientInstanceId).setSubscriptionId(5678).setAcceptedCompressionTypes(Collections.singletonList(CompressionType.GZIP.id)).setPushIntervalMs(20000));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        ClientTelemetryReporter.ClientTelemetrySubscription subscription = telemetrySender.subscription();
        Assertions.assertNotNull((Object)subscription);
        Assertions.assertEquals((Object)clientInstanceId, (Object)subscription.clientInstanceId());
        Assertions.assertEquals((int)5678, (int)subscription.subscriptionId());
        Assertions.assertEquals(Collections.singletonList(CompressionType.GZIP), (Object)subscription.acceptedCompressionTypes());
        Assertions.assertEquals((int)20000, (int)subscription.pushIntervalMs());
        Assertions.assertEquals((Object)ClientTelemetryUtils.SELECTOR_NO_METRICS, (Object)subscription.selector());
    }

    @Test
    public void testHandleResponseGetTelemetryErrorResponse() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code()));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)300000L, (long)telemetrySender.intervalMs());
        Assertions.assertTrue((boolean)telemetrySender.enabled());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        response = new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)Integer.MAX_VALUE, (long)telemetrySender.intervalMs());
        Assertions.assertFalse((boolean)telemetrySender.enabled());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        telemetrySender.enabled(true);
        response = new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code()));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)Integer.MAX_VALUE, (long)telemetrySender.intervalMs());
        Assertions.assertFalse((boolean)telemetrySender.enabled());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        telemetrySender.enabled(true);
        response = new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)Integer.MAX_VALUE, (long)telemetrySender.intervalMs());
        Assertions.assertFalse((boolean)telemetrySender.enabled());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
    }

    @Test
    public void testHandleResponseSubscriptionChange() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        KafkaMetricsCollector kafkaMetricsCollector = (KafkaMetricsCollector)Mockito.mock(KafkaMetricsCollector.class);
        this.clientTelemetryReporter.metricsCollector(kafkaMetricsCollector);
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Uuid clientInstanceId = Uuid.randomUuid();
        GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setClientInstanceId(clientInstanceId).setSubscriptionId(15678).setAcceptedCompressionTypes(Collections.singletonList(CompressionType.ZSTD.id)).setPushIntervalMs(10000).setDeltaTemporality(false).setRequestedMetrics(Collections.singletonList("org.apache.kafka.producer")));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.PUSH_NEEDED, (Object)telemetrySender.state());
        ClientTelemetryReporter.ClientTelemetrySubscription responseSubscription = telemetrySender.subscription();
        Assertions.assertNotNull((Object)responseSubscription);
        Assertions.assertEquals((Object)clientInstanceId, (Object)responseSubscription.clientInstanceId());
        Assertions.assertEquals((int)15678, (int)responseSubscription.subscriptionId());
        Assertions.assertEquals(Collections.singletonList(CompressionType.ZSTD), (Object)responseSubscription.acceptedCompressionTypes());
        Assertions.assertEquals((int)10000, (int)responseSubscription.pushIntervalMs());
        Assertions.assertFalse((boolean)responseSubscription.deltaTemporality());
        Assertions.assertTrue((boolean)responseSubscription.selector().test(new MetricKey("org.apache.kafka.producer")));
        Assertions.assertTrue((boolean)responseSubscription.selector().test(new MetricKey("org.apache.kafka.producerabc")));
        Assertions.assertTrue((boolean)responseSubscription.selector().test(new MetricKey("org.apache.kafka.producer.abc")));
        Assertions.assertFalse((boolean)responseSubscription.selector().test(new MetricKey("org.apache.kafka.produce")));
        ((KafkaMetricsCollector)Mockito.verify((Object)kafkaMetricsCollector, (VerificationMode)Mockito.times((int)1))).metricsReset();
    }

    @Test
    public void testHandleResponsePushTelemetry() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        PushTelemetryResponse response = new PushTelemetryResponse(new PushTelemetryResponseData());
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.PUSH_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)this.subscription.pushIntervalMs(), (long)telemetrySender.intervalMs());
        Assertions.assertTrue((boolean)telemetrySender.enabled());
    }

    @Test
    public void testHandleResponsePushTelemetryErrorResponse() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        PushTelemetryResponse response = new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.UNKNOWN_SUBSCRIPTION_ID.code()));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)0L, (long)telemetrySender.intervalMs());
        Assertions.assertTrue((boolean)telemetrySender.enabled());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        response = new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.UNSUPPORTED_COMPRESSION_TYPE.code()));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)0L, (long)telemetrySender.intervalMs());
        Assertions.assertTrue((boolean)telemetrySender.enabled());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        response = new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.TELEMETRY_TOO_LARGE.code()));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)20000L, (long)telemetrySender.intervalMs());
        Assertions.assertTrue((boolean)telemetrySender.enabled());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        response = new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code()));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)20000L, (long)telemetrySender.intervalMs());
        Assertions.assertTrue((boolean)telemetrySender.enabled());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        response = new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code()));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)Integer.MAX_VALUE, (long)telemetrySender.intervalMs());
        Assertions.assertFalse((boolean)telemetrySender.enabled());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        telemetrySender.enabled(true);
        response = new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code()));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)Integer.MAX_VALUE, (long)telemetrySender.intervalMs());
        Assertions.assertFalse((boolean)telemetrySender.enabled());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        telemetrySender.enabled(true);
        response = new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.INVALID_RECORD.code()));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)Integer.MAX_VALUE, (long)telemetrySender.intervalMs());
        Assertions.assertFalse((boolean)telemetrySender.enabled());
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        telemetrySender.enabled(true);
        response = new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()));
        telemetrySender.handleResponse(response);
        Assertions.assertEquals((Object)ClientTelemetryState.SUBSCRIPTION_NEEDED, (Object)telemetrySender.state());
        Assertions.assertEquals((long)Integer.MAX_VALUE, (long)telemetrySender.intervalMs());
        Assertions.assertFalse((boolean)telemetrySender.enabled());
    }

    @Test
    public void testClientInstanceId() throws InterruptedException {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        Assertions.assertTrue((boolean)telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        CountDownLatch lock = new CountDownLatch(2);
        AtomicReference clientInstanceId = new AtomicReference();
        new Thread(() -> {
            try {
                clientInstanceId.set(telemetrySender.clientInstanceId(Duration.ofMillis(10000L)));
            }
            finally {
                lock.countDown();
            }
        }).start();
        new Thread(() -> {
            try {
                telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
            }
            finally {
                lock.countDown();
            }
        }).start();
        Assertions.assertTrue((boolean)lock.await(2000L, TimeUnit.MILLISECONDS));
        Assertions.assertNotNull(clientInstanceId.get());
        Assertions.assertTrue((boolean)((Optional)clientInstanceId.get()).isPresent());
        Assertions.assertEquals((Object)this.uuid, ((Optional)clientInstanceId.get()).get());
    }

    @Test
    public void testComputeStaggeredIntervalMs() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender)this.clientTelemetryReporter.telemetrySender();
        Assertions.assertEquals((int)0, (int)telemetrySender.computeStaggeredIntervalMs(0, 0.5, 1.5));
        Assertions.assertEquals((int)1, (int)telemetrySender.computeStaggeredIntervalMs(1, 0.99, 1.0));
        long timeMs = telemetrySender.computeStaggeredIntervalMs(1000, 0.5, 1.5);
        Assertions.assertTrue((timeMs >= 500L && timeMs <= 1500L ? 1 : 0) != 0);
    }

    @AfterEach
    public void tearDown() {
        this.clientTelemetryReporter.close();
    }
}

