package org.apache.flink.runtime.leaderelection;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
import org.apache.flink.runtime.leaderelection.LeaderElectionEvent;
import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriverFactory;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverTest.class */
public class ZooKeeperLeaderElectionDriverTest {
    // ZooKeeper test extension; it supplies the connect string used by startCuratorFramework()
    // and is stopped by testElectionDriverLosesLeadership() to force a leadership loss.
    private final ZooKeeperExtension zooKeeperExtension = new ZooKeeperExtension();

    // Registers the ZooKeeper extension with JUnit through a wrapper
    // (presumably binding it to the per-test lifecycle -- confirm against EachCallbackWrapper).
    @RegisterExtension
    private final EachCallbackWrapper<ZooKeeperExtension> eachWrapper = new EachCallbackWrapper<>(this.zooKeeperExtension);

    // Provides the fatal error handler handed to the additional election drivers that the tests
    // create via createLeaderElectionDriver(...).
    @RegisterExtension
    final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource = new TestingFatalErrorHandlerExtension();

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverTest$Context.class */
    /**
     * Per-test fixture that wires a {@link TestingLeaderElectionListener}, a freshly started
     * Curator framework and a {@link ZooKeeperLeaderElectionDriver} together.
     *
     * <p>Tests subclass this anonymously and call {@link #runTest(RunnableWithException)} from an
     * instance initializer; the fixture is torn down afterwards regardless of the test outcome.
     */
    private class Context {
        protected final TestingLeaderElectionListener leaderElectionListener;
        protected final CuratorFrameworkWithUnhandledErrorListener curatorFramework;
        protected final ZooKeeperLeaderElectionDriver leaderElectionDriver;

        /**
         * Creates the listener, starts the Curator framework and instantiates the driver under
         * test.
         *
         * @throws Exception if the driver cannot be created
         */
        private Context() throws Exception {
            this.leaderElectionListener = new TestingLeaderElectionListener();
            this.curatorFramework = ZooKeeperLeaderElectionDriverTest.this.startCuratorFramework();
            this.leaderElectionDriver = new ZooKeeperLeaderElectionDriver(this.curatorFramework.asCuratorFramework(), this.leaderElectionListener);
        }

        /**
         * Runs the given test body and always tears the fixture down afterwards.
         *
         * @param runnableWithException the test logic to execute
         * @throws Exception if the test body, the teardown or the listener's error check fails
         */
        protected final void runTest(RunnableWithException runnableWithException) throws Exception {
            try {
                runnableWithException.run();
            } finally {
                close();
                this.leaderElectionListener.failIfErrorEventHappened();
            }
        }

        /**
         * Closes the driver and then the Curator framework.
         *
         * <p>The framework is closed in a {@code finally} block so that a failure while closing
         * the driver cannot leak the ZooKeeper client.
         *
         * @throws Exception if closing either resource fails
         */
        private void close() throws Exception {
            try {
                this.leaderElectionDriver.close();
            } finally {
                this.curatorFramework.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverTest$ElectionDriver.class */
    /**
     * Pairs a {@link ZooKeeperLeaderElectionDriver} with the {@link SimpleLeaderElectionListener}
     * it was created with, so tests can query both through a single handle.
     */
    public static final class ElectionDriver {
        private final ZooKeeperLeaderElectionDriver leaderElectionDriver;
        private final SimpleLeaderElectionListener leaderElectionListener;

        private ElectionDriver(ZooKeeperLeaderElectionDriver zooKeeperLeaderElectionDriver, SimpleLeaderElectionListener simpleLeaderElectionListener) {
            leaderElectionDriver = zooKeeperLeaderElectionDriver;
            leaderElectionListener = simpleLeaderElectionListener;
        }

        /** Returns the future that the listener completes once leadership has been granted. */
        CompletableFuture<Void> getLeadershipFuture() {
            return leaderElectionListener.getLeadershipFuture();
        }

        /** Reports whether the wrapped driver currently holds leadership. */
        boolean hasLeadership() {
            return leaderElectionDriver.hasLeadership();
        }

        /** Forwards the given leader information for the given component to the wrapped driver. */
        void publishLeaderInformation(String str, LeaderInformation leaderInformation) throws Exception {
            leaderElectionDriver.publishLeaderInformation(str, leaderInformation);
        }

        /** Closes the wrapped driver. */
        void close() throws Exception {
            leaderElectionDriver.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverTest$SimpleLeaderElectionListener.class */
    /**
     * Minimal {@link LeaderElectionDriver.Listener} that only records the first leadership grant
     * and forwards errors to a {@link FatalErrorHandler}; all other callbacks are ignored.
     */
    public static final class SimpleLeaderElectionListener implements LeaderElectionDriver.Listener {
        private final FatalErrorHandler fatalErrorHandler;
        private final CompletableFuture<Void> leadershipFuture = new CompletableFuture<>();

        public SimpleLeaderElectionListener(FatalErrorHandler fatalErrorHandler) {
            this.fatalErrorHandler = fatalErrorHandler;
        }

        /** Returns the future that is completed when leadership is granted. */
        CompletableFuture<Void> getLeadershipFuture() {
            return leadershipFuture;
        }

        /** Completes the leadership future; the session id is not used. */
        public void onGrantLeadership(UUID uuid) {
            leadershipFuture.complete(null);
        }

        /** Intentionally a no-op. */
        public void onRevokeLeadership() {
            // nothing to do
        }

        /** Intentionally a no-op. */
        public void onLeaderInformationChange(String str, LeaderInformation leaderInformation) {
            // nothing to do
        }

        /** Intentionally a no-op. */
        public void onLeaderInformationChange(LeaderInformationRegister leaderInformationRegister) {
            // nothing to do
        }

        /** Hands the error over to the configured fatal error handler. */
        public void onError(Throwable th) {
            fatalErrorHandler.onFatalError(th);
        }
    }

    /** Package-private no-argument constructor; performs no additional initialization. */
    ZooKeeperLeaderElectionDriverTest() {
    }

    /**
     * Verifies that a freshly started driver reports leadership to its listener.
     *
     * <p>The test body waits for an {@code IsLeaderEvent}; without that wait the test would pass
     * even if leadership were never granted.
     */
    @Test
    void testElectionDriverGainsLeadershipAtStartup() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverTest.1
            {
                runTest(() -> {
                    this.leaderElectionListener.await(LeaderElectionEvent.IsLeaderEvent.class);
                });
            }
        };
    }

    /**
     * Verifies that the listener receives a {@code NotLeaderEvent} after the ZooKeeper extension
     * has been stopped while the driver held leadership.
     */
    @Test
    void testElectionDriverLosesLeadership() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverTest.2
            {
                runTest(() -> {
                    // leadership has to be acquired before it can be lost
                    leaderElectionListener.await(LeaderElectionEvent.IsLeaderEvent.class);

                    zooKeeperExtension.stop();

                    leaderElectionListener.await(LeaderElectionEvent.NotLeaderEvent.class);
                });
            }
        };
    }

    /**
     * Verifies that leader information published by the leading driver is observed by a leader
     * retrieval service watching the same component.
     *
     * <p>The retrieval service is stopped in a {@code finally} block so that it does not outlive
     * the test body.
     */
    @Test
    void testPublishLeaderInformation() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverTest.3
            {
                runTest(() -> {
                    this.leaderElectionListener.await(LeaderElectionEvent.IsLeaderEvent.class);

                    final String componentId = "retrieved-component";
                    final DefaultLeaderRetrievalService leaderRetrievalService = new DefaultLeaderRetrievalService(new ZooKeeperLeaderRetrievalDriverFactory(this.curatorFramework.asCuratorFramework(), componentId, ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy.ON_LOST_CONNECTION));
                    final TestingListener leaderRetrievalListener = new TestingListener();
                    leaderRetrievalService.start(leaderRetrievalListener);
                    try {
                        final LeaderInformation leaderInformation = LeaderInformation.known(UUID.randomUUID(), "foobar");
                        this.leaderElectionDriver.publishLeaderInformation(componentId, leaderInformation);

                        leaderRetrievalListener.waitForNewLeader();
                        Assertions.assertThat(leaderRetrievalListener.getLeader()).isEqualTo(leaderInformation);
                    } finally {
                        // release the retrieval service before the fixture closes the shared client
                        leaderRetrievalService.stop();
                    }
                });
            }
        };
    }

    /**
     * Verifies that publishing empty leader information after a known leader leads the retrieval
     * listener to observe empty leader information.
     *
     * <p>The retrieval service is stopped in a {@code finally} block so that it does not outlive
     * the test body.
     */
    @Test
    void testPublishEmptyLeaderInformation() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverTest.4
            {
                runTest(() -> {
                    this.leaderElectionListener.await(LeaderElectionEvent.IsLeaderEvent.class);

                    final String componentId = "retrieved-component";
                    final DefaultLeaderRetrievalService leaderRetrievalService = new DefaultLeaderRetrievalService(new ZooKeeperLeaderRetrievalDriverFactory(this.curatorFramework.asCuratorFramework(), componentId, ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy.ON_LOST_CONNECTION));
                    final TestingListener leaderRetrievalListener = new TestingListener();
                    leaderRetrievalService.start(leaderRetrievalListener);
                    try {
                        // publish a known leader first so that the later change to "empty" is observable
                        this.leaderElectionDriver.publishLeaderInformation(componentId, LeaderInformation.known(UUID.randomUUID(), "foobar"));
                        leaderRetrievalListener.waitForNewLeader();

                        this.leaderElectionDriver.publishLeaderInformation(componentId, LeaderInformation.empty());
                        leaderRetrievalListener.waitForEmptyLeaderInformation();

                        Assertions.assertThat(leaderRetrievalListener.getLeader()).isEqualTo(LeaderInformation.empty());
                    } finally {
                        // release the retrieval service before the fixture closes the shared client
                        leaderRetrievalService.stop();
                    }
                });
            }
        };
    }

    /**
     * Verifies that the driver's {@code toString()} output contains the leader latch path derived
     * from the Curator framework's namespace.
     */
    @Test
    void testToStringContainingLeaderLatchPath() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverTest.5
            {
                runTest(() -> {
                    final String driverDescription = leaderElectionDriver.toString();
                    final String expectedLatchPath = ZooKeeperUtils.generateLeaderLatchPath(curatorFramework.asCuratorFramework().getNamespace());

                    Assertions.assertThat(driverDescription)
                            .as("toString() should contain the leader latch path for human-readable representation of the driver instance.")
                            .contains(expectedLatchPath);
                });
            }
        };
    }

    /**
     * Verifies that a second driver which does not hold leadership cannot publish leader
     * information: no {@code LeaderInformationChangeEvent} reaches the leading driver's listener
     * within 50 ms of the attempt.
     *
     * <p>The second driver is closed exactly once via try/finally.
     */
    @Test
    void testNonLeaderCannotPublishLeaderInformation() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverTest.6
            {
                runTest(() -> {
                    // the fixture's driver must be the leader before the competing driver is created
                    this.leaderElectionListener.await(LeaderElectionEvent.IsLeaderEvent.class);

                    final ElectionDriver otherElectionDriver = ZooKeeperLeaderElectionDriverTest.createLeaderElectionDriver(this.curatorFramework.asCuratorFramework(), ZooKeeperLeaderElectionDriverTest.this.testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
                    try {
                        Assertions.assertThat(otherElectionDriver.hasLeadership()).isFalse();

                        otherElectionDriver.publishLeaderInformation("component-id", LeaderInformation.known(UUID.randomUUID(), "localhost"));

                        Assertions.assertThat(this.leaderElectionListener.await(LeaderElectionEvent.LeaderInformationChangeEvent.class, Duration.ofMillis(50L))).isEmpty();
                    } finally {
                        otherElectionDriver.close();
                    }
                });
            }
        };
    }

    /**
     * Verifies that leader information written directly to ZooKeeper is reported to the driver's
     * listener as a {@code LeaderInformationChangeEvent} carrying the matching component id and
     * leader information.
     */
    @Test
    void testLeaderInformationChange() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverTest.7
            {
                runTest(() -> {
                    leaderElectionListener.await(LeaderElectionEvent.IsLeaderEvent.class);

                    final LeaderInformation writtenInformation = LeaderInformation.known(UUID.randomUUID(), "foobar");
                    // write to ZooKeeper directly, bypassing the driver; the supplier always answers true
                    ZooKeeperUtils.writeLeaderInformationToZooKeeper(
                            writtenInformation,
                            curatorFramework.asCuratorFramework(),
                            () -> true,
                            ZooKeeperUtils.generateConnectionInformationPath("componentId"));

                    final LeaderElectionEvent.LeaderInformationChangeEvent changeEvent =
                            (LeaderElectionEvent.LeaderInformationChangeEvent) leaderElectionListener.await(LeaderElectionEvent.LeaderInformationChangeEvent.class);

                    Assertions.assertThat(changeEvent.getComponentId()).isEqualTo("componentId");
                    Assertions.assertThat(changeEvent.getLeaderInformation()).isEqualTo(writtenInformation);
                });
            }
        };
    }

    /**
     * Verifies leader election among three drivers sharing one Curator framework: at any time
     * exactly one driver holds leadership, and closing the current leader lets one of the remaining
     * drivers take over until none are left.
     */
    @Test
    void testLeaderElectionWithMultipleDrivers() throws Exception {
        final CuratorFrameworkWithUnhandledErrorListener curatorFramework = startCuratorFramework();
        try {
            Set<ElectionDriver> electionDrivers = Stream.generate(
                            () -> createLeaderElectionDriver(
                                    curatorFramework.asCuratorFramework(),
                                    this.testingFatalErrorHandlerResource.getTestingFatalErrorHandler()))
                    .limit(3L)
                    .collect(Collectors.toSet());

            while (!electionDrivers.isEmpty()) {
                // block until any of the remaining drivers has been granted leadership
                CompletableFuture.anyOf(
                                electionDrivers.stream()
                                        .map(ElectionDriver::getLeadershipFuture)
                                        .toArray(CompletableFuture[]::new))
                        .join();

                final Map<Boolean, Set<ElectionDriver>> leaderAndRest = electionDrivers.stream()
                        .collect(Collectors.partitioningBy(ElectionDriver::hasLeadership, Collectors.toSet()));

                Assertions.assertThat(leaderAndRest.get(true)).hasSize(1);

                // closing the leader releases leadership for the remaining drivers
                Iterables.getOnlyElement(leaderAndRest.get(true)).close();

                electionDrivers = leaderAndRest.get(false);
            }
        } finally {
            curatorFramework.close();
        }
    }

    /**
     * Verifies that deleting the ZooKeeper node holding a component's leader information is
     * reported to the listener as a {@code LeaderInformationChangeEvent} with empty leader
     * information for that component.
     */
    @Test
    void testLeaderInformationNodeRemovalLeadsToLeaderChangeWithEmptyLeaderInformation() throws Exception {
        new Context() { // from class: org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverTest.8
            {
                runTest(() -> {
                    leaderElectionListener.await(LeaderElectionEvent.IsLeaderEvent.class);

                    final LeaderInformation initialInformation = LeaderInformation.known(UUID.randomUUID(), "foobar");
                    final String connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath("componentId");
                    ZooKeeperUtils.writeLeaderInformationToZooKeeper(
                            initialInformation,
                            curatorFramework.asCuratorFramework(),
                            () -> true,
                            connectionInformationPath);

                    // consume the event caused by the write so the next await sees the removal
                    leaderElectionListener.await(LeaderElectionEvent.LeaderInformationChangeEvent.class);

                    curatorFramework.asCuratorFramework().delete().forPath(connectionInformationPath);

                    final LeaderElectionEvent.LeaderInformationChangeEvent removalEvent =
                            (LeaderElectionEvent.LeaderInformationChangeEvent) leaderElectionListener.await(LeaderElectionEvent.LeaderInformationChangeEvent.class);

                    Assertions.assertThat(removalEvent.getComponentId()).isEqualTo("componentId");
                    Assertions.assertThat(removalEvent.getLeaderInformation()).isEqualTo(LeaderInformation.empty());
                });
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates a {@link ZooKeeperLeaderElectionDriver} on the given Curator framework together with
     * a {@link SimpleLeaderElectionListener} that reports errors to the given handler.
     *
     * @param curatorFramework the Curator client the new driver uses
     * @param fatalErrorHandler receives errors reported to the driver's listener
     * @return the driver/listener pair; any exception raised while creating the driver is passed
     *     to {@code ExceptionUtils.rethrow}
     */
    public static ElectionDriver createLeaderElectionDriver(CuratorFramework curatorFramework, FatalErrorHandler fatalErrorHandler) {
        final SimpleLeaderElectionListener listener = new SimpleLeaderElectionListener(fatalErrorHandler);
        try {
            final ZooKeeperLeaderElectionDriver driver = new ZooKeeperLeaderElectionDriver(curatorFramework, listener);
            return new ElectionDriver(driver, listener);
        } catch (Exception creationFailure) {
            ExceptionUtils.rethrow(creationFailure);
            // only here to satisfy the compiler's return-path analysis
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts a Curator framework whose ZooKeeper quorum is the connect string of this test's
     * ZooKeeper extension, using the no-op fatal error handler.
     *
     * @return the started framework wrapper; callers are responsible for closing it
     */
    public CuratorFrameworkWithUnhandledErrorListener startCuratorFramework() {
        final String zooKeeperQuorum = zooKeeperExtension.getConnectString();

        final Configuration haConfiguration = new Configuration();
        haConfiguration.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum);

        return ZooKeeperUtils.startCuratorFramework(haConfiguration, NoOpFatalErrorHandler.INSTANCE);
    }
}
