/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.cleanup;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.TestingRetryStrategies;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmanager.ExecutionPlanWriter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.testutils.TestingExecutionPlanStore;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link DispatcherResourceCleanerFactory}.
 *
 * <p>Each collaborator handed to the factory records the {@link JobID} it was asked to clean up in
 * a dedicated future. The tests use these futures to assert which cleanup callbacks were invoked
 * by the local and the global resource cleaner, and at which point.
 *
 * <p>NOTE(review): the {@link CleanableBlobServer} created in {@link #setup()} is never closed.
 * Confirm whether it holds resources that need an explicit teardown.
 */
public class DispatcherResourceCleanerFactoryTest {
    private static final JobID JOB_ID = new JobID();

    private CleanableBlobServer blobServer;

    /** Completed with the job id once the runner registry's local cleanup is invoked. */
    private CompletableFuture<JobID> jobManagerRunnerRegistryLocalCleanupFuture;

    /** Returned by the runner registry's local cleanup; tests complete it manually. */
    private CompletableFuture<Void> jobManagerRunnerRegistryLocalCleanupResultFuture;

    private CompletableFuture<JobID> executionPlanWriterLocalCleanupFuture;
    private CompletableFuture<JobID> executionPlanWriterGlobalCleanupFuture;
    private CompletableFuture<JobID> highAvailabilityServicesGlobalCleanupFuture;
    private JobManagerMetricGroup jobManagerMetricGroup;
    private DispatcherResourceCleanerFactory testInstance;

    /**
     * Builds a fresh factory under test, wired with recording test doubles and a metric group
     * that has exactly one job ({@link #JOB_ID}) registered.
     *
     * @throws Exception if creating the blob server or the execution plan writer fails
     */
    @BeforeEach
    public void setup() throws Exception {
        blobServer = new CleanableBlobServer();

        TestingMetricRegistry metricRegistry = TestingMetricRegistry.builder().build();
        jobManagerMetricGroup =
                JobManagerMetricGroup.createJobManagerMetricGroup(
                        metricRegistry, "ignored hostname");
        jobManagerMetricGroup.addJob(JOB_ID, "ignored job name");

        testInstance =
                new DispatcherResourceCleanerFactory(
                        Executors.directExecutor(),
                        TestingRetryStrategies.NO_RETRY_STRATEGY,
                        createJobManagerRunnerRegistry(),
                        createExecutionPlanWriter(),
                        blobServer,
                        createHighAvailabilityServices(),
                        jobManagerMetricGroup);
    }

    /**
     * Creates a runner registry whose local cleanup records the job id and then returns
     * {@link #jobManagerRunnerRegistryLocalCleanupResultFuture}, so the test decides when that
     * cleanup counts as finished.
     */
    private JobManagerRunnerRegistry createJobManagerRunnerRegistry() {
        jobManagerRunnerRegistryLocalCleanupFuture = new CompletableFuture<>();
        jobManagerRunnerRegistryLocalCleanupResultFuture = new CompletableFuture<>();

        return TestingJobManagerRunnerRegistry.builder()
                .withLocalCleanupAsyncFunction(
                        (jobId, executor) -> {
                            jobManagerRunnerRegistryLocalCleanupFuture.complete(jobId);
                            return jobManagerRunnerRegistryLocalCleanupResultFuture;
                        })
                .build();
    }

    /**
     * Creates a started execution plan store whose local and global cleanup functions record the
     * job id and return an already completed future.
     *
     * @throws Exception if starting the store fails
     */
    private ExecutionPlanWriter createExecutionPlanWriter() throws Exception {
        executionPlanWriterLocalCleanupFuture = new CompletableFuture<>();
        executionPlanWriterGlobalCleanupFuture = new CompletableFuture<>();

        TestingExecutionPlanStore executionPlanStore =
                TestingExecutionPlanStore.newBuilder()
                        .setGlobalCleanupFunction(
                                (jobId, executor) -> {
                                    executionPlanWriterGlobalCleanupFuture.complete(jobId);
                                    return FutureUtils.completedVoidFuture();
                                })
                        .setLocalCleanupFunction(
                                (jobId, ignoredExecutor) -> {
                                    executionPlanWriterLocalCleanupFuture.complete(jobId);
                                    return FutureUtils.completedVoidFuture();
                                })
                        .build();
        executionPlanStore.start(null);
        return executionPlanStore;
    }

    /** Creates HA services that are handed a fresh future for observing the global cleanup. */
    private HighAvailabilityServices createHighAvailabilityServices() {
        highAvailabilityServicesGlobalCleanupFuture = new CompletableFuture<>();

        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        haServices.setGlobalCleanupFuture(highAvailabilityServicesGlobalCleanupFuture);
        return haServices;
    }

    /**
     * Verifies that the local resource cleaner first triggers only the runner registry cleanup
     * and, once that has finished, runs the local cleanups while leaving every global cleanup
     * untouched.
     */
    @Test
    public void testLocalResourceCleaning() {
        assertCleanupNotTriggered();

        CompletableFuture<Void> cleanupResultFuture =
                testInstance
                        .createLocalResourceCleaner(
                                ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .cleanupAsync(JOB_ID);

        // The runner registry cleanup was triggered but its result is still pending, so nothing
        // else may have run and the overall result must be open.
        assertWaitingForPrioritizedCleanupToFinish();
        Assertions.assertThat(cleanupResultFuture).isNotCompleted();

        // Finishing the runner registry cleanup unblocks the remaining cleanups.
        jobManagerRunnerRegistryLocalCleanupResultFuture.complete(null);

        Assertions.assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isCompleted();
        Assertions.assertThat(executionPlanWriterLocalCleanupFuture).isCompleted();
        Assertions.assertThat(executionPlanWriterGlobalCleanupFuture).isNotDone();
        Assertions.assertThat(blobServer.getLocalCleanupFuture()).isCompleted();
        Assertions.assertThat(blobServer.getGlobalCleanupFuture()).isNotDone();
        Assertions.assertThat(highAvailabilityServicesGlobalCleanupFuture).isNotDone();
        assertJobMetricGroupCleanedUp();
        Assertions.assertThat(cleanupResultFuture).isCompleted();
    }

    /**
     * Verifies that the global resource cleaner first triggers only the runner registry cleanup
     * and, once that has finished, runs the global cleanups while leaving the local cleanups of
     * the execution plan writer and the blob server untouched.
     */
    @Test
    public void testGlobalResourceCleaning() {
        assertCleanupNotTriggered();

        CompletableFuture<Void> cleanupResultFuture =
                testInstance
                        .createGlobalResourceCleaner(
                                ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .cleanupAsync(JOB_ID);

        // The runner registry cleanup was triggered but its result is still pending, so nothing
        // else may have run and the overall result must be open.
        assertWaitingForPrioritizedCleanupToFinish();
        Assertions.assertThat(cleanupResultFuture).isNotCompleted();

        // Finishing the runner registry cleanup unblocks the remaining cleanups.
        jobManagerRunnerRegistryLocalCleanupResultFuture.complete(null);

        Assertions.assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isCompleted();
        Assertions.assertThat(executionPlanWriterLocalCleanupFuture).isNotDone();
        Assertions.assertThat(executionPlanWriterGlobalCleanupFuture).isCompleted();
        Assertions.assertThat(blobServer.getLocalCleanupFuture()).isNotDone();
        Assertions.assertThat(blobServer.getGlobalCleanupFuture()).isCompleted();
        Assertions.assertThat(highAvailabilityServicesGlobalCleanupFuture).isCompleted();
        assertJobMetricGroupCleanedUp();
        Assertions.assertThat(cleanupResultFuture).isCompleted();
    }

    /** Asserts that no cleanup at all, including the runner registry one, has been invoked. */
    private void assertCleanupNotTriggered() {
        Assertions.assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isNotDone();
        assertNoRegularCleanupsTriggered();
    }

    /** Asserts that only the runner registry cleanup has been invoked so far. */
    private void assertWaitingForPrioritizedCleanupToFinish() {
        Assertions.assertThat(jobManagerRunnerRegistryLocalCleanupFuture).isCompleted();
        assertNoRegularCleanupsTriggered();
    }

    /**
     * Asserts that none of the execution plan writer, blob server or HA services cleanups ran and
     * that the job's metric group is still registered.
     */
    private void assertNoRegularCleanupsTriggered() {
        Assertions.assertThat(executionPlanWriterLocalCleanupFuture).isNotDone();
        Assertions.assertThat(executionPlanWriterGlobalCleanupFuture).isNotDone();
        Assertions.assertThat(blobServer.getLocalCleanupFuture()).isNotDone();
        Assertions.assertThat(blobServer.getGlobalCleanupFuture()).isNotDone();
        Assertions.assertThat(highAvailabilityServicesGlobalCleanupFuture).isNotDone();
        Assertions.assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(1);
    }

    /** Asserts that no job metric group is registered any more. */
    private void assertJobMetricGroupCleanedUp() {
        Assertions.assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(0);
    }

    /**
     * {@link BlobServer} subclass whose cleanup methods only record the job id they were called
     * with and report immediate success.
     */
    private static class CleanableBlobServer
            extends BlobServer {
        private final CompletableFuture<JobID> localCleanupFuture = new CompletableFuture<>();
        private final CompletableFuture<JobID> globalCleanupFuture = new CompletableFuture<>();

        /**
         * Creates the server with a default configuration, a placeholder storage location and a
         * testing blob store.
         *
         * @throws IOException if the superclass constructor fails
         */
        public CleanableBlobServer() throws IOException {
            super(
                    new Configuration(),
                    new File("non-existent-file"),
                    new TestingBlobStoreBuilder().createTestingBlobStore());
        }

        /**
         * Records the job id of the local cleanup request.
         *
         * <p>NOTE(review): presumably overrides the corresponding {@code BlobServer} method;
         * consider adding {@code @Override} once confirmed against the superclass.
         *
         * @param jobId job whose local cleanup was requested
         * @param ignoredExecutor not used
         * @return an already completed future
         */
        public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor ignoredExecutor) {
            localCleanupFuture.complete(jobId);
            return FutureUtils.completedVoidFuture();
        }

        /**
         * Records the job id of the global cleanup request.
         *
         * <p>NOTE(review): presumably overrides the corresponding {@code BlobServer} method;
         * consider adding {@code @Override} once confirmed against the superclass.
         *
         * @param jobId job whose global cleanup was requested
         * @param ignoredExecutor not used
         * @return an already completed future
         */
        public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor ignoredExecutor) {
            globalCleanupFuture.complete(jobId);
            return FutureUtils.completedVoidFuture();
        }

        /** Returns the future completed with the job id once local cleanup was requested. */
        public CompletableFuture<JobID> getLocalCleanupFuture() {
            return localCleanupFuture;
        }

        /** Returns the future completed with the job id once global cleanup was requested. */
        public CompletableFuture<JobID> getGlobalCleanupFuture() {
            return globalCleanupFuture;
        }
    }
}

