/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class SlotPoolBatchSlotRequestTest
extends TestLogger {
    private static final ResourceProfile resourceProfile = ResourceProfile.fromResources((double)1.0, (int)1024);
    public static final CompletableFuture[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture[0];
    private static ScheduledExecutorService singleThreadScheduledExecutorService;
    private static ComponentMainThreadExecutor mainThreadExecutor;

    @BeforeClass
    public static void setupClass() {
        singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
    }

    @AfterClass
    public static void teardownClass() {
        if (singleThreadScheduledExecutorService != null) {
            singleThreadScheduledExecutorService.shutdownNow();
        }
    }

    @Test
    public void testPendingBatchSlotRequestTimeout() throws Exception {
        try (DeclarativeSlotPoolBridge slotPool = this.createAndSetUpSlotPool(mainThreadExecutor, null, Time.milliseconds((long)2L));){
            CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN);
            try {
                slotFuture.get();
                Assert.fail((String)"Expected that slot future times out.");
            }
            catch (ExecutionException ee) {
                MatcherAssert.assertThat((Object)ee, (Matcher)FlinkMatchers.containsCause(TimeoutException.class));
            }
        }
    }

    @Test
    public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() throws Exception {
        Time batchSlotTimeout = Time.milliseconds((long)2L);
        ManualClock clock = new ManualClock();
        try (DeclarativeSlotPoolBridge slotPool = this.createAndSetUpSlotPool(mainThreadExecutor, null, batchSlotTimeout, (Clock)clock);){
            SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, resourceProfile);
            SlotPoolUtils.offerSlots((SlotPool)slotPool, mainThreadExecutor, Arrays.asList(resourceProfile));
            CompletableFuture<PhysicalSlot> firstPendingSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN);
            CompletableFuture<PhysicalSlot> secondPendingSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, resourceProfile);
            List<CompletableFuture> slotFutures = Arrays.asList(firstPendingSlotFuture, secondPendingSlotFuture);
            this.advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, mainThreadExecutor, clock, batchSlotTimeout);
            for (CompletableFuture slotFuture : slotFutures) {
                MatcherAssert.assertThat((Object)slotFuture.isDone(), (Matcher)Matchers.is((Object)false));
            }
        }
    }

    @Test
    public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws Exception {
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        try (DeclarativeSlotPoolBridge slotPool = new DeclarativeSlotPoolBridgeBuilder().setResourceManagerGateway(testingResourceManagerGateway).buildAndStart(mainThreadExecutor);){
            CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, resourceProfile);
            SlotPoolUtils.notifyNotEnoughResourcesAvailable((SlotPoolService)slotPool, mainThreadExecutor, Collections.emptyList());
            MatcherAssert.assertThat(slotFuture, (Matcher)FlinkMatchers.futureWillCompleteExceptionally((Duration)Duration.ofSeconds(10L)));
        }
    }

    @Test
    public void testPendingBatchSlotRequestDoesNotFailIfResourceDeclaringFails() throws Exception {
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setDeclareRequiredResourcesFunction((jobMasterId, resourceRequirements) -> FutureUtils.completedExceptionally((Throwable)new FlinkException("Failed request")));
        Time batchSlotTimeout = Time.milliseconds((long)1000L);
        try (DeclarativeSlotPoolBridge slotPool = this.createAndSetUpSlotPool(mainThreadExecutor, testingResourceManagerGateway, batchSlotTimeout);){
            CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, resourceProfile);
            MatcherAssert.assertThat(slotFuture, (Matcher)FlinkMatchers.willNotComplete((Duration)Duration.ofMillis(50L)));
        }
    }

    @Test
    public void testPendingBatchSlotRequestTimeoutAfterSlotRelease() throws Exception {
        ManualClock clock = new ManualClock();
        Time batchSlotTimeout = Time.milliseconds((long)10000L);
        try (DeclarativeSlotPoolBridge slotPool = this.createAndSetUpSlotPool(mainThreadExecutor, null, batchSlotTimeout, (Clock)clock);){
            SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, resourceProfile);
            ResourceID taskManagerResourceId = SlotPoolUtils.offerSlots((SlotPool)slotPool, mainThreadExecutor, Arrays.asList(resourceProfile));
            CompletableFuture<PhysicalSlot> firstPendingSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN);
            CompletableFuture<PhysicalSlot> secondPendingSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot((SlotPool)slotPool, mainThreadExecutor, resourceProfile);
            List<CompletableFuture> slotFutures = Arrays.asList(firstPendingSlotFuture, secondPendingSlotFuture);
            this.advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, mainThreadExecutor, clock, batchSlotTimeout);
            MatcherAssert.assertThat((Object)CompletableFuture.anyOf(slotFutures.toArray(COMPLETABLE_FUTURES_EMPTY_ARRAY)).isDone(), (Matcher)Matchers.is((Object)false));
            SlotPoolUtils.releaseTaskManager((SlotPool)slotPool, mainThreadExecutor, taskManagerResourceId);
            this.advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, mainThreadExecutor, clock, batchSlotTimeout);
            for (CompletableFuture slotFuture : slotFutures) {
                MatcherAssert.assertThat((Object)slotFuture.isCompletedExceptionally(), (Matcher)Matchers.is((Object)true));
                try {
                    slotFuture.get();
                    Assert.fail((String)"Expected that the slot future times out.");
                }
                catch (ExecutionException ee) {
                    MatcherAssert.assertThat((Object)ee, (Matcher)FlinkMatchers.containsCause(TimeoutException.class));
                }
            }
        }
    }

    private void advanceTimeAndTriggerCheckBatchSlotTimeout(DeclarativeSlotPoolBridge slotPool, ComponentMainThreadExecutor componentMainThreadExecutor, ManualClock clock, Time batchSlotTimeout) {
        this.runBatchSlotTimeoutCheck(slotPool, componentMainThreadExecutor);
        clock.advanceTime(batchSlotTimeout.toMilliseconds() + 1L, TimeUnit.MILLISECONDS);
        this.runBatchSlotTimeoutCheck(slotPool, componentMainThreadExecutor);
    }

    private void runBatchSlotTimeoutCheck(DeclarativeSlotPoolBridge slotPool, ComponentMainThreadExecutor componentMainThreadExecutor) {
        CompletableFuture.runAsync(() -> ((DeclarativeSlotPoolBridge)slotPool).checkBatchSlotTimeout(), (Executor)componentMainThreadExecutor).join();
    }

    private DeclarativeSlotPoolBridge createAndSetUpSlotPool(ComponentMainThreadExecutor componentMainThreadExecutor, @Nullable ResourceManagerGateway resourceManagerGateway, Time batchSlotTimeout) throws Exception {
        return new DeclarativeSlotPoolBridgeBuilder().setResourceManagerGateway(resourceManagerGateway).setBatchSlotTimeout(batchSlotTimeout).buildAndStart(componentMainThreadExecutor);
    }

    private DeclarativeSlotPoolBridge createAndSetUpSlotPool(ComponentMainThreadExecutor componentMainThreadExecutor, @Nullable ResourceManagerGateway resourceManagerGateway, Time batchSlotTimeout, Clock clock) throws Exception {
        return new DeclarativeSlotPoolBridgeBuilder().setResourceManagerGateway(resourceManagerGateway).setBatchSlotTimeout(batchSlotTimeout).setClock(clock).buildAndStart(componentMainThreadExecutor);
    }
}

