package org.apache.flink.runtime.registration;

import java.util.ArrayDeque;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mockito;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/registration/RetryingRegistrationTest.class */
class RetryingRegistrationTest {

    @RegisterExtension
    public static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    private TestingRpcService rpcService;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/registration/RetryingRegistrationTest$TestRegistrationRejection.class */
    public static class TestRegistrationRejection extends RegistrationResponse.Rejection {
        private final RejectionReason rejectionReason;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/flink/runtime/registration/RetryingRegistrationTest$TestRegistrationRejection$RejectionReason.class */
        public enum RejectionReason {
            REJECTED
        }

        public TestRegistrationRejection(RejectionReason rejectionReason) {
            this.rejectionReason = rejectionReason;
        }

        public RejectionReason getRejectionReason() {
            return this.rejectionReason;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/registration/RetryingRegistrationTest$TestRegistrationSuccess.class */
    public static class TestRegistrationSuccess extends RegistrationResponse.Success {
        private static final long serialVersionUID = 5542698790917150604L;
        private final String correlationId;

        public TestRegistrationSuccess(String str) {
            this.correlationId = str;
        }

        public String getCorrelationId() {
            return this.correlationId;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/registration/RetryingRegistrationTest$TestRetryingRegistration.class */
    static class TestRetryingRegistration extends RetryingRegistration<UUID, TestRegistrationGateway, TestRegistrationSuccess, TestRegistrationRejection> {
        static final long MAX_TIMEOUT = 200;
        static final long DELAY_ON_ERROR = 200;
        static final long DELAY_ON_FAILURE = 200;
        static final long INITIAL_TIMEOUT = 20;
        static final RetryingRegistrationConfiguration RETRYING_REGISTRATION_CONFIGURATION = new RetryingRegistrationConfiguration(INITIAL_TIMEOUT, 200, 200, 200);

        public TestRetryingRegistration(RpcService rpcService, String str, UUID uuid) {
            this(rpcService, str, uuid, RETRYING_REGISTRATION_CONFIGURATION);
        }

        public TestRetryingRegistration(RpcService rpcService, String str, UUID uuid, RetryingRegistrationConfiguration retryingRegistrationConfiguration) {
            super(LoggerFactory.getLogger(RetryingRegistrationTest.class), rpcService, "TestEndpoint", TestRegistrationGateway.class, str, uuid, retryingRegistrationConfiguration);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public CompletableFuture<RegistrationResponse> invokeRegistration(TestRegistrationGateway testRegistrationGateway, UUID uuid, long j) {
            return testRegistrationGateway.registrationCall(uuid, j);
        }
    }

    RetryingRegistrationTest() {
    }

    @BeforeEach
    void setup() {
        this.rpcService = new TestingRpcService();
    }

    @AfterEach
    void tearDown() throws ExecutionException, InterruptedException {
        if (this.rpcService != null) {
            this.rpcService.closeAsync().get();
        }
    }

    @Test
    void testSimpleSuccessfulRegistration() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        ManualResponseTestRegistrationGateway manualResponseTestRegistrationGateway = new ManualResponseTestRegistrationGateway(new TestRegistrationSuccess("laissez les bon temps roulez"));
        try {
            this.rpcService.registerGateway("<test-address>", manualResponseTestRegistrationGateway);
            TestRetryingRegistration testRetryingRegistration = new TestRetryingRegistration(this.rpcService, "<test-address>", randomUUID);
            testRetryingRegistration.startRegistration();
            CompletableFuture future = testRetryingRegistration.getFuture();
            Assertions.assertThat(future).isNotNull();
            Assertions.assertThat(testRetryingRegistration.getFuture()).isEqualTo(future);
            Assertions.assertThat(((TestRegistrationSuccess) ((RetryingRegistration.RetryingRegistrationResult) future.get(10L, TimeUnit.SECONDS)).getSuccess()).getCorrelationId()).isEqualTo("laissez les bon temps roulez");
            Assertions.assertThat(manualResponseTestRegistrationGateway.getInvocations().take().leaderId()).isEqualTo(randomUUID);
            manualResponseTestRegistrationGateway.stop();
        } catch (Throwable th) {
            manualResponseTestRegistrationGateway.stop();
            throw th;
        }
    }

    @Test
    void testPropagateFailures() throws Exception {
        RpcService rpcService = (RpcService) Mockito.mock(RpcService.class);
        Mockito.when(rpcService.connect(Mockito.anyString(), (Class) Mockito.any(Class.class))).thenThrow(new Throwable[]{new RuntimeException("testExceptionMessage")});
        TestRetryingRegistration testRetryingRegistration = new TestRetryingRegistration(rpcService, "testaddress", UUID.randomUUID());
        testRetryingRegistration.startRegistration();
        CompletableFuture future = testRetryingRegistration.getFuture();
        Assertions.assertThat(future).isDone();
        future.getClass();
        Assertions.assertThatThrownBy(future::get).withFailMessage("We expected an ExecutionException.", new Object[0]).isInstanceOf(ExecutionException.class).cause().hasMessage("testExceptionMessage");
    }

    @Test
    void testRetryConnectOnFailure() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        ScheduledExecutorServiceAdapter scheduledExecutorServiceAdapter = new ScheduledExecutorServiceAdapter((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor());
        ManualResponseTestRegistrationGateway manualResponseTestRegistrationGateway = new ManualResponseTestRegistrationGateway(new TestRegistrationSuccess("laissez les bon temps roulez"));
        try {
            RpcService rpcService = (RpcService) Mockito.mock(RpcService.class);
            Mockito.when(rpcService.connect(Mockito.anyString(), (Class) Mockito.any(Class.class))).thenReturn(FutureUtils.completedExceptionally(new Exception("test connect failure")), new Object[]{CompletableFuture.completedFuture(manualResponseTestRegistrationGateway)});
            Mockito.when(rpcService.getScheduledExecutor()).thenReturn(scheduledExecutorServiceAdapter);
            TestRetryingRegistration testRetryingRegistration = new TestRetryingRegistration(rpcService, "foobar address", randomUUID);
            long currentTimeMillis = System.currentTimeMillis();
            testRetryingRegistration.startRegistration();
            RetryingRegistration.RetryingRegistrationResult retryingRegistrationResult = (RetryingRegistration.RetryingRegistrationResult) testRetryingRegistration.getFuture().get(10L, TimeUnit.SECONDS);
            Assertions.assertThat(System.currentTimeMillis() - currentTimeMillis).withFailMessage("The registration should have failed the first time. Thus the duration should be longer than at least a single error delay.", new Object[0]).isGreaterThan(200L);
            Assertions.assertThat(((TestRegistrationSuccess) retryingRegistrationResult.getSuccess()).getCorrelationId()).isEqualTo("laissez les bon temps roulez");
            Assertions.assertThat(manualResponseTestRegistrationGateway.getInvocations().take().leaderId()).isEqualTo(randomUUID);
            manualResponseTestRegistrationGateway.stop();
        } catch (Throwable th) {
            manualResponseTestRegistrationGateway.stop();
            throw th;
        }
    }

    @Timeout(10000)
    @Test
    void testRetriesOnTimeouts() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        ManualResponseTestRegistrationGateway manualResponseTestRegistrationGateway = new ManualResponseTestRegistrationGateway(null, null, new TestRegistrationSuccess("rien ne va plus"));
        try {
            this.rpcService.registerGateway("<test-address>", manualResponseTestRegistrationGateway);
            TestRetryingRegistration testRetryingRegistration = new TestRetryingRegistration(this.rpcService, "<test-address>", randomUUID, new RetryingRegistrationConfiguration(20L, 1000L, 15000L, 15000L));
            long nanoTime = System.nanoTime();
            testRetryingRegistration.startRegistration();
            RetryingRegistration.RetryingRegistrationResult retryingRegistrationResult = (RetryingRegistration.RetryingRegistrationResult) testRetryingRegistration.getFuture().get(10L, TimeUnit.SECONDS);
            long nanoTime2 = (System.nanoTime() - nanoTime) / 1000000;
            Assertions.assertThat(((TestRegistrationSuccess) retryingRegistrationResult.getSuccess()).getCorrelationId()).isEqualTo("rien ne va plus");
            Assertions.assertThat(manualResponseTestRegistrationGateway.getInvocations().take().leaderId()).isEqualTo(randomUUID);
            Assertions.assertThat(nanoTime2).withFailMessage("retries did not properly back off", new Object[0]).isGreaterThanOrEqualTo(60L);
            manualResponseTestRegistrationGateway.stop();
        } catch (Throwable th) {
            manualResponseTestRegistrationGateway.stop();
            throw th;
        }
    }

    @Test
    void testFailure() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        ManualResponseTestRegistrationGateway manualResponseTestRegistrationGateway = new ManualResponseTestRegistrationGateway(null, new RegistrationResponse.Failure(new FlinkException("no reason")), null, new TestRegistrationSuccess("qui a coupe le fromage"));
        try {
            this.rpcService.registerGateway("<test-address>", manualResponseTestRegistrationGateway);
            TestRetryingRegistration testRetryingRegistration = new TestRetryingRegistration(this.rpcService, "<test-address>", randomUUID);
            long nanoTime = System.nanoTime();
            testRetryingRegistration.startRegistration();
            RetryingRegistration.RetryingRegistrationResult retryingRegistrationResult = (RetryingRegistration.RetryingRegistrationResult) testRetryingRegistration.getFuture().get(10L, TimeUnit.SECONDS);
            long nanoTime2 = (System.nanoTime() - nanoTime) / 1000000;
            Assertions.assertThat(((TestRegistrationSuccess) retryingRegistrationResult.getSuccess()).getCorrelationId()).isEqualTo("qui a coupe le fromage");
            Assertions.assertThat(manualResponseTestRegistrationGateway.getInvocations().take().leaderId()).isEqualTo(randomUUID);
            Assertions.assertThat(nanoTime2).withFailMessage("retries did not properly back off", new Object[0]).isGreaterThanOrEqualTo(240L);
            manualResponseTestRegistrationGateway.stop();
        } catch (Throwable th) {
            manualResponseTestRegistrationGateway.stop();
            throw th;
        }
    }

    @Test
    void testRegistrationRejection() {
        ManualResponseTestRegistrationGateway manualResponseTestRegistrationGateway = new ManualResponseTestRegistrationGateway(new TestRegistrationRejection(TestRegistrationRejection.RejectionReason.REJECTED));
        this.rpcService.registerGateway(manualResponseTestRegistrationGateway.getAddress(), manualResponseTestRegistrationGateway);
        TestRetryingRegistration testRetryingRegistration = new TestRetryingRegistration(this.rpcService, manualResponseTestRegistrationGateway.getAddress(), UUID.randomUUID());
        testRetryingRegistration.startRegistration();
        RetryingRegistration.RetryingRegistrationResult retryingRegistrationResult = (RetryingRegistration.RetryingRegistrationResult) testRetryingRegistration.getFuture().join();
        Assertions.assertThat(retryingRegistrationResult.isRejection()).isTrue();
        Assertions.assertThat(((TestRegistrationRejection) retryingRegistrationResult.getRejection()).getRejectionReason()).isEqualTo(TestRegistrationRejection.RejectionReason.REJECTED);
    }

    @Test
    void testRetryOnError() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        ArrayDeque arrayDeque = new ArrayDeque(2);
        arrayDeque.add(FutureUtils.completedExceptionally(new Exception("test exception")));
        arrayDeque.add(CompletableFuture.completedFuture(new TestRegistrationSuccess("Petit a petit, l'oiseau fait son nid")));
        this.rpcService.registerGateway("<test-address>", DefaultTestRegistrationGateway.newBuilder().setRegistrationFunction((uuid, l) -> {
            return (CompletableFuture) arrayDeque.poll();
        }).build());
        TestRetryingRegistration testRetryingRegistration = new TestRetryingRegistration(this.rpcService, "<test-address>", randomUUID);
        long nanoTime = System.nanoTime();
        testRetryingRegistration.startRegistration();
        RetryingRegistration.RetryingRegistrationResult retryingRegistrationResult = (RetryingRegistration.RetryingRegistrationResult) testRetryingRegistration.getFuture().get(10L, TimeUnit.SECONDS);
        long nanoTime2 = (System.nanoTime() - nanoTime) / 1000000;
        Assertions.assertThat(((TestRegistrationSuccess) retryingRegistrationResult.getSuccess()).getCorrelationId()).isEqualTo("Petit a petit, l'oiseau fait son nid");
        Assertions.assertThat(nanoTime2).withFailMessage("retries did not properly back off", new Object[0]).isGreaterThanOrEqualTo(200L);
    }

    @Test
    void testCancellation() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        CompletableFuture completableFuture = new CompletableFuture();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        this.rpcService.registerGateway("my-test-address", DefaultTestRegistrationGateway.newBuilder().setRegistrationFunction((uuid, l) -> {
            atomicInteger.incrementAndGet();
            return completableFuture;
        }).build());
        TestRetryingRegistration testRetryingRegistration = new TestRetryingRegistration(this.rpcService, "my-test-address", randomUUID);
        testRetryingRegistration.startRegistration();
        testRetryingRegistration.cancel();
        completableFuture.completeExceptionally(new TimeoutException());
        Assertions.assertThat(atomicInteger).hasValueLessThanOrEqualTo(1);
    }
}
