/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderretrieval;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperLeaderElectionHaServices;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingContender;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionListener;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcSystemUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ZooKeeperLeaderRetrievalTest {
    private static final RpcSystem RPC_SYSTEM = RpcSystem.load();
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private final ZooKeeperExtension zooKeeperExtension = new ZooKeeperExtension();
    @RegisterExtension
    private final EachCallbackWrapper<ZooKeeperExtension> eachWrapper = new EachCallbackWrapper((CustomExtension)this.zooKeeperExtension);
    @RegisterExtension
    private final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource = new TestingFatalErrorHandlerExtension();
    private Configuration config;
    private HighAvailabilityServices highAvailabilityServices;

    ZooKeeperLeaderRetrievalTest() {
    }

    @BeforeEach
    void before() throws Exception {
        this.config = new Configuration();
        this.config.set(HighAvailabilityOptions.HA_MODE, (Object)"zookeeper");
        this.config.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, (Object)this.zooKeeperExtension.getConnectString());
        this.highAvailabilityServices = new ZooKeeperLeaderElectionHaServices(ZooKeeperUtils.startCuratorFramework((Configuration)this.config, (FatalErrorHandler)this.testingFatalErrorHandlerResource.getTestingFatalErrorHandler()), this.config, (Executor)EXECUTOR_RESOURCE.getExecutor(), (BlobStoreService)new VoidBlobStore());
    }

    @AfterEach
    void after() throws Exception {
        if (this.highAvailabilityServices != null) {
            this.highAvailabilityServices.closeWithOptionalClean(true);
            this.highAvailabilityServices = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testConnectingAddressRetrievalWithDelayedLeaderElection() throws Exception {
        InetAddress localHost;
        Duration timeout = Duration.ofMinutes(1L);
        long sleepingTime = 1000L;
        LeaderElection leaderElection = null;
        try {
            localHost = InetAddress.getLocalHost();
        }
        catch (UnknownHostException e) {
            System.err.println("Skipping 'testNetworkInterfaceSelection' test.");
            return;
        }
        try (ServerSocket serverSocket = new ServerSocket(0, 50, localHost);){
            String wrongAddress = RPC_SYSTEM.getRpcUrl("1.1.1.1", 1234, "foobar", AddressResolution.NO_ADDRESS_RESOLUTION, this.config);
            TestingLeaderElectionListener listener = new TestingLeaderElectionListener();
            try {
                InetSocketAddress correctInetSocketAddress = new InetSocketAddress(localHost, serverSocket.getLocalPort());
                String correctAddress = RPC_SYSTEM.getRpcUrl(localHost.getHostName(), correctInetSocketAddress.getPort(), "jobmanager", AddressResolution.NO_ADDRESS_RESOLUTION, this.config);
                ZooKeeperLeaderElectionDriver externalProcessDriver = new ZooKeeperLeaderElectionDriver(ZooKeeperUtils.useNamespaceAndEnsurePath((CuratorFramework)this.zooKeeperExtension.getZooKeeperClient(this.testingFatalErrorHandlerResource.getTestingFatalErrorHandler()), (String)ZooKeeperUtils.generateLeaderLatchPath((String)"")), (LeaderElectionDriver.Listener)listener);
                externalProcessDriver.isLeader();
                externalProcessDriver.publishLeaderInformation(HighAvailabilityServices.DEFAULT_JOB_ID.toString(), LeaderInformation.known((UUID)UUID.randomUUID(), (String)wrongAddress));
                FindConnectingAddress findConnectingAddress = new FindConnectingAddress(timeout, this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, "unused-default-address"));
                Thread thread = new Thread(findConnectingAddress);
                thread.start();
                leaderElection = this.highAvailabilityServices.getJobManagerLeaderElection(HighAvailabilityServices.DEFAULT_JOB_ID);
                TestingContender correctLeaderAddressContender = new TestingContender(correctAddress, leaderElection);
                Thread.sleep(sleepingTime);
                externalProcessDriver.notLeader();
                externalProcessDriver.close();
                correctLeaderAddressContender.startLeaderElection();
                thread.join();
                InetAddress result = findConnectingAddress.getInetAddress();
                try (Socket socket = new Socket();){
                    InetSocketAddress bindP = new InetSocketAddress(result, 0);
                    socket.bind(bindP);
                    socket.connect(correctInetSocketAddress, 1000);
                }
            }
            finally {
                if (leaderElection != null) {
                    leaderElection.close();
                }
                listener.failIfErrorEventHappened();
            }
        }
        catch (IOException e) {
            System.err.println("Skipping 'testNetworkInterfaceSelection' test.");
        }
    }

    @Test
    void testTimeoutOfFindConnectingAddress() throws Exception {
        Duration timeout = Duration.ofSeconds(1L);
        LeaderRetrievalService leaderRetrievalService = this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, "unused-default-address");
        InetAddress result = LeaderRetrievalUtils.findConnectingAddress((LeaderRetrievalService)leaderRetrievalService, (Duration)timeout, (RpcSystemUtils)RPC_SYSTEM);
        Assertions.assertThat((Object)InetAddress.getLocalHost()).isEqualTo((Object)result);
    }

    static class FindConnectingAddress
    implements Runnable {
        private final Duration timeout;
        private final LeaderRetrievalService leaderRetrievalService;
        private InetAddress result;
        private Exception exception;

        public FindConnectingAddress(Duration timeout, LeaderRetrievalService leaderRetrievalService) {
            this.timeout = timeout;
            this.leaderRetrievalService = leaderRetrievalService;
        }

        @Override
        public void run() {
            try {
                this.result = LeaderRetrievalUtils.findConnectingAddress((LeaderRetrievalService)this.leaderRetrievalService, (Duration)this.timeout, (RpcSystemUtils)RPC_SYSTEM);
            }
            catch (Exception e) {
                this.exception = e;
            }
        }

        public InetAddress getInetAddress() throws Exception {
            if (this.exception != null) {
                throw this.exception;
            }
            return this.result;
        }
    }
}

