/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.TestingFreeSlotTracker;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler;
import org.apache.flink.runtime.scheduler.adaptive.Finished;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Shared fixture for {@link AdaptiveScheduler} tests.
 *
 * <p>Holds the scheduler under test, the executors the tests run scheduler calls on, and a few
 * helpers for building job graphs and slot pools. The {@link #scheduler} field is reset before
 * each test and, if a test assigned one, cancelled and closed after each test.
 */
public class AdaptiveSchedulerTestBase {
    /** One-hour timeout constant for use by tests built on this class. */
    protected static final Duration DEFAULT_TIMEOUT = Duration.ofHours(1L);

    /** Parallelism of {@link #JOB_VERTEX}. */
    protected static final int PARALLELISM = 4;

    /** No-op vertex named {@code "v1"} with parallelism {@link #PARALLELISM}. */
    protected static final JobVertex JOB_VERTEX =
            ExecutionGraphTestUtils.createNoOpVertex("v1", PARALLELISM);

    /** Executor extension obtained from {@link TestingUtils#defaultExecutorExtension()}. */
    @RegisterExtension
    protected static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    /**
     * Executor extension backed by a single-thread scheduled executor; it is the executor behind
     * {@link #singleThreadMainThreadExecutor}.
     */
    @RegisterExtension
    protected static final TestExecutorExtension<ScheduledExecutorService>
            TEST_EXECUTOR_RESOURCE =
                    new TestExecutorExtension<>(Executors::newSingleThreadScheduledExecutor);

    /** Manually triggered executor created with the thread that instantiates this test object. */
    protected final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor =
            new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());

    /** Main-thread executor adapter wrapping the executor of {@link #TEST_EXECUTOR_RESOURCE}. */
    protected final ComponentMainThreadExecutor singleThreadMainThreadExecutor =
            ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                    TEST_EXECUTOR_RESOURCE.getExecutor());

    /** The system class loader. */
    protected final ClassLoader classLoader = ClassLoader.getSystemClassLoader();

    /** Scheduler under test; {@code null} until a test assigns it. */
    protected AdaptiveScheduler scheduler;

    /** Clears {@link #scheduler} so that {@link #after()} only closes what the test created. */
    @BeforeEach
    void before() {
        this.scheduler = null;
    }

    /** Cancels and closes {@link #scheduler} on {@link #singleThreadMainThreadExecutor}. */
    @AfterEach
    void after() {
        closeInExecutorService(this.scheduler, this.singleThreadMainThreadExecutor);
    }

    /**
     * Builds a streaming job graph from the given vertices and enables checkpointing on it.
     *
     * <p>Checkpointing is enabled through {@link SchedulerTestingUtils#enableCheckpointing} with
     * {@code null} for the second and third arguments, one hour expressed in milliseconds, and
     * {@code true}. NOTE(review): the millisecond value is presumably the checkpoint interval;
     * confirm against {@code SchedulerTestingUtils}.
     *
     * @param jobVertex vertices to add to the job graph
     * @return the job graph with checkpointing enabled
     */
    protected static JobGraph createJobGraphWithCheckpointing(JobVertex... jobVertex) {
        final JobGraph jobGraph =
                JobGraphBuilder.newStreamingJobGraphBuilder()
                        .addJobVertices(Arrays.asList(jobVertex))
                        .build();
        SchedulerTestingUtils.enableCheckpointing(
                jobGraph, null, null, Duration.ofHours(1L).toMillis(), true);
        return jobGraph;
    }

    /**
     * Cancels and closes the given scheduler on {@code executor} and asserts that closing
     * eventually succeeds. Does nothing when {@code scheduler} is {@code null}.
     *
     * <p>Two paths can complete the close future: the task that cancels and closes the scheduler
     * directly, and a callback on the job termination future that first asserts the scheduler is
     * in the {@link Finished} state. Whichever completes the future first wins.
     *
     * @param scheduler scheduler to shut down, or {@code null} if the test never created one
     * @param executor executor on which the scheduler calls and the termination callback run
     */
    protected static void closeInExecutorService(
            @Nullable AdaptiveScheduler scheduler, Executor executor) {
        if (scheduler == null) {
            return;
        }

        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        executor.execute(
                () -> {
                    try {
                        scheduler.cancel();
                        FutureUtils.forward(scheduler.closeAsync(), closeFuture);
                    } catch (Throwable t) {
                        closeFuture.completeExceptionally(t);
                    }
                });

        scheduler
                .getJobTerminationFuture()
                .whenCompleteAsync(
                        (jobStatus, error) -> {
                            // The stage returned by whenCompleteAsync is discarded, so anything
                            // thrown here (including a failed assertion) must be routed into
                            // closeFuture explicitly or it would be lost.
                            try {
                                Assertions.assertThat(scheduler.getState().getClass())
                                        .isEqualTo(Finished.class);
                                if (error != null) {
                                    closeFuture.completeExceptionally(error);
                                } else {
                                    FutureUtils.forward(scheduler.closeAsync(), closeFuture);
                                }
                            } catch (Throwable t) {
                                closeFuture.completeExceptionally(t);
                            }
                        },
                        executor);

        FlinkAssertions.assertThatFuture(closeFuture).eventuallySucceeds();
    }

    /**
     * Creates a testing slot pool that reports every allocation id as a free slot.
     *
     * <p>Reserving a slot returns a new {@link TestingPhysicalSlot} carrying the requested
     * allocation id, and each query of the free-slot information yields {@code freeSlots} newly
     * created {@link TestingSlot} instances collected into a set.
     *
     * @param freeSlots number of slots generated per free-slot information query
     * @return the configured testing slot pool
     */
    protected static DeclarativeSlotPool getSlotPoolWithFreeSlots(int freeSlots) {
        return new TestingDeclarativeSlotPoolBuilder()
                .setContainsFreeSlotFunction(allocationID -> true)
                .setReserveFreeSlotFunction(
                        (allocationId, resourceProfile) ->
                                TestingPhysicalSlot.builder()
                                        .withAllocationID(allocationId)
                                        .build())
                .setGetFreeSlotTrackerSupplier(
                        () ->
                                TestingFreeSlotTracker.newBuilder()
                                        .setGetFreeSlotsInformationSupplier(
                                                () ->
                                                        IntStream.range(0, freeSlots)
                                                                .mapToObj(v -> new TestingSlot())
                                                                .collect(Collectors.toSet()))
                                        .build())
                .build();
    }

    /** Calls {@code startScheduling()} on {@link #scheduler} via {@link #runInMainThread}. */
    protected void startTestInstanceInMainThread() {
        runInMainThread(() -> this.scheduler.startScheduling());
    }

    /**
     * Runs {@code callback} on {@link #singleThreadMainThreadExecutor} and blocks until it has
     * finished. A failure in the callback is rethrown by {@link CompletableFuture#join()}.
     *
     * @param callback action to run
     */
    protected void runInMainThread(Runnable callback) {
        CompletableFuture.runAsync(callback, this.singleThreadMainThreadExecutor).join();
    }

    /**
     * Evaluates {@code supplier} on {@link #singleThreadMainThreadExecutor} and blocks for the
     * result.
     *
     * @param supplier computation to run
     * @param <T> result type
     * @return the value produced by {@code supplier}
     * @throws Exception whatever {@link CompletableFuture#get()} throws, e.g. when the supplier
     *     fails or the waiting thread is interrupted
     */
    protected <T> T supplyInMainThread(Supplier<T> supplier) throws Exception {
        return CompletableFuture.supplyAsync(supplier, this.singleThreadMainThreadExecutor).get();
    }
}

