package org.apache.flink.runtime.leaderelection;

import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest.class */
public class LeaderElectionTest {

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    @RegisterExtension
    private final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource = new TestingFatalErrorHandlerExtension();

    @Parameter
    public ServiceClass serviceClass;

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest$EmbeddedServiceClass.class */
    private static final class EmbeddedServiceClass implements ServiceClass {
        private EmbeddedLeaderService embeddedLeaderService;

        private EmbeddedServiceClass() {
        }

        @Override // org.apache.flink.runtime.leaderelection.LeaderElectionTest.ServiceClass
        public void setup(FatalErrorHandler fatalErrorHandler) {
            this.embeddedLeaderService = new EmbeddedLeaderService(LeaderElectionTest.EXECUTOR_RESOURCE.getExecutor());
        }

        @Override // org.apache.flink.runtime.leaderelection.LeaderElectionTest.ServiceClass
        public void teardown() {
            if (this.embeddedLeaderService != null) {
                this.embeddedLeaderService.shutdown();
                this.embeddedLeaderService = null;
            }
        }

        @Override // org.apache.flink.runtime.leaderelection.LeaderElectionTest.ServiceClass
        public LeaderElection createLeaderElection() {
            return this.embeddedLeaderService.createLeaderElectionService("embedded_leader_election");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest$ManualLeaderContender.class */
    private static final class ManualLeaderContender implements LeaderContender {
        private static final UUID NULL_LEADER_SESSION_ID = new UUID(0, 0);
        private final ArrayBlockingQueue<UUID> leaderSessionIds;
        private volatile Exception exception;

        private ManualLeaderContender() {
            this.leaderSessionIds = new ArrayBlockingQueue<>(10);
        }

        public void grantLeadership(UUID uuid) {
            this.leaderSessionIds.offer(uuid);
        }

        public void revokeLeadership() {
            this.leaderSessionIds.offer(NULL_LEADER_SESSION_ID);
        }

        public void handleError(Exception exc) {
            this.exception = exc;
        }

        void rethrowError() throws Exception {
            if (this.exception != null) {
                throw this.exception;
            }
        }

        UUID waitForLeaderSessionId() throws InterruptedException {
            return this.leaderSessionIds.take();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest$ServiceClass.class */
    private interface ServiceClass {
        void setup(FatalErrorHandler fatalErrorHandler) throws Exception;

        void teardown() throws Exception;

        LeaderElection createLeaderElection() throws Exception;
    }

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest$StandaloneServiceClass.class */
    private static final class StandaloneServiceClass implements ServiceClass {
        private StandaloneServiceClass() {
        }

        @Override // org.apache.flink.runtime.leaderelection.LeaderElectionTest.ServiceClass
        public void setup(FatalErrorHandler fatalErrorHandler) {
        }

        @Override // org.apache.flink.runtime.leaderelection.LeaderElectionTest.ServiceClass
        public void teardown() {
        }

        @Override // org.apache.flink.runtime.leaderelection.LeaderElectionTest.ServiceClass
        public LeaderElection createLeaderElection() {
            return new StandaloneLeaderElection(UUID.randomUUID());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest$ZooKeeperServiceClass.class */
    private static final class ZooKeeperServiceClass implements ServiceClass {
        private TestingServer testingServer;
        private CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper;
        private DefaultLeaderElectionService leaderElectionService;

        private ZooKeeperServiceClass() {
        }

        @Override // org.apache.flink.runtime.leaderelection.LeaderElectionTest.ServiceClass
        public void setup(FatalErrorHandler fatalErrorHandler) throws Exception {
            try {
                this.testingServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
                Configuration configuration = new Configuration();
                configuration.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, this.testingServer.getConnectString());
                configuration.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
                this.curatorFrameworkWrapper = ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler);
                this.leaderElectionService = new DefaultLeaderElectionService(new ZooKeeperLeaderElectionDriverFactory(this.curatorFrameworkWrapper.asCuratorFramework()));
            } catch (Exception e) {
                throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
            }
        }

        @Override // org.apache.flink.runtime.leaderelection.LeaderElectionTest.ServiceClass
        public void teardown() throws Exception {
            if (this.leaderElectionService != null) {
                this.leaderElectionService.close();
            }
            if (this.curatorFrameworkWrapper != null) {
                this.curatorFrameworkWrapper.close();
                this.curatorFrameworkWrapper = null;
            }
            if (this.testingServer != null) {
                this.testingServer.close();
                this.testingServer = null;
            }
        }

        @Override // org.apache.flink.runtime.leaderelection.LeaderElectionTest.ServiceClass
        public LeaderElection createLeaderElection() {
            return this.leaderElectionService.createLeaderElection("random-component-id");
        }
    }

    @Parameters(name = "Leader election: {0}")
    public static Collection<ServiceClass> parameters() {
        return Arrays.asList(new ZooKeeperServiceClass(), new EmbeddedServiceClass(), new StandaloneServiceClass());
    }

    @BeforeEach
    void setup() throws Exception {
        this.serviceClass.setup(this.testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
    }

    @AfterEach
    void teardown() throws Exception {
        this.serviceClass.teardown();
    }

    @TestTemplate
    void testHasLeadership() throws Exception {
        ManualLeaderContender manualLeaderContender = new ManualLeaderContender();
        try {
            LeaderElection createLeaderElection = this.serviceClass.createLeaderElection();
            createLeaderElection.startLeaderElection(manualLeaderContender);
            UUID waitForLeaderSessionId = manualLeaderContender.waitForLeaderSessionId();
            Assertions.assertThat(createLeaderElection.hasLeadership(waitForLeaderSessionId)).isTrue();
            Assertions.assertThat(createLeaderElection.hasLeadership(UUID.randomUUID())).isFalse();
            createLeaderElection.confirmLeadership(waitForLeaderSessionId, "foobar");
            Assertions.assertThat(createLeaderElection.hasLeadership(waitForLeaderSessionId)).isTrue();
            createLeaderElection.close();
            Assertions.assertThat(createLeaderElection.hasLeadership(waitForLeaderSessionId)).isFalse();
            Assertions.assertThat(manualLeaderContender.waitForLeaderSessionId()).as("The leadership has been revoked from the contender.", new Object[0]).isEqualTo(ManualLeaderContender.NULL_LEADER_SESSION_ID);
            manualLeaderContender.rethrowError();
        } catch (Throwable th) {
            manualLeaderContender.rethrowError();
            throw th;
        }
    }
}
