/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CleanupOptions;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.dispatcher.AbstractDispatcherTest;
import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.JobCancellationFailedException;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.JobMasterTester;
import org.apache.flink.runtime.dispatcher.NoOpExecutionPlanListener;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerRegistry;
import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.TestingRetryStrategies;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.ExecutionPlanWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingExecutionPlanStore;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class DispatcherCleanupITCase
extends AbstractDispatcherTest {
    private final BlockingQueue<RpcEndpoint> toTerminate = new LinkedBlockingQueue<RpcEndpoint>();

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.haServices.setCheckpointRecoveryFactory((CheckpointRecoveryFactory)new PerJobCheckpointRecoveryFactory((maxCheckpoints, previous, sharedStateRegistryFactory, ioExecutor, recoveryClaimMode) -> {
            if (previous != null) {
                Assertions.assertThat((Optional)previous.getShutdownStatus()).isPresent();
                Assertions.assertThat((List)previous.getAllCheckpoints()).isEmpty();
                return new EmbeddedCompletedCheckpointStore(maxCheckpoints, (Collection)previous.getAllCheckpoints(), sharedStateRegistryFactory.create(ioExecutor, (Collection)previous.getAllCheckpoints(), recoveryClaimMode));
            }
            return new EmbeddedCompletedCheckpointStore(maxCheckpoints, Collections.emptyList(), sharedStateRegistryFactory.create(ioExecutor, Collections.emptyList(), RecoveryClaimMode.DEFAULT));
        }));
    }

    @Override
    @After
    public void tearDown() {
        while (!this.toTerminate.isEmpty()) {
            RpcEndpoint endpoint = (RpcEndpoint)this.toTerminate.poll();
            try {
                RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{endpoint});
            }
            catch (Exception exception) {}
        }
    }

    @Test
    public void testCleanupThroughRetries() throws Exception {
        JobGraph jobGraph = this.createJobGraph();
        JobID jobId = jobGraph.getJobID();
        AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
        OneShotLatch successfulCleanupLatch = new OneShotLatch();
        int numberOfErrors = 5;
        RuntimeException temporaryError = new RuntimeException("Expected RuntimeException: Unable to remove job graph.");
        AtomicInteger failureCount = new AtomicInteger(5);
        TestingExecutionPlanStore executionPlanStore = TestingExecutionPlanStore.newBuilder().setGlobalCleanupFunction((ignoredJobId, ignoredExecutor) -> {
            actualGlobalCleanupCallCount.incrementAndGet();
            if (failureCount.getAndDecrement() > 0) {
                return FutureUtils.completedExceptionally((Throwable)temporaryError);
            }
            successfulCleanupLatch.trigger();
            return FutureUtils.completedVoidFuture();
        }).build();
        executionPlanStore.start(NoOpExecutionPlanListener.INSTANCE);
        this.haServices.setExecutionPlanStore(executionPlanStore);
        TestingLeaderElection leaderElection = new TestingLeaderElection();
        this.haServices.setJobMasterLeaderElection(jobId, leaderElection);
        DefaultJobManagerRunnerRegistry jobManagerRunnerRegistry = new DefaultJobManagerRunnerRegistry(2);
        TestingDispatcher dispatcher = this.createTestingDispatcherBuilder().setResourceCleanerFactory((ResourceCleanerFactory)new DispatcherResourceCleanerFactory((Executor)ForkJoinPool.commonPool(), TestingRetryStrategies.createWithNumberOfRetries(5), (JobManagerRunnerRegistry)jobManagerRunnerRegistry, (ExecutionPlanWriter)this.haServices.getExecutionPlanStore(), this.blobServer, (HighAvailabilityServices)this.haServices, UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup())).build(rpcService);
        dispatcher.start();
        this.toTerminate.add((RpcEndpoint)dispatcher);
        CompletableFuture<LeaderInformation> confirmedLeaderInformation = leaderElection.isLeader(UUID.randomUUID());
        DispatcherGateway dispatcherGateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        dispatcherGateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        this.waitForJobToFinish(confirmedLeaderInformation, dispatcherGateway, jobId);
        successfulCleanupLatch.await();
        Assertions.assertThat((int)actualGlobalCleanupCallCount.get()).isEqualTo(6);
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.haServices.getExecutionPlanStore().getJobIds()).as("The JobGraph should be removed from ExecutionPlanStore.", new Object[0])).isEmpty();
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> (Boolean)this.haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get()));
    }

    @Test
    public void testCleanupNotCancellable() throws Exception {
        JobGraph jobGraph = this.createJobGraph();
        JobID jobId = jobGraph.getJobID();
        EmbeddedJobResultStore jobResultStore = new EmbeddedJobResultStore();
        jobResultStore.createDirtyResultAsync(new JobResultEntry(TestingJobResultStore.createSuccessfulJobResult(jobId))).get();
        this.haServices.setJobResultStore((JobResultStore)jobResultStore);
        CompletableFuture<Object> jobManagerRunnerCleanupFuture = new CompletableFuture<Object>();
        AtomicReference<JobManagerRunner> jobManagerRunnerEntry = new AtomicReference<JobManagerRunner>();
        TestingJobManagerRunnerRegistry jobManagerRunnerRegistry = TestingJobManagerRunnerRegistry.newSingleJobBuilder(jobManagerRunnerEntry).withLocalCleanupAsyncFunction((actualJobId, executor) -> jobManagerRunnerCleanupFuture).build();
        TestingDispatcher dispatcher = this.createTestingDispatcherBuilder().setJobManagerRunnerRegistry(jobManagerRunnerRegistry).build(rpcService);
        dispatcher.start();
        this.toTerminate.add((RpcEndpoint)dispatcher);
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> jobManagerRunnerEntry.get() != null));
        ((AbstractBooleanAssert)Assertions.assertThat((Boolean)((Boolean)this.haServices.getJobResultStore().hasDirtyJobResultEntryAsync(jobId).get())).as("The JobResultStore should have this job still marked as dirty.", new Object[0])).isTrue();
        DispatcherGateway dispatcherGateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        Assertions.assertThatThrownBy(() -> dispatcherGateway.cancelJob(jobId, TIMEOUT).get()).hasCauseInstanceOf(JobCancellationFailedException.class);
        jobManagerRunnerCleanupFuture.complete(null);
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> (Boolean)this.haServices.getJobResultStore().hasCleanJobResultEntryAsync(jobId).get()));
    }

    @Test
    public void testCleanupAfterLeadershipChange() throws Exception {
        JobGraph jobGraph = this.createJobGraph();
        JobID jobId = jobGraph.getJobID();
        AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
        OneShotLatch firstCleanupTriggered = new OneShotLatch();
        CompletableFuture successfulJobGraphCleanup = new CompletableFuture();
        TestingExecutionPlanStore executionPlanStore = TestingExecutionPlanStore.newBuilder().setGlobalCleanupFunction((actualJobId, ignoredExecutor) -> {
            int callCount = actualGlobalCleanupCallCount.getAndIncrement();
            firstCleanupTriggered.trigger();
            if (callCount < 1) {
                return FutureUtils.completedExceptionally((Throwable)new RuntimeException("Expected RuntimeException: Unable to remove job graph."));
            }
            successfulJobGraphCleanup.complete(actualJobId);
            return FutureUtils.completedVoidFuture();
        }).build();
        executionPlanStore.start(NoOpExecutionPlanListener.INSTANCE);
        this.haServices.setExecutionPlanStore(executionPlanStore);
        TestingLeaderElection leaderElection = new TestingLeaderElection();
        this.haServices.setJobMasterLeaderElection(jobId, leaderElection);
        this.configuration.set(CleanupOptions.CLEANUP_STRATEGY, (Object)((String)CleanupOptions.NONE_PARAM_VALUES.iterator().next()));
        TestingDispatcher dispatcher = this.createTestingDispatcherBuilder().build(rpcService);
        dispatcher.start();
        this.toTerminate.add((RpcEndpoint)dispatcher);
        CompletableFuture<LeaderInformation> confirmedLeaderInformation = leaderElection.isLeader(UUID.randomUUID());
        DispatcherGateway dispatcherGateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        dispatcherGateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        this.waitForJobToFinish(confirmedLeaderInformation, dispatcherGateway, jobId);
        firstCleanupTriggered.await();
        ((AbstractIntegerAssert)Assertions.assertThat((int)actualGlobalCleanupCallCount.get()).as("The cleanup should have been triggered only once.", new Object[0])).isOne();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)successfulJobGraphCleanup.isDone()).as("The cleanup should not have reached the successful cleanup code path.", new Object[0])).isFalse();
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.haServices.getExecutionPlanStore().getJobIds()).as("The JobGraph is still stored in the ExecutionPlanStore.", new Object[0])).containsExactly((Object[])new JobID[]{jobId});
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.haServices.getJobResultStore().getDirtyResults().stream().map(JobResult::getJobId).collect(Collectors.toSet())).as("The JobResultStore should have this job marked as dirty.", new Object[0])).containsExactly((Object[])new JobID[]{jobId});
        TestingDispatcher secondDispatcher = this.createTestingDispatcherBuilder().setRecoveredDirtyJobs(this.haServices.getJobResultStore().getDirtyResults()).build(rpcService);
        secondDispatcher.start();
        this.toTerminate.add((RpcEndpoint)secondDispatcher);
        leaderElection.isLeader(UUID.randomUUID());
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> this.haServices.getJobResultStore().getDirtyResults().isEmpty()));
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)this.haServices.getExecutionPlanStore().getJobIds()).as("The JobGraph is not stored in the ExecutionPlanStore.", new Object[0])).isEmpty();
        ((AbstractBooleanAssert)Assertions.assertThat((Boolean)((Boolean)this.haServices.getJobResultStore().hasJobResultEntryAsync(jobId).get())).as("The JobResultStore has the job listed as clean.", new Object[0])).isTrue();
        Assertions.assertThat((Comparable)((JobID)successfulJobGraphCleanup.get())).isEqualTo((Object)jobId);
        Assertions.assertThat((int)actualGlobalCleanupCallCount.get()).isEqualTo(2);
    }

    private void waitForJobToFinish(CompletableFuture<LeaderInformation> confirmedLeaderInformation, DispatcherGateway dispatcherGateway, JobID jobId) throws Exception {
        JobMasterGateway jobMasterGateway = DispatcherCleanupITCase.connectToLeadingJobMaster(confirmedLeaderInformation).get();
        try (JobMasterTester tester = new JobMasterTester(rpcService, jobId, jobMasterGateway);){
            CompletableFuture<List<TaskDeploymentDescriptor>> descriptorsFuture = tester.deployVertices(2);
            DispatcherCleanupITCase.awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
            tester.transitionTo(descriptorsFuture.get(), ExecutionState.INITIALIZING).get();
            tester.transitionTo(descriptorsFuture.get(), ExecutionState.RUNNING).get();
            tester.getCheckpointFuture(1L).get();
            tester.transitionTo(descriptorsFuture.get(), ExecutionState.FINISHED).get();
        }
        DispatcherCleanupITCase.awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED);
    }

    private JobGraph createJobGraph() {
        JobVertex firstVertex = new JobVertex("first");
        firstVertex.setInvokableClass(NoOpInvokable.class);
        firstVertex.setParallelism(1);
        JobVertex secondVertex = new JobVertex("second");
        secondVertex.setInvokableClass(NoOpInvokable.class);
        secondVertex.setParallelism(1);
        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(20L).setMinPauseBetweenCheckpoints(20L).setCheckpointTimeout(10000L).build();
        JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
        return JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(firstVertex).addJobVertex(secondVertex).setJobCheckpointingSettings(checkpointingSettings).build();
    }

    private static CompletableFuture<JobMasterGateway> connectToLeadingJobMaster(CompletableFuture<LeaderInformation> confirmedLeaderInformation) {
        return confirmedLeaderInformation.thenCompose(leaderInformation -> rpcService.connect(leaderInformation.getLeaderAddress(), JobMasterId.fromUuidOrNull((UUID)leaderInformation.getLeaderSessionID()), JobMasterGateway.class));
    }
}

