package org.apache.flink.runtime.resourcemanager;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.blocklist.DefaultBlocklistHandler;
import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.StandaloneLeaderElection;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManagerTest.class */
public class ResourceManagerTest {
    private static final Time TIMEOUT = Time.minutes(2);
    private static final HeartbeatServices heartbeatServices = new HeartbeatServicesImpl(1000, 10000);
    private static final HeartbeatServices fastHeartbeatServices = new HeartbeatServicesImpl(1, 1);
    private static final HeartbeatServices failedRpcEnabledHeartbeatServices = new HeartbeatServicesImpl(1, 10000000, 1);
    private static final HardwareDescription hardwareDescription = new HardwareDescription(42, 1337, 1337, 0);
    private static final int dataPort = 1234;
    private static final int jmxPort = 23456;
    private static TestingRpcService rpcService;
    private TestingHighAvailabilityServices highAvailabilityServices;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private ResourceID resourceManagerResourceId;
    private TestingResourceManager resourceManager;
    private ResourceManagerId resourceManagerId;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManagerTest$ResourceManagerBuilder.class */
    public class ResourceManagerBuilder {
        private HeartbeatServices heartbeatServices;
        private JobLeaderIdService jobLeaderIdService;
        private SlotManager slotManager;
        private BlocklistHandler.Factory blocklistHandlerFactory;
        private Consumer<ResourceID> stopWorkerConsumer;
        private CompletableFuture<Void> readyToServeFuture;

        private ResourceManagerBuilder() {
            this.heartbeatServices = null;
            this.jobLeaderIdService = null;
            this.slotManager = null;
            this.blocklistHandlerFactory = new NoOpBlocklistHandler.Factory();
            this.stopWorkerConsumer = null;
            this.readyToServeFuture = CompletableFuture.completedFuture(null);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ResourceManagerBuilder withHeartbeatServices(HeartbeatServices heartbeatServices) {
            this.heartbeatServices = heartbeatServices;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ResourceManagerBuilder withJobLeaderIdService(JobLeaderIdService jobLeaderIdService) {
            this.jobLeaderIdService = jobLeaderIdService;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ResourceManagerBuilder withSlotManager(SlotManager slotManager) {
            this.slotManager = slotManager;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ResourceManagerBuilder withBlocklistHandlerFactory(BlocklistHandler.Factory factory) {
            this.blocklistHandlerFactory = factory;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ResourceManagerBuilder withStopWorkerConsumer(Consumer<ResourceID> consumer) {
            this.stopWorkerConsumer = consumer;
            return this;
        }

        public ResourceManagerBuilder withReadyToServeFuture(CompletableFuture<Void> completableFuture) {
            this.readyToServeFuture = completableFuture;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public TestingResourceManager buildAndStart() throws Exception {
            if (this.heartbeatServices == null) {
                this.heartbeatServices = ResourceManagerTest.heartbeatServices;
            }
            if (this.jobLeaderIdService == null) {
                this.jobLeaderIdService = new DefaultJobLeaderIdService(ResourceManagerTest.this.highAvailabilityServices, ResourceManagerTest.rpcService.getScheduledExecutor(), TestingUtils.infiniteTime());
            }
            if (this.slotManager == null) {
                this.slotManager = FineGrainedSlotManagerBuilder.newBuilder(ResourceManagerTest.rpcService.getScheduledExecutor()).build();
            }
            if (this.stopWorkerConsumer == null) {
                this.stopWorkerConsumer = resourceID -> {
                };
            }
            ResourceManagerTest.this.resourceManagerId = ResourceManagerId.generate();
            TestingResourceManager testingResourceManager = new TestingResourceManager(ResourceManagerTest.rpcService, ResourceManagerTest.this.resourceManagerId.toUUID(), ResourceManagerTest.this.resourceManagerResourceId, this.heartbeatServices, new NoOpDelegationTokenManager(), this.slotManager, NoOpResourceManagerPartitionTracker::get, this.blocklistHandlerFactory, this.jobLeaderIdService, ResourceManagerTest.this.testingFatalErrorHandler, UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(), this.stopWorkerConsumer, this.readyToServeFuture);
            testingResourceManager.start();
            testingResourceManager.getStartedFuture().get(ResourceManagerTest.TIMEOUT.getSize(), ResourceManagerTest.TIMEOUT.getUnit());
            return testingResourceManager;
        }
    }

    ResourceManagerTest() {
    }

    @BeforeAll
    static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @BeforeEach
    void setup() {
        this.highAvailabilityServices = new TestingHighAvailabilityServices();
        this.highAvailabilityServices.setResourceManagerLeaderElection(new StandaloneLeaderElection(UUID.randomUUID()));
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.resourceManagerResourceId = ResourceID.generate();
    }

    @AfterEach
    void after() throws Exception {
        if (this.resourceManager != null) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{this.resourceManager});
        }
        if (this.highAvailabilityServices != null) {
            this.highAvailabilityServices.closeWithOptionalClean(true);
        }
        if (this.testingFatalErrorHandler.hasExceptionOccurred()) {
            this.testingFatalErrorHandler.rethrowError();
        }
        if (rpcService != null) {
            rpcService.clearGateways();
        }
    }

    @AfterAll
    static void tearDownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(new RpcService[]{rpcService});
        }
    }

    private static SlotManager createSlotManager() {
        return createSlotManager(rpcService.getScheduledExecutor());
    }

    private static SlotManager createSlotManager(ScheduledExecutor scheduledExecutor) {
        return FineGrainedSlotManagerBuilder.newBuilder(scheduledExecutor).build();
    }

    @Test
    void testRequestTaskManagerInfo() throws Exception {
        ResourceID generate = ResourceID.generate();
        RpcGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).createTestingTaskExecutorGateway();
        rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
        this.resourceManager = new ResourceManagerBuilder().withSlotManager(createSlotManager()).buildAndStart();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway) this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        registerTaskExecutor(resourceManagerGateway, generate, createTestingTaskExecutorGateway.getAddress());
        TaskManagerInfoWithSlots taskManagerInfoWithSlots = (TaskManagerInfoWithSlots) resourceManagerGateway.requestTaskManagerDetailsInfo(generate, TestingUtils.TIMEOUT).get();
        TaskManagerInfo taskManagerInfo = taskManagerInfoWithSlots.getTaskManagerInfo();
        Assertions.assertThat(taskManagerInfo.getResourceId()).isEqualTo(generate);
        Assertions.assertThat(taskManagerInfo.getHardwareDescription()).isEqualTo(hardwareDescription);
        Assertions.assertThat(taskManagerInfo.getAddress()).isEqualTo(createTestingTaskExecutorGateway.getAddress());
        Assertions.assertThat(taskManagerInfo.getDataPort()).isEqualTo(dataPort);
        Assertions.assertThat(taskManagerInfo.getJmxPort()).isEqualTo(jmxPort);
        Assertions.assertThat(taskManagerInfo.getNumberSlots()).isEqualTo(0);
        Assertions.assertThat(taskManagerInfo.getNumberAvailableSlots()).isEqualTo(0);
        Assertions.assertThat(taskManagerInfoWithSlots.getAllocatedSlots()).isEmpty();
    }

    @Test
    void testRequestTaskExecutorGateway() throws Exception {
        ResourceID generate = ResourceID.generate();
        RpcGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).createTestingTaskExecutorGateway();
        rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
        this.resourceManager = new ResourceManagerBuilder().withSlotManager(createSlotManager()).buildAndStart();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway) this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        registerTaskExecutor(resourceManagerGateway, generate, createTestingTaskExecutorGateway.getAddress());
        FlinkAssertions.assertThatFuture(resourceManagerGateway.requestTaskExecutorThreadInfoGateway(generate, TestingUtils.TIMEOUT)).eventuallySucceeds().isEqualTo(createTestingTaskExecutorGateway);
    }

    private void registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, ResourceID resourceID, String str) {
        FlinkAssertions.assertThatFuture(resourceManagerGateway.registerTaskExecutor(new TaskExecutorRegistration(str, resourceID, dataPort, jmxPort, hardwareDescription, new TaskExecutorMemoryConfiguration(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), ResourceProfile.ZERO, ResourceProfile.ZERO, str), TestingUtils.TIMEOUT)).eventuallySucceeds().isInstanceOf(RegistrationResponse.Success.class);
    }

    @Test
    void testDisconnectJobManagerClearsRequirements() throws Exception {
        RpcGateway build = new TestingJobMasterGatewayBuilder().setAddress(UUID.randomUUID().toString()).build();
        rpcService.registerGateway(build.getAddress(), build);
        TestingJobLeaderIdService build2 = TestingJobLeaderIdService.newBuilder().setGetLeaderIdFunction(jobID -> {
            return CompletableFuture.completedFuture(build.m248getFencingToken());
        }).build();
        CompletableFuture completableFuture = new CompletableFuture();
        TestingSlotManagerBuilder testingSlotManagerBuilder = new TestingSlotManagerBuilder();
        completableFuture.getClass();
        this.resourceManager = new ResourceManagerBuilder().withJobLeaderIdService(build2).withSlotManager(testingSlotManagerBuilder.setClearRequirementsConsumer((v1) -> {
            r1.complete(v1);
        }).createSlotManager()).buildAndStart();
        JobID generate = JobID.generate();
        ResourceManagerGateway selfGateway = this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        selfGateway.registerJobMaster(build.m248getFencingToken(), ResourceID.generate(), build.getAddress(), generate, TIMEOUT).get();
        selfGateway.declareRequiredResources(build.m248getFencingToken(), ResourceRequirements.create(generate, build.getAddress(), Collections.singleton(ResourceRequirement.create(ResourceProfile.UNKNOWN, 1))), TIMEOUT).get();
        selfGateway.disconnectJobManager(generate, JobStatus.FINISHED, new FlinkException("Test exception"));
        Assertions.assertThat((Comparable) completableFuture.get(5L, TimeUnit.SECONDS)).isEqualTo(generate);
    }

    @Test
    void testProcessResourceRequirementsWhenRecoveryFinished() throws Exception {
        RpcGateway build = new TestingJobMasterGatewayBuilder().setAddress(UUID.randomUUID().toString()).build();
        rpcService.registerGateway(build.getAddress(), build);
        TestingJobLeaderIdService build2 = TestingJobLeaderIdService.newBuilder().setGetLeaderIdFunction(jobID -> {
            return CompletableFuture.completedFuture(build.m248getFencingToken());
        }).build();
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        this.resourceManager = new ResourceManagerBuilder().withJobLeaderIdService(build2).withSlotManager(new TestingSlotManagerBuilder().setProcessRequirementsConsumer(resourceRequirements -> {
            completableFuture.complete(null);
        }).createSlotManager()).withReadyToServeFuture(completableFuture2).buildAndStart();
        JobID generate = JobID.generate();
        ResourceManagerGateway selfGateway = this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        selfGateway.registerJobMaster(build.m248getFencingToken(), ResourceID.generate(), build.getAddress(), generate, TIMEOUT).get();
        selfGateway.declareRequiredResources(build.m248getFencingToken(), ResourceRequirements.create(generate, build.getAddress(), Collections.singleton(ResourceRequirement.create(ResourceProfile.UNKNOWN, 1))), TIMEOUT);
        this.resourceManager.runInMainThread(() -> {
            Assertions.assertThat(completableFuture.isDone()).isFalse();
            completableFuture2.complete(null);
            return null;
        }, TIMEOUT).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        completableFuture.get();
    }

    @Test
    void testHeartbeatTimeoutWithJobMaster() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        TestingJobMasterGatewayBuilder resourceManagerHeartbeatFunction = new TestingJobMasterGatewayBuilder().setResourceManagerHeartbeatFunction(resourceID -> {
            completableFuture.complete(resourceID);
            return FutureUtils.completedVoidFuture();
        });
        completableFuture2.getClass();
        RpcGateway build = resourceManagerHeartbeatFunction.setDisconnectResourceManagerConsumer((v1) -> {
            r1.complete(v1);
        }).build();
        rpcService.registerGateway(build.getAddress(), build);
        JobID jobID = new JobID();
        ResourceID generate = ResourceID.generate();
        SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(build.getAddress(), build.m248getFencingToken().toUUID());
        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(jobID2 -> {
            Assertions.assertThat(jobID2).isEqualTo(jobID);
            return settableLeaderRetrievalService;
        });
        runHeartbeatTimeoutTest(resourceManagerBuilder -> {
        }, resourceManagerGateway -> {
            FlinkAssertions.assertThatFuture(resourceManagerGateway.registerJobMaster(build.m248getFencingToken(), generate, build.getAddress(), jobID, TIMEOUT)).eventuallySucceeds().isInstanceOf(RegistrationResponse.Success.class);
        }, resourceID2 -> {
            Assertions.assertThat((ResourceID) completableFuture.getNow(null)).satisfiesAnyOf(new ThrowingConsumer[]{resourceID2 -> {
                Assertions.assertThat(resourceID2).isEqualTo(resourceID2);
            }, resourceID3 -> {
                Assertions.assertThat(resourceID3).isNull();
            }});
            FlinkAssertions.assertThatFuture(completableFuture2).eventuallySucceeds().isEqualTo(this.resourceManagerId);
        });
    }

    @Test
    void testJobMasterBecomesUnreachableTriggersDisconnect() throws Exception {
        JobID jobID = new JobID();
        ResourceID generate = ResourceID.generate();
        CompletableFuture completableFuture = new CompletableFuture();
        TestingJobMasterGatewayBuilder resourceManagerHeartbeatFunction = new TestingJobMasterGatewayBuilder().setAddress(UUID.randomUUID().toString()).setResourceManagerHeartbeatFunction(resourceID -> {
            return FutureUtils.completedExceptionally(new RecipientUnreachableException("sender", "recipient", "task executor is unreachable"));
        });
        completableFuture.getClass();
        RpcGateway build = resourceManagerHeartbeatFunction.setDisconnectResourceManagerConsumer((v1) -> {
            r1.complete(v1);
        }).build();
        rpcService.registerGateway(build.getAddress(), build);
        SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService(build.getAddress(), build.m248getFencingToken().toUUID());
        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(jobID2 -> {
            Assertions.assertThat(jobID2).isEqualTo(jobID);
            return settableLeaderRetrievalService;
        });
        runHeartbeatTargetBecomesUnreachableTest(resourceManagerBuilder -> {
        }, resourceManagerGateway -> {
            FlinkAssertions.assertThatFuture(resourceManagerGateway.registerJobMaster(build.m248getFencingToken(), generate, build.getAddress(), jobID, TIMEOUT)).eventuallySucceeds().isInstanceOf(RegistrationResponse.Success.class);
        }, resourceID2 -> {
            FlinkAssertions.assertThatFuture(completableFuture).eventuallySucceeds().isEqualTo(this.resourceManagerId);
        });
    }

    @Test
    void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
        ResourceID generate = ResourceID.generate();
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        CompletableFuture completableFuture3 = new CompletableFuture();
        TestingTaskExecutorGatewayBuilder testingTaskExecutorGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
        completableFuture2.getClass();
        RpcGateway createTestingTaskExecutorGateway = testingTaskExecutorGatewayBuilder.setDisconnectResourceManagerConsumer((v1) -> {
            r1.complete(v1);
        }).setHeartbeatResourceManagerFunction(resourceID -> {
            completableFuture.complete(resourceID);
            return FutureUtils.completedVoidFuture();
        }).createTestingTaskExecutorGateway();
        rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
        runHeartbeatTimeoutTest(resourceManagerBuilder -> {
            completableFuture3.getClass();
            resourceManagerBuilder.withStopWorkerConsumer((v1) -> {
                r1.complete(v1);
            });
        }, resourceManagerGateway -> {
            registerTaskExecutor(resourceManagerGateway, generate, createTestingTaskExecutorGateway.getAddress());
        }, resourceID2 -> {
            Assertions.assertThat((ResourceID) completableFuture.getNow(null)).satisfiesAnyOf(new ThrowingConsumer[]{resourceID2 -> {
                Assertions.assertThat(resourceID2).isEqualTo(resourceID2);
            }, resourceID3 -> {
                Assertions.assertThat(resourceID3).isNull();
            }});
            FlinkAssertions.assertThatFuture(completableFuture2).eventuallySucceeds().isInstanceOf(TimeoutException.class);
            FlinkAssertions.assertThatFuture(completableFuture3).eventuallySucceeds().isEqualTo(generate);
        });
    }

    @Test
    void testTaskExecutorBecomesUnreachableTriggersDisconnect() throws Exception {
        ResourceID generate = ResourceID.generate();
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        TestingTaskExecutorGatewayBuilder address = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString());
        completableFuture.getClass();
        RpcGateway createTestingTaskExecutorGateway = address.setDisconnectResourceManagerConsumer((v1) -> {
            r1.complete(v1);
        }).setHeartbeatResourceManagerFunction(resourceID -> {
            return FutureUtils.completedExceptionally(new RecipientUnreachableException("sender", "recipient", "task executor is unreachable"));
        }).createTestingTaskExecutorGateway();
        rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
        runHeartbeatTargetBecomesUnreachableTest(resourceManagerBuilder -> {
            completableFuture2.getClass();
            resourceManagerBuilder.withStopWorkerConsumer((v1) -> {
                r1.complete(v1);
            });
        }, resourceManagerGateway -> {
            registerTaskExecutor(resourceManagerGateway, generate, createTestingTaskExecutorGateway.getAddress());
        }, resourceID2 -> {
            FlinkAssertions.assertThatFuture(completableFuture).eventuallySucceeds().isInstanceOf(ResourceManagerException.class);
            FlinkAssertions.assertThatFuture(completableFuture2).eventuallySucceeds().isEqualTo(generate);
        });
    }

    @Test
    void testDisconnectJobManagerWithTerminalStatusShouldRemoveJob() throws Exception {
        testDisconnectJobManager(JobStatus.CANCELED);
    }

    @Test
    void testDisconnectJobManagerWithNonTerminalStatusShouldNotRemoveJob() throws Exception {
        testDisconnectJobManager(JobStatus.FAILING);
    }

    @Test
    void testDisconnectTaskManager() throws Exception {
        ResourceID generate = ResourceID.generate();
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        TestingTaskExecutorGatewayBuilder testingTaskExecutorGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
        completableFuture.getClass();
        RpcGateway createTestingTaskExecutorGateway = testingTaskExecutorGatewayBuilder.setDisconnectResourceManagerConsumer((v1) -> {
            r1.complete(v1);
        }).createTestingTaskExecutorGateway();
        rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
        ResourceManagerBuilder resourceManagerBuilder = new ResourceManagerBuilder();
        completableFuture2.getClass();
        this.resourceManager = resourceManagerBuilder.withStopWorkerConsumer((v1) -> {
            r2.complete(v1);
        }).withSlotManager(createSlotManager()).buildAndStart();
        registerTaskExecutor(this.resourceManager, generate, createTestingTaskExecutorGateway.getAddress());
        this.resourceManager.disconnectTaskManager(generate, new FlinkException("Test exception"));
        FlinkAssertions.assertThatFuture(completableFuture).eventuallySucceeds().isInstanceOf(FlinkException.class);
        FlinkAssertions.assertThatFuture(completableFuture2).eventuallySucceeds().isEqualTo(generate);
    }

    @Test
    void testUnblockResourcesWillTriggerResourceRequirementsCheck() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingSlotManagerBuilder testingSlotManagerBuilder = new TestingSlotManagerBuilder();
        completableFuture.getClass();
        this.resourceManager = new ResourceManagerBuilder().withSlotManager(testingSlotManagerBuilder.setTriggerRequirementsCheckConsumer((v1) -> {
            r1.complete(v1);
        }).createSlotManager()).withBlocklistHandlerFactory(new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L))).buildAndStart();
        this.resourceManager.getSelfGateway(ResourceManagerGateway.class).notifyNewBlockedNodes(Collections.singleton(new BlockedNode("node", "Test cause", System.currentTimeMillis())));
        completableFuture.get();
    }

    @Test
    void testNewlyAddedBlockedNodesWillBeSynchronizedToAllRegisteredJobMasters() throws Exception {
        JobID generate = JobID.generate();
        JobID generate2 = JobID.generate();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        JobMasterGateway createJobMasterGateway = createJobMasterGateway(arrayList);
        JobMasterGateway createJobMasterGateway2 = createJobMasterGateway(arrayList2);
        this.resourceManager = new ResourceManagerBuilder().withJobLeaderIdService(TestingJobLeaderIdService.newBuilder().setGetLeaderIdFunction(jobID -> {
            JobMasterGateway jobMasterGateway;
            if (jobID.equals(generate)) {
                jobMasterGateway = createJobMasterGateway;
            } else {
                if (!jobID.equals(generate2)) {
                    throw new IllegalArgumentException("Unknown job");
                }
                jobMasterGateway = createJobMasterGateway2;
            }
            return CompletableFuture.completedFuture(jobMasterGateway.getFencingToken());
        }).build()).withBlocklistHandlerFactory(new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L))).withSlotManager(createSlotManager()).buildAndStart();
        ResourceManagerGateway selfGateway = this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        registerJobMasterToResourceManager(selfGateway, createJobMasterGateway, generate);
        registerJobMasterToResourceManager(selfGateway, createJobMasterGateway2, generate2);
        BlockedNode blockedNode = new BlockedNode("node1", "Test exception", Long.MAX_VALUE);
        selfGateway.notifyNewBlockedNodes(Collections.singleton(blockedNode)).get();
        Assertions.assertThat(arrayList).containsExactly(new BlockedNode[]{blockedNode});
        Assertions.assertThat(arrayList2).containsExactly(new BlockedNode[]{blockedNode});
        selfGateway.disconnectJobManager(generate, JobStatus.FINISHED, new FlinkException("Test exception"));
        BlockedNode blockedNode2 = new BlockedNode("node2", "Test exception", Long.MAX_VALUE);
        selfGateway.notifyNewBlockedNodes(Collections.singleton(blockedNode2)).get();
        Assertions.assertThat(arrayList).containsExactly(new BlockedNode[]{blockedNode});
        Assertions.assertThat(arrayList2).containsExactlyInAnyOrder(new BlockedNode[]{blockedNode, blockedNode2});
    }

    @Test
    void testResourceOverviewWithBlockedSlots() throws Exception {
        ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        this.resourceManager = new ResourceManagerBuilder().withSlotManager(createSlotManager(manuallyTriggeredScheduledExecutor)).withBlocklistHandlerFactory(new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L))).buildAndStart();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway) this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        ResourceID generate = ResourceID.generate();
        ResourceID generate2 = ResourceID.generate();
        registerTaskExecutorAndSlot(resourceManagerGateway, generate, 3);
        registerTaskExecutorAndSlot(resourceManagerGateway, generate2, 5);
        manuallyTriggeredScheduledExecutor.triggerAll();
        ResourceOverview resourceOverview = (ResourceOverview) resourceManagerGateway.requestResourceOverview(Time.seconds(5L)).get();
        Assertions.assertThat(resourceOverview.getNumberTaskManagers()).isEqualTo(2);
        Assertions.assertThat(resourceOverview.getNumberRegisteredSlots()).isEqualTo(8);
        Assertions.assertThat(resourceOverview.getNumberFreeSlots()).isEqualTo(8);
        Assertions.assertThat(resourceOverview.getNumberBlockedTaskManagers()).isEqualTo(0);
        Assertions.assertThat(resourceOverview.getNumberBlockedFreeSlots()).isEqualTo(0);
        Assertions.assertThat(resourceOverview.getTotalResource()).isEqualTo(ResourceProfile.fromResources(1.0d, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE).multiply(8));
        Assertions.assertThat(resourceOverview.getFreeResource()).isEqualTo(ResourceProfile.fromResources(1.0d, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE).multiply(8));
        resourceManagerGateway.notifyNewBlockedNodes(Collections.singleton(new BlockedNode(this.resourceManager.getNodeIdOfTaskManager(generate2), "Test cause", Long.MAX_VALUE)));
        ResourceOverview resourceOverview2 = (ResourceOverview) resourceManagerGateway.requestResourceOverview(Time.seconds(5L)).get();
        Assertions.assertThat(resourceOverview2.getNumberTaskManagers()).isEqualTo(2);
        Assertions.assertThat(resourceOverview2.getNumberRegisteredSlots()).isEqualTo(8);
        Assertions.assertThat(resourceOverview2.getNumberFreeSlots()).isEqualTo(3);
        Assertions.assertThat(resourceOverview2.getNumberBlockedTaskManagers()).isEqualTo(1);
        Assertions.assertThat(resourceOverview2.getNumberBlockedFreeSlots()).isEqualTo(5);
        Assertions.assertThat(resourceOverview2.getTotalResource()).isEqualTo(ResourceProfile.fromResources(1.0d, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE).multiply(8));
        Assertions.assertThat(resourceOverview2.getFreeResource()).isEqualTo(ResourceProfile.fromResources(1.0d, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE).multiply(3));
    }

    private void registerTaskExecutorAndSlot(ResourceManagerGateway resourceManagerGateway, ResourceID resourceID, int i) throws Exception {
        RpcGateway createTestingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).createTestingTaskExecutorGateway();
        rpcService.registerGateway(createTestingTaskExecutorGateway.getAddress(), createTestingTaskExecutorGateway);
        TaskExecutorRegistrationSuccess taskExecutorRegistrationSuccess = (RegistrationResponse) resourceManagerGateway.registerTaskExecutor(new TaskExecutorRegistration(createTestingTaskExecutorGateway.getAddress(), resourceID, dataPort, jmxPort, hardwareDescription, new TaskExecutorMemoryConfiguration(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), ResourceProfile.fromResources(1.0d, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), ResourceProfile.fromResources(1.0d, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE).multiply(i), createTestingTaskExecutorGateway.getAddress()), TestingUtils.TIMEOUT).get();
        Assertions.assertThat(taskExecutorRegistrationSuccess).isInstanceOf(TaskExecutorRegistrationSuccess.class);
        InstanceID registrationId = taskExecutorRegistrationSuccess.getRegistrationId();
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(new SlotStatus(new SlotID(resourceID, i2), ResourceProfile.fromResources(1.0d, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE)));
        }
        resourceManagerGateway.sendSlotReport(resourceID, registrationId, new SlotReport(arrayList), Time.seconds(5L));
    }

    private JobMasterGateway createJobMasterGateway(Collection<BlockedNode> collection) {
        RpcGateway build = new TestingJobMasterGatewayBuilder().setNotifyNewBlockedNodesFunction(collection2 -> {
            collection.addAll(collection2);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).setAddress(UUID.randomUUID().toString()).build();
        rpcService.registerGateway(build.getAddress(), build);
        return build;
    }

    private static void registerJobMasterToResourceManager(ResourceManagerGateway resourceManagerGateway, JobMasterGateway jobMasterGateway, JobID jobID) throws Exception {
        resourceManagerGateway.registerJobMaster(jobMasterGateway.getFencingToken(), ResourceID.generate(), jobMasterGateway.getAddress(), jobID, TIMEOUT).get();
    }

    private void testDisconnectJobManager(JobStatus jobStatus) throws Exception {
        RpcGateway build = new TestingJobMasterGatewayBuilder().setAddress(UUID.randomUUID().toString()).build();
        rpcService.registerGateway(build.getAddress(), build);
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        this.resourceManager = new ResourceManagerBuilder().withJobLeaderIdService(TestingJobLeaderIdService.newBuilder().setAddJobConsumer(jobID -> {
            oneShotLatch.trigger();
        }).setRemoveJobConsumer(jobID2 -> {
            oneShotLatch2.trigger();
        }).build()).withSlotManager(createSlotManager()).buildAndStart();
        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(jobID3 -> {
            return new SettableLeaderRetrievalService(build.getAddress(), build.m248getFencingToken().toUUID());
        });
        JobID generate = JobID.generate();
        ResourceManagerGateway selfGateway = this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        selfGateway.registerJobMaster(build.m248getFencingToken(), ResourceID.generate(), build.getAddress(), generate, TIMEOUT);
        oneShotLatch.await();
        selfGateway.disconnectJobManager(generate, jobStatus, new FlinkException("Test exception"));
        if (jobStatus.isGloballyTerminalState()) {
            oneShotLatch2.await();
        } else {
            Assertions.assertThatThrownBy(() -> {
                oneShotLatch2.await(10L, TimeUnit.MILLISECONDS);
            }, "We should not have removed the job.", new Object[0]).isInstanceOf(TimeoutException.class);
        }
    }

    private void runHeartbeatTimeoutTest(Consumer<ResourceManagerBuilder> consumer, org.apache.flink.util.function.ThrowingConsumer<ResourceManagerGateway, Exception> throwingConsumer, org.apache.flink.util.function.ThrowingConsumer<ResourceID, Exception> throwingConsumer2) throws Exception {
        ResourceManagerBuilder resourceManagerBuilder = new ResourceManagerBuilder();
        consumer.accept(resourceManagerBuilder);
        this.resourceManager = resourceManagerBuilder.withHeartbeatServices(fastHeartbeatServices).withSlotManager(createSlotManager()).buildAndStart();
        throwingConsumer.accept(this.resourceManager.getSelfGateway(ResourceManagerGateway.class));
        throwingConsumer2.accept(this.resourceManagerResourceId);
    }

    private void runHeartbeatTargetBecomesUnreachableTest(Consumer<ResourceManagerBuilder> consumer, org.apache.flink.util.function.ThrowingConsumer<ResourceManagerGateway, Exception> throwingConsumer, org.apache.flink.util.function.ThrowingConsumer<ResourceID, Exception> throwingConsumer2) throws Exception {
        ResourceManagerBuilder resourceManagerBuilder = new ResourceManagerBuilder();
        consumer.accept(resourceManagerBuilder);
        this.resourceManager = resourceManagerBuilder.withHeartbeatServices(failedRpcEnabledHeartbeatServices).withSlotManager(createSlotManager()).buildAndStart();
        throwingConsumer.accept(this.resourceManager.getSelfGateway(ResourceManagerGateway.class));
        throwingConsumer2.accept(this.resourceManagerResourceId);
    }
}
