package org.apache.flink.runtime.jobmaster.slotpool;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest.class */
class DeclarativeSlotPoolBridgeResourceDeclarationTest {
    private static final JobMasterId jobMasterId = JobMasterId.generate();
    private final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

    @Parameter
    private RequestSlotMatchingStrategy requestSlotMatchingStrategy;
    private RequirementListener requirementListener;
    private DeclarativeSlotPoolBridge declarativeSlotPoolBridge;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeResourceDeclarationTest$RequirementListener.class */
    public static final class RequirementListener {
        private ResourceCounter requirements;

        private RequirementListener() {
            this.requirements = ResourceCounter.empty();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void increaseRequirements(ResourceCounter resourceCounter) {
            this.requirements = this.requirements.add(resourceCounter);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void decreaseRequirements(ResourceCounter resourceCounter) {
            this.requirements = this.requirements.subtract(resourceCounter);
        }

        public ResourceCounter getRequirements() {
            return this.requirements;
        }
    }

    DeclarativeSlotPoolBridgeResourceDeclarationTest() {
    }

    @Parameters(name = "RequestSlotMatchingStrategy: {0}")
    public static Collection<RequestSlotMatchingStrategy> data() throws IOException {
        return Arrays.asList(SimpleRequestSlotMatchingStrategy.INSTANCE, PreferredAllocationRequestSlotMatchingStrategy.INSTANCE);
    }

    @BeforeEach
    void setup() {
        this.requirementListener = new RequirementListener();
        TestingDeclarativeSlotPoolBuilder builder = TestingDeclarativeSlotPool.builder();
        RequirementListener requirementListener = this.requirementListener;
        requirementListener.getClass();
        TestingDeclarativeSlotPoolBuilder increaseResourceRequirementsByConsumer = builder.setIncreaseResourceRequirementsByConsumer(resourceCounter -> {
            requirementListener.increaseRequirements(resourceCounter);
        });
        RequirementListener requirementListener2 = this.requirementListener;
        requirementListener2.getClass();
        this.declarativeSlotPoolBridge = DeclarativeSlotPoolBridgeTest.createDeclarativeSlotPoolBridge(new TestingDeclarativeSlotPoolFactory(increaseResourceRequirementsByConsumer.setDecreaseResourceRequirementsByConsumer(resourceCounter2 -> {
            requirementListener2.decreaseRequirements(resourceCounter2);
        }).setReserveFreeSlotFunction((allocationID, resourceProfile) -> {
            return DeclarativeSlotPoolBridgeTest.createAllocatedSlot(allocationID);
        }).setFreeReservedSlotFunction((allocationID2, th, l) -> {
            return ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
        }).setReleaseSlotFunction((allocationID3, exc) -> {
            return ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
        })), this.requestSlotMatchingStrategy);
    }

    @AfterEach
    void teardown() {
        if (this.declarativeSlotPoolBridge != null) {
            this.declarativeSlotPoolBridge.close();
        }
    }

    @TestTemplate
    void testRequirementsIncreasedOnNewAllocation() throws Exception {
        this.declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
        this.declarativeSlotPoolBridge.requestNewAllocatedSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, Time.minutes(5L));
        Assertions.assertThat(this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)).isOne();
    }

    @TestTemplate
    void testRequirementsDecreasedOnAllocationTimeout() throws Exception {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            ComponentMainThreadExecutor forSingleThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(newSingleThreadScheduledExecutor);
            this.declarativeSlotPoolBridge.start(jobMasterId, "localhost", forSingleThreadExecutor);
            FlinkAssertions.assertThatFuture((CompletableFuture) CompletableFuture.supplyAsync(() -> {
                return this.declarativeSlotPoolBridge.requestNewAllocatedSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, Time.milliseconds(5L));
            }, forSingleThreadExecutor).get()).failsWithin(Duration.ofMinutes(1L));
            CompletableFuture.runAsync(() -> {
                Assertions.assertThat(this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)).isZero();
            }, forSingleThreadExecutor).join();
            newSingleThreadScheduledExecutor.shutdown();
        } catch (Throwable th) {
            newSingleThreadScheduledExecutor.shutdown();
            throw th;
        }
    }

    @TestTemplate
    void testRequirementsUnchangedOnNewSlotsNotification() throws Exception {
        this.declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
        this.declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(DeclarativeSlotPoolBridgeTest.createAllocatedSlot(new AllocationID())));
        Assertions.assertThat(this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)).isZero();
    }

    @TestTemplate
    void testRequirementsIncreasedOnSlotReservation() throws Exception {
        this.declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
        PhysicalSlot createAllocatedSlot = DeclarativeSlotPoolBridgeTest.createAllocatedSlot(new AllocationID());
        this.declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(createAllocatedSlot));
        this.declarativeSlotPoolBridge.allocateAvailableSlot(new SlotRequestId(), createAllocatedSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        Assertions.assertThat(this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)).isOne();
    }

    @TestTemplate
    void testRequirementsDecreasedOnSlotFreeing() throws Exception {
        this.declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
        PhysicalSlot createAllocatedSlot = DeclarativeSlotPoolBridgeTest.createAllocatedSlot(new AllocationID());
        this.declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(createAllocatedSlot));
        SlotRequestId slotRequestId = new SlotRequestId();
        this.declarativeSlotPoolBridge.allocateAvailableSlot(slotRequestId, createAllocatedSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        this.declarativeSlotPoolBridge.releaseSlot(slotRequestId, new RuntimeException("Test exception"));
        Assertions.assertThat(this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)).isZero();
    }

    @TestTemplate
    void testRequirementsDecreasedOnSlotAllocationFailure() throws Exception {
        this.declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
        PhysicalSlot createAllocatedSlot = DeclarativeSlotPoolBridgeTest.createAllocatedSlot(new AllocationID());
        this.declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(createAllocatedSlot));
        this.declarativeSlotPoolBridge.allocateAvailableSlot(new SlotRequestId(), createAllocatedSlot.getAllocationId(), ResourceProfile.UNKNOWN);
        this.declarativeSlotPoolBridge.failAllocation(createAllocatedSlot.getTaskManagerLocation().getResourceID(), createAllocatedSlot.getAllocationId(), new RuntimeException("Test exception"));
        Assertions.assertThat(this.requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN)).isZero();
    }
}
