/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor.slot;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotPayload;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskexecutor.slot.TestingSlotActions;
import org.apache.flink.runtime.taskexecutor.slot.TestingSlotActionsBuilder;
import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotPayload;
import org.apache.flink.runtime.taskexecutor.slot.TestingTimerService;
import org.apache.flink.runtime.taskexecutor.slot.TestingTimerServiceBuilder;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.shaded.guava33.com.google.common.collect.Sets;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.TriFunctionWithException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.NotThrownAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class TaskSlotTableImplTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> MAIN_THREAD_EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    private static final Duration SLOT_TIMEOUT = Duration.ofSeconds(100L);

    TaskSlotTableImplTest() {
    }

    @Test
    void testTryMarkSlotActive() throws Exception {
        TaskSlotTableImplTest.runInMainThread(3, (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            JobID jobId1 = new JobID();
            AllocationID allocationId1 = new AllocationID();
            taskSlotTable.allocateSlot(0, jobId1, allocationId1, SLOT_TIMEOUT);
            AllocationID allocationId2 = new AllocationID();
            taskSlotTable.allocateSlot(1, jobId1, allocationId2, SLOT_TIMEOUT);
            AllocationID allocationId3 = new AllocationID();
            JobID jobId2 = new JobID();
            taskSlotTable.allocateSlot(2, jobId2, allocationId3, SLOT_TIMEOUT);
            taskSlotTable.markSlotActive(allocationId1);
            Assertions.assertThat((boolean)taskSlotTable.isAllocated(0, jobId1, allocationId1)).isTrue();
            Assertions.assertThat((boolean)taskSlotTable.isAllocated(1, jobId1, allocationId2)).isTrue();
            Assertions.assertThat((boolean)taskSlotTable.isAllocated(2, jobId2, allocationId3)).isTrue();
            Assertions.assertThat((Collection)taskSlotTable.getActiveTaskSlotAllocationIdsPerJob(jobId1)).isEqualTo((Object)Sets.newHashSet((Object[])new AllocationID[]{allocationId1}));
            Assertions.assertThat((boolean)taskSlotTable.tryMarkSlotActive(jobId1, allocationId1)).isTrue();
            Assertions.assertThat((boolean)taskSlotTable.tryMarkSlotActive(jobId1, allocationId2)).isTrue();
            Assertions.assertThat((boolean)taskSlotTable.tryMarkSlotActive(jobId1, allocationId3)).isFalse();
            Assertions.assertThat((Collection)taskSlotTable.getActiveTaskSlotAllocationIdsPerJob(jobId1)).isEqualTo(new HashSet<AllocationID>(Arrays.asList(allocationId2, allocationId1)));
        }));
    }

    @Test
    void testRetrievingAllActiveSlots() throws Exception {
        TaskSlotTableImplTest.runInMainThread(3, (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            JobID jobId1 = new JobID();
            AllocationID allocationId1 = new AllocationID();
            taskSlotTable.allocateSlot(0, jobId1, allocationId1, SLOT_TIMEOUT);
            AllocationID allocationId2 = new AllocationID();
            taskSlotTable.allocateSlot(1, jobId1, allocationId2, SLOT_TIMEOUT);
            AllocationID allocationId3 = new AllocationID();
            JobID jobId2 = new JobID();
            taskSlotTable.allocateSlot(2, jobId2, allocationId3, SLOT_TIMEOUT);
            taskSlotTable.markSlotActive(allocationId1);
            taskSlotTable.markSlotActive(allocationId3);
            Assertions.assertThat((Collection)taskSlotTable.getActiveTaskSlotAllocationIds()).isEqualTo((Object)Sets.newHashSet((Object[])new AllocationID[]{allocationId1, allocationId3}));
        }));
    }

    @Test
    void testInconsistentStaticSlotAllocation() throws Exception {
        TaskSlotTableImplTest.runInMainThread(2, (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            JobID jobId = new JobID();
            AllocationID allocationId1 = new AllocationID();
            AllocationID allocationId2 = new AllocationID();
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(0, jobId, allocationId1, SLOT_TIMEOUT));
            Assertions.assertThatThrownBy(() -> taskSlotTable.allocateSlot(1, jobId, allocationId1, SLOT_TIMEOUT)).isInstanceOf(SlotAllocationException.class);
            Assertions.assertThatThrownBy(() -> taskSlotTable.allocateSlot(0, jobId, allocationId2, SLOT_TIMEOUT)).isInstanceOf(SlotAllocationException.class);
            Assertions.assertThat((boolean)taskSlotTable.isAllocated(0, jobId, allocationId1)).isTrue();
            Assertions.assertThat((boolean)taskSlotTable.isSlotFree(1)).isTrue();
            Iterator allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
            Assertions.assertThat((int)((TaskSlot)allocatedSlots.next()).getIndex()).isZero();
            Assertions.assertThat((boolean)allocatedSlots.hasNext()).isFalse();
        }));
    }

    @Test
    void testInconsistentDynamicSlotAllocation() throws Exception {
        TaskSlotTableImplTest.runInMainThread(1, (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            JobID jobId1 = new JobID();
            JobID jobId2 = new JobID();
            AllocationID allocationId = new AllocationID();
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(-1, jobId1, allocationId, SLOT_TIMEOUT));
            Assertions.assertThatThrownBy(() -> taskSlotTable.allocateSlot(-1, jobId2, allocationId, SLOT_TIMEOUT)).isInstanceOf(SlotAllocationException.class);
            Assertions.assertThat((boolean)taskSlotTable.isAllocated(1, jobId1, allocationId)).isTrue();
            Iterator allocatedSlots = taskSlotTable.getAllocatedSlots(jobId1);
            Assertions.assertThat((Comparable)((TaskSlot)allocatedSlots.next()).getAllocationId()).isEqualTo((Object)allocationId);
            Assertions.assertThat((boolean)allocatedSlots.hasNext()).isFalse();
        }));
    }

    @Test
    void testDuplicateStaticSlotAllocation() throws Exception {
        TaskSlotTableImplTest.runInMainThread(2, (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            JobID jobId = new JobID();
            AllocationID allocationId = new AllocationID();
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(0, jobId, allocationId, ResourceProfile.UNKNOWN, SLOT_TIMEOUT));
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(0, jobId, allocationId, ResourceProfile.UNKNOWN, SLOT_TIMEOUT));
            Assertions.assertThat((boolean)taskSlotTable.isAllocated(0, jobId, allocationId)).isTrue();
            Assertions.assertThat((boolean)taskSlotTable.isSlotFree(1)).isTrue();
            Iterator allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
            Assertions.assertThat((int)((TaskSlot)allocatedSlots.next()).getIndex()).isZero();
            Assertions.assertThat((boolean)allocatedSlots.hasNext()).isFalse();
        }));
    }

    @Test
    void testDuplicateDynamicSlotAllocation() throws Exception {
        TaskSlotTableImplTest.runInMainThread(1, (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            JobID jobId = new JobID();
            AllocationID allocationId = new AllocationID();
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(-1, jobId, allocationId, SLOT_TIMEOUT));
            Iterator allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
            TaskSlot taskSlot1 = (TaskSlot)allocatedSlots.next();
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(-1, jobId, allocationId, SLOT_TIMEOUT));
            allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
            TaskSlot taskSlot2 = (TaskSlot)allocatedSlots.next();
            Assertions.assertThat((boolean)taskSlotTable.isAllocated(1, jobId, allocationId)).isTrue();
            Assertions.assertThat((Object)taskSlot2).isEqualTo((Object)taskSlot1);
            Assertions.assertThat((boolean)allocatedSlots.hasNext()).isFalse();
        }));
    }

    @Test
    void testFreeSlot() throws Exception {
        TaskSlotTableImplTest.runInMainThread(2, (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            JobID jobId = new JobID();
            AllocationID allocationId1 = new AllocationID();
            AllocationID allocationId2 = new AllocationID();
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(0, jobId, allocationId1, SLOT_TIMEOUT));
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(1, jobId, allocationId2, SLOT_TIMEOUT));
            Assertions.assertThat((int)taskSlotTable.freeSlot(allocationId2)).isOne();
            Iterator allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
            Assertions.assertThat((int)((TaskSlot)allocatedSlots.next()).getIndex()).isZero();
            Assertions.assertThat((boolean)allocatedSlots.hasNext()).isFalse();
            Assertions.assertThat((boolean)taskSlotTable.isAllocated(1, jobId, allocationId1)).isFalse();
            Assertions.assertThat((boolean)taskSlotTable.isAllocated(1, jobId, allocationId2)).isFalse();
            Assertions.assertThat((boolean)taskSlotTable.isSlotFree(1)).isTrue();
        }));
    }

    @Test
    void testSlotAllocationWithDynamicSlotId() throws Exception {
        TaskSlotTableImplTest.runInMainThread(2, (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            JobID jobId = new JobID();
            AllocationID allocationId = new AllocationID();
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(-1, jobId, allocationId, SLOT_TIMEOUT));
            Iterator allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
            Assertions.assertThat((int)((TaskSlot)allocatedSlots.next()).getIndex()).isEqualTo(2);
            Assertions.assertThat((boolean)allocatedSlots.hasNext()).isFalse();
            Assertions.assertThat((boolean)taskSlotTable.isAllocated(2, jobId, allocationId)).isTrue();
        }));
    }

    @Test
    void testSlotAllocationWithConcreteResourceProfile() throws Exception {
        TaskSlotTableImplTest.runInMainThread(2, (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            JobID jobId = new JobID();
            AllocationID allocationId = new AllocationID();
            ResourceProfile resourceProfile = TaskSlotUtils.DEFAULT_RESOURCE_PROFILE.merge(ResourceProfile.newBuilder().setCpuCores(0.1).build());
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(-1, jobId, allocationId, resourceProfile, SLOT_TIMEOUT));
            Iterator allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
            TaskSlot allocatedSlot = (TaskSlot)allocatedSlots.next();
            Assertions.assertThat((int)allocatedSlot.getIndex()).isEqualTo(2);
            Assertions.assertThat((Object)allocatedSlot.getResourceProfile()).isEqualTo((Object)resourceProfile);
            Assertions.assertThat((boolean)allocatedSlots.hasNext()).isFalse();
        }));
    }

    @Test
    void testSlotAllocationWithUnknownResourceProfile() throws Exception {
        TaskSlotTableImplTest.runInMainThread(2, (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            JobID jobId = new JobID();
            AllocationID allocationId = new AllocationID();
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(-1, jobId, allocationId, ResourceProfile.UNKNOWN, SLOT_TIMEOUT));
            Iterator allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
            TaskSlot allocatedSlot = (TaskSlot)allocatedSlots.next();
            Assertions.assertThat((int)allocatedSlot.getIndex()).isEqualTo(2);
            Assertions.assertThat((Object)allocatedSlot.getResourceProfile()).isEqualTo((Object)TaskSlotUtils.DEFAULT_RESOURCE_PROFILE);
            Assertions.assertThat((boolean)allocatedSlots.hasNext()).isFalse();
        }));
    }

    @Test
    void testSlotAllocationWithResourceProfileFailure() throws Exception {
        TaskSlotTableImplTest.runInMainThread(2, (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            JobID jobId = new JobID();
            AllocationID allocationId = new AllocationID();
            ResourceProfile resourceProfile = TaskSlotUtils.DEFAULT_RESOURCE_PROFILE;
            ResourceProfile mergedResourceProfile = resourceProfile = resourceProfile.merge(resourceProfile).merge(resourceProfile);
            Assertions.assertThatThrownBy(() -> taskSlotTable.allocateSlot(-1, jobId, allocationId, mergedResourceProfile, SLOT_TIMEOUT)).isInstanceOf(SlotAllocationException.class);
            Iterator allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
            Assertions.assertThat((boolean)allocatedSlots.hasNext()).isFalse();
        }));
    }

    @Test
    void testGenerateSlotReport() throws Exception {
        TaskSlotTableImplTest.runInMainThread(3, (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            JobID jobId = new JobID();
            AllocationID allocationId1 = new AllocationID();
            AllocationID allocationId2 = new AllocationID();
            AllocationID allocationId3 = new AllocationID();
            ((NotThrownAssert)Assertions.assertThatNoException().as("Slot with allocation ID %s should have been allocated successfully.", new Object[]{allocationId1})).isThrownBy(() -> taskSlotTable.allocateSlot(0, jobId, allocationId1, SLOT_TIMEOUT));
            ((NotThrownAssert)Assertions.assertThatNoException().as("Slot with allocation ID %s should have been allocated successfully.", new Object[]{allocationId2})).isThrownBy(() -> taskSlotTable.allocateSlot(-1, jobId, allocationId2, SLOT_TIMEOUT));
            ((NotThrownAssert)Assertions.assertThatNoException().as("Slot with allocation ID %s should have been allocated successfully.", new Object[]{allocationId3})).isThrownBy(() -> taskSlotTable.allocateSlot(-1, jobId, allocationId3, SLOT_TIMEOUT));
            Assertions.assertThat((int)taskSlotTable.freeSlot(allocationId2)).isEqualTo(3);
            ResourceID resourceId = ResourceID.generate();
            SlotReport slotReport = taskSlotTable.createSlotReport(resourceId);
            ArrayList slotStatuses = new ArrayList();
            slotReport.iterator().forEachRemaining(slotStatuses::add);
            Assertions.assertThat(slotStatuses).hasSize(4);
            Assertions.assertThat(slotStatuses).containsExactlyInAnyOrder((Object[])new SlotStatus[]{new SlotStatus(new SlotID(resourceId, 0), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, jobId, allocationId1), new SlotStatus(new SlotID(resourceId, 1), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, null, null), new SlotStatus(new SlotID(resourceId, 2), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, null, null), new SlotStatus(new SlotID(resourceId, 4), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, jobId, allocationId3)});
        }));
    }

    @Test
    void testAllocateSlot() throws Exception {
        JobID jobId = new JobID();
        AllocationID allocationId = new AllocationID();
        TaskSlotTableImplTest.runInMainThread((SupplierWithException<TaskSlotTableImpl<TaskSlotPayload>, Exception>)((SupplierWithException)() -> TaskSlotTableImplTest.createTaskSlotTableWithAllocatedSlot(jobId, allocationId, new TestingSlotActionsBuilder().build())), (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            Iterator allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
            TaskSlot nextSlot = (TaskSlot)allocatedSlots.next();
            Assertions.assertThat((int)nextSlot.getIndex()).isZero();
            Assertions.assertThat((Comparable)nextSlot.getAllocationId()).isEqualTo((Object)allocationId);
            Assertions.assertThat((Comparable)nextSlot.getJobId()).isEqualTo((Object)jobId);
            Assertions.assertThat((boolean)allocatedSlots.hasNext()).isFalse();
        }));
    }

    @Test
    void testAddTask() throws Exception {
        JobID jobId = new JobID();
        ExecutionAttemptID executionAttemptId = ExecutionGraphTestUtils.createExecutionAttemptId();
        AllocationID allocationId = new AllocationID();
        TestingTaskSlotPayload task = new TestingTaskSlotPayload(jobId, executionAttemptId, allocationId).terminate();
        TaskSlotTableImplTest.runInMainThread((SupplierWithException<TaskSlotTableImpl<TaskSlotPayload>, Exception>)((SupplierWithException)() -> TaskSlotTableImplTest.createTaskSlotTableWithStartedTask(task)), (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            Iterator tasks = taskSlotTable.getTasks(jobId);
            TaskSlotPayload nextTask = (TaskSlotPayload)tasks.next();
            Assertions.assertThat((Object)nextTask.getExecutionId()).isEqualTo((Object)executionAttemptId);
            Assertions.assertThat((Comparable)nextTask.getAllocationId()).isEqualTo((Object)allocationId);
            Assertions.assertThat((boolean)tasks.hasNext()).isFalse();
        }));
    }

    @Test
    void testRemoveTaskCallsFreeSlotAction() throws Exception {
        JobID jobId = new JobID();
        ExecutionAttemptID executionAttemptId = ExecutionGraphTestUtils.createExecutionAttemptId();
        AllocationID allocationId = new AllocationID();
        CompletableFuture freeSlotFuture = new CompletableFuture();
        TestingSlotActions slotActions = new TestingSlotActions(freeSlotFuture::complete, (aid, uid) -> {});
        TestingTaskSlotPayload task = new TestingTaskSlotPayload(jobId, executionAttemptId, allocationId).terminate();
        TaskSlotTableImplTest.runInMainThread((SupplierWithException<TaskSlotTableImpl<TaskSlotPayload>, Exception>)((SupplierWithException)() -> TaskSlotTableImplTest.createTaskSlotTableWithStartedTask(task, slotActions)), (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            taskSlotTable.freeSlot(allocationId);
            taskSlotTable.removeTask(executionAttemptId);
            FlinkAssertions.assertThatFuture((CompletableFuture)freeSlotFuture).eventuallySucceeds().isEqualTo((Object)allocationId);
        }));
    }

    @Test
    void testFreeSlotInterruptsSubmittedTask() throws Exception {
        TestingTaskSlotPayload task = new TestingTaskSlotPayload();
        TaskSlotTableImplTest.runInMainThread((SupplierWithException<TaskSlotTableImpl<TaskSlotPayload>, Exception>)((SupplierWithException)() -> TaskSlotTableImplTest.createTaskSlotTableWithStartedTask(task)), (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            Assertions.assertThat((int)taskSlotTable.freeSlot(task.getAllocationId())).isEqualTo(-1);
            task.waitForFailure();
            task.terminate();
        }));
    }

    @Test
    void testTableIsClosedOnlyWhenAllTasksTerminated() throws Exception {
        TestingTaskSlotPayload task = new TestingTaskSlotPayload();
        TaskSlotTableImplTest.runInMainThread((SupplierWithException<TaskSlotTableImpl<TaskSlotPayload>, Exception>)((SupplierWithException)() -> TaskSlotTableImplTest.createTaskSlotTableWithStartedTask(task)), (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            Assertions.assertThat((int)taskSlotTable.freeSlot(task.getAllocationId())).isEqualTo(-1);
            CompletableFuture closingFuture = taskSlotTable.closeAsync();
            Assertions.assertThat((CompletableFuture)closingFuture).isNotDone();
            task.terminate();
        }));
    }

    @Test
    void testAllocatedSlotTimeout() throws Exception {
        CompletableFuture timeoutFuture = new CompletableFuture();
        TestingSlotActions testingSlotActions = new TestingSlotActionsBuilder().setTimeoutSlotConsumer((allocationID, uuid) -> timeoutFuture.complete(allocationID)).build();
        TaskSlotTableImplTest.runInMainThread((SupplierWithException<TaskSlotTableImpl<TaskSlotPayload>, Exception>)((SupplierWithException)() -> TaskSlotTableImplTest.createTaskSlotTableAndStart(1, testingSlotActions)), (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            AllocationID allocationId = new AllocationID();
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(0, new JobID(), allocationId, Duration.ofMillis(1L)));
            FlinkAssertions.assertThatFuture((CompletableFuture)timeoutFuture).eventuallySucceeds().isEqualTo((Object)allocationId);
        }));
    }

    @Test
    void testMarkSlotActiveDeactivatesSlotTimeout() throws Exception {
        this.runDeactivateSlotTimeoutTest((TriFunctionWithException<TaskSlotTable<TaskSlotPayload>, JobID, AllocationID, Boolean, SlotNotFoundException>)((TriFunctionWithException)(taskSlotTable, jobId, allocationId) -> taskSlotTable.markSlotActive(allocationId)));
    }

    @Test
    void testTryMarkSlotActiveDeactivatesSlotTimeout() throws Exception {
        this.runDeactivateSlotTimeoutTest((TriFunctionWithException<TaskSlotTable<TaskSlotPayload>, JobID, AllocationID, Boolean, SlotNotFoundException>)((TriFunctionWithException)TaskSlotTable::tryMarkSlotActive));
    }

    private void runDeactivateSlotTimeoutTest(TriFunctionWithException<TaskSlotTable<TaskSlotPayload>, JobID, AllocationID, Boolean, SlotNotFoundException> taskSlotTableAction) throws Exception {
        CompletableFuture timeoutCancellationFuture = new CompletableFuture();
        TestingTimerService<AllocationID> testingTimerService = new TestingTimerServiceBuilder<AllocationID>().setUnregisterTimeoutConsumer(timeoutCancellationFuture::complete).createTestingTimerService();
        TaskSlotTableImplTest.runInMainThread((SupplierWithException<TaskSlotTableImpl<TaskSlotPayload>, Exception>)((SupplierWithException)() -> TaskSlotTableImplTest.createTaskSlotTableAndStart(1, (TimerService<AllocationID>)testingTimerService)), (ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception>)((ThrowingConsumer)taskSlotTable -> {
            AllocationID allocationId = new AllocationID();
            long timeout = 50L;
            JobID jobId = new JobID();
            Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(0, jobId, allocationId, Duration.ofMillis(50L)));
            Assertions.assertThat((Boolean)((Boolean)taskSlotTableAction.apply(taskSlotTable, (Object)jobId, (Object)allocationId))).isTrue();
            timeoutCancellationFuture.get();
        }));
    }

    private static TaskSlotTableImpl<TaskSlotPayload> createTaskSlotTableWithStartedTask(TaskSlotPayload task) throws SlotNotFoundException, SlotNotActiveException {
        return TaskSlotTableImplTest.createTaskSlotTableWithStartedTask(task, new TestingSlotActionsBuilder().build());
    }

    private static TaskSlotTableImpl<TaskSlotPayload> createTaskSlotTableWithStartedTask(TaskSlotPayload task, SlotActions slotActions) throws SlotNotFoundException, SlotNotActiveException {
        TaskSlotTableImpl<TaskSlotPayload> taskSlotTable = TaskSlotTableImplTest.createTaskSlotTableWithAllocatedSlot(task.getJobID(), task.getAllocationId(), slotActions);
        taskSlotTable.markSlotActive(task.getAllocationId());
        taskSlotTable.addTask(task);
        return taskSlotTable;
    }

    private static TaskSlotTableImpl<TaskSlotPayload> createTaskSlotTableWithAllocatedSlot(JobID jobId, AllocationID allocationId, SlotActions slotActions) {
        TaskSlotTableImpl<TaskSlotPayload> taskSlotTable = TaskSlotTableImplTest.createTaskSlotTableAndStart(1, slotActions);
        Assertions.assertThatNoException().isThrownBy(() -> taskSlotTable.allocateSlot(0, jobId, allocationId, SLOT_TIMEOUT));
        return taskSlotTable;
    }

    private static TaskSlotTableImpl<TaskSlotPayload> createTaskSlotTableAndStart(int numberOfSlots) {
        return TaskSlotTableImplTest.createTaskSlotTableAndStart(numberOfSlots, new TestingSlotActionsBuilder().build());
    }

    private static TaskSlotTableImpl<TaskSlotPayload> createTaskSlotTableAndStart(int numberOfSlots, SlotActions slotActions) {
        TaskSlotTableImpl taskSlotTable = TaskSlotUtils.createTaskSlotTable(numberOfSlots, (ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor());
        taskSlotTable.start(slotActions, ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor((ScheduledExecutorService)MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor()));
        return taskSlotTable;
    }

    private static TaskSlotTableImpl<TaskSlotPayload> createTaskSlotTableAndStart(int numberOfSlots, TimerService<AllocationID> timerService) {
        TaskSlotTableImpl taskSlotTable = TaskSlotUtils.createTaskSlotTable(numberOfSlots, timerService, (ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor());
        taskSlotTable.start((SlotActions)new TestingSlotActionsBuilder().build(), ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor((ScheduledExecutorService)MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor()));
        return taskSlotTable;
    }

    private static void runInMainThread(SupplierWithException<TaskSlotTableImpl<TaskSlotPayload>, Exception> taskSlotTableFactory, ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception> callback) throws Exception {
        TaskSlotTableImpl taskSlotTable = (TaskSlotTableImpl)taskSlotTableFactory.get();
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)FutureUtils.runAsync(() -> callback.accept((Object)taskSlotTable), (Executor)MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor()).thenApply(ignored -> taskSlotTable.closeAsync())).thenCompose(Function.identity())).thenRun(() -> Assertions.assertThat((boolean)taskSlotTable.isClosed()).isTrue())).join();
    }

    private static void runInMainThread(int slotCount, ThrowingConsumer<TaskSlotTableImpl<TaskSlotPayload>, ? extends Exception> callback) throws Exception {
        TaskSlotTableImplTest.runInMainThread((SupplierWithException<TaskSlotTableImpl<TaskSlotPayload>, Exception>)((SupplierWithException)() -> TaskSlotTableImplTest.createTaskSlotTableAndStart(slotCount)), callback);
    }
}

