package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource;
import org.apache.flink.runtime.dispatcher.cleanup.TestingResourceCleanerFactory;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.Reference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.class */
public class DispatcherResourceCleanupTest extends TestLogger {

    // JUnit rule for declaring expected exceptions; no test visible in this class references it.
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    // Provides the fatal error handler that createTestingDispatcherBuilder() wires into each dispatcher.
    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
    // RPC service shared by all tests; created in setupClass() and closed in teardownClass().
    private static TestingRpcService rpcService;
    // Id of the job graph created in setup().
    private JobID jobId;
    // Job graph submitted by the tests; recreated in setup() for every test.
    private JobGraph jobGraph;
    // Dispatcher under test; assigned in startDispatcher(...) and closed in teardown().
    private TestingDispatcher dispatcher;
    // Self gateway of the dispatcher under test; assigned in startDispatcher(...).
    private DispatcherGateway dispatcherGateway;
    // Blob server created in setup() and closed in teardown().
    private BlobServer blobServer;
    // Completed with the job id by the locally cleanable callback registered in createTestingDispatcherBuilder().
    private CompletableFuture<JobID> localCleanupFuture;
    // Completed with the job id by the globally cleanable callback registered in createTestingDispatcherBuilder().
    private CompletableFuture<JobID> globalCleanupFuture;

    // Class-level temporary folder; setup() creates a fresh sub-folder in it for the blob server.
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    // RPC timeout passed to the job submissions issued through the dispatcher gateway.
    private static final Time timeout = Time.seconds(10);

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest$BlockingJobManagerRunnerFactory.class */
    private static final class BlockingJobManagerRunnerFactory extends TestingJobMasterServiceLeadershipRunnerFactory {
        private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;
        private TestingJobManagerRunner testingRunner;

        BlockingJobManagerRunnerFactory(ThrowingRunnable<Exception> throwingRunnable) {
            this.jobManagerRunnerCreationLatch = throwingRunnable;
        }

        @Override // org.apache.flink.runtime.dispatcher.TestingJobMasterServiceLeadershipRunnerFactory
        public TestingJobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, Collection<FailureEnricher> collection, long j) throws Exception {
            this.jobManagerRunnerCreationLatch.run();
            this.testingRunner = super.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler, collection, j);
            this.testingRunner.completeJobMasterGatewayFuture(new TestingJobMasterGatewayBuilder().setRequestJobSupplier(() -> {
                return CompletableFuture.completedFuture(new ExecutionGraphInfo(ArchivedExecutionGraph.createSparseArchivedExecutionGraph(jobGraph.getJobID(), jobGraph.getName(), JobStatus.RUNNING, (JobType) null, (Throwable) null, (JobCheckpointingSettings) null, 1337L)));
            }).build());
            return this.testingRunner;
        }

        public void setJobStatus(JobStatus jobStatus) {
            Preconditions.checkState(this.testingRunner != null, "JobManagerRunner must be created before this method is available");
            this.testingRunner.setJobStatus(jobStatus);
        }

        @Override // org.apache.flink.runtime.dispatcher.TestingJobMasterServiceLeadershipRunnerFactory
        /* renamed from: createJobManagerRunner, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ JobManagerRunner mo70createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, Collection collection, long j) throws Exception {
            return createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler, (Collection<FailureEnricher>) collection, j);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest$FailingJobManagerRunnerFactory.class */
    private class FailingJobManagerRunnerFactory implements JobManagerRunnerFactory {
        private final Exception testException;

        public FailingJobManagerRunnerFactory(FlinkException flinkException) {
            this.testException = flinkException;
        }

        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, Collection<FailureEnricher> collection, long j) throws Exception {
            throw this.testException;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest$QueueJobManagerRunnerFactory.class */
    /**
     * Runner factory that hands out pre-built runners in queue order and fails once the queue has
     * been drained.
     */
    private static final class QueueJobManagerRunnerFactory implements JobManagerRunnerFactory {
        /** Runners still available for hand-out, head first. */
        private final Queue<? extends JobManagerRunner> jobManagerRunners;

        private QueueJobManagerRunnerFactory(Queue<? extends JobManagerRunner> queue) {
            this.jobManagerRunners = queue;
        }

        /**
         * Returns the next queued runner; all arguments are ignored.
         *
         * @throws IllegalStateException if the queue holds no further runner
         */
        public JobManagerRunner createJobManagerRunner(
                JobGraph jobGraph,
                Configuration configuration,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                JobManagerSharedServices jobManagerSharedServices,
                JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
                FatalErrorHandler fatalErrorHandler,
                Collection<FailureEnricher> collection,
                long j) {
            final JobManagerRunner nextRunner = jobManagerRunners.poll();
            if (nextRunner == null) {
                throw new IllegalStateException("Cannot create more JobManagerRunners.");
            }
            return nextRunner;
        }
    }

    /** Creates the RPC service that all tests of this class share. */
    @BeforeClass
    public static void setupClass() {
        final TestingRpcService sharedRpcService = new TestingRpcService();
        rpcService = sharedRpcService;
    }

    /**
     * Prepares a fresh no-op job graph, unfinished cleanup futures and a blob server backed by a
     * new temporary folder for each test.
     */
    @Before
    public void setup() throws Exception {
        final JobGraph noOpJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        jobGraph = noOpJobGraph;
        jobId = noOpJobGraph.getJobID();

        globalCleanupFuture = new CompletableFuture<>();
        localCleanupFuture = new CompletableFuture<>();

        final Configuration blobServerConfiguration = new Configuration();
        blobServer =
                BlobUtils.createBlobServer(
                        blobServerConfiguration,
                        Reference.owned(temporaryFolder.newFolder()),
                        new TestingBlobStoreBuilder().createTestingBlobStore());
    }

    /**
     * Starts a default dispatcher and submits the test job, passing {@code 0} as the runner
     * factory's constructor argument.
     */
    private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob() throws Exception {
        final int runnerFactoryArgument = 0;
        return startDispatcherAndSubmitJob(runnerFactoryArgument);
    }

    /**
     * Starts a dispatcher built from the default testing builder and submits the test job.
     *
     * @param i value forwarded to the runner factory's constructor (presumably the number of
     *     runners whose termination blocks until completed manually - TODO confirm)
     */
    private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob(int i) throws Exception {
        final TestingDispatcher.Builder defaultBuilder = createTestingDispatcherBuilder();
        return startDispatcherAndSubmitJob(defaultBuilder, i);
    }

    /**
     * Starts a dispatcher from the given builder with a new testing runner factory, submits the
     * test job and waits for the submission to complete.
     *
     * @param builder dispatcher builder to finish and start
     * @param i value forwarded to the runner factory's constructor
     * @return the runner factory used by the started dispatcher
     */
    private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob(TestingDispatcher.Builder builder, int i) throws Exception {
        final TestingJobMasterServiceLeadershipRunnerFactory runnerFactory =
                new TestingJobMasterServiceLeadershipRunnerFactory(i);

        startDispatcher(builder, runnerFactory);
        submitJobAndWait();

        return runnerFactory;
    }

    /** Starts a dispatcher built from the default testing builder with the given runner factory. */
    private void startDispatcher(JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        final TestingDispatcher.Builder defaultBuilder = createTestingDispatcherBuilder();
        startDispatcher(defaultBuilder, jobManagerRunnerFactory);
    }

    /**
     * Builds the dispatcher with the given runner factory on the shared RPC service, starts it
     * and stores both the dispatcher and its self gateway in the test fields.
     */
    private void startDispatcher(TestingDispatcher.Builder builder, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        final TestingDispatcher.Builder configuredBuilder =
                builder.setJobManagerRunnerFactory(jobManagerRunnerFactory);

        // The field is assigned before start() so that teardown() still sees the dispatcher if
        // starting fails.
        dispatcher = configuredBuilder.build(rpcService);
        dispatcher.start();
        dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
    }

    /**
     * Creates the dispatcher builder used by default in this test: it uses the test's blob server
     * and fatal error handler, and a resource cleaner factory whose local and global callbacks
     * complete {@code localCleanupFuture} and {@code globalCleanupFuture} with the cleaned job id.
     *
     * @return a builder that still needs a runner factory before it is built
     */
    private TestingDispatcher.Builder createTestingDispatcherBuilder() {
        // Declared with the concrete type: the same instance is passed both as the dispatcher's
        // runner registry and as a locally cleanable resource.
        final DefaultJobManagerRunnerRegistry jobManagerRunnerRegistry = new DefaultJobManagerRunnerRegistry(2);
        return TestingDispatcher.builder()
                .setBlobServer(this.blobServer)
                .setJobManagerRunnerRegistry(jobManagerRunnerRegistry)
                .setFatalErrorHandler(this.testingFatalErrorHandlerResource.getFatalErrorHandler())
                .setResourceCleanerFactory(
                        TestingResourceCleanerFactory.builder()
                                .withLocallyCleanableResource(jobManagerRunnerRegistry)
                                .withGloballyCleanableResource((jobID, executor) -> {
                                    this.globalCleanupFuture.complete(jobID);
                                    return FutureUtils.completedVoidFuture();
                                })
                                .withLocallyCleanableResource((jobID2, executor2) -> {
                                    this.localCleanupFuture.complete(jobID2);
                                    return FutureUtils.completedVoidFuture();
                                })
                                .build());
    }

    /**
     * Closes the dispatcher and the blob server created for the current test. The blob server is
     * closed even when closing the dispatcher throws, so that it is not leaked into later tests.
     */
    @After
    public void teardown() throws Exception {
        try {
            if (this.dispatcher != null) {
                this.dispatcher.close();
            }
        } finally {
            if (this.blobServer != null) {
                this.blobServer.close();
            }
        }
    }

    /** Closes the shared RPC service, if one was created, and waits for the close to finish. */
    @AfterClass
    public static void teardownClass() throws ExecutionException, InterruptedException {
        if (rpcService == null) {
            return;
        }
        rpcService.closeAsync().get();
    }

    /** A job that reaches FINISHED must trigger the global cleanup. */
    @Test
    public void testGlobalCleanupWhenJobFinished() throws Exception {
        final TestingJobManagerRunnerFactory runnerFactory = startDispatcherAndSubmitJob();
        final TestingJobManagerRunner runner = runnerFactory.takeCreatedJobManagerRunner();

        finishJob(runner);

        assertGlobalCleanupTriggered(jobId);
    }

    /** A job that reaches CANCELED must trigger the global cleanup. */
    @Test
    public void testGlobalCleanupWhenJobCanceled() throws Exception {
        final TestingJobManagerRunnerFactory runnerFactory = startDispatcherAndSubmitJob();
        final TestingJobManagerRunner runner = runnerFactory.takeCreatedJobManagerRunner();

        cancelJob(runner);

        assertGlobalCleanupTriggered(jobId);
    }

    /** Submits the test job graph through the dispatcher gateway and returns the pending result. */
    private CompletableFuture<Acknowledge> submitJob() {
        final CompletableFuture<Acknowledge> submission = dispatcherGateway.submitJob(jobGraph, timeout);
        return submission;
    }

    /** Submits the test job graph and blocks until the submission has completed. */
    private void submitJobAndWait() {
        final CompletableFuture<Acknowledge> submission = submitJob();
        submission.join();
    }

    /** A job that ends up SUSPENDED must trigger only the local cleanup. */
    @Test
    public void testLocalCleanupWhenJobNotFinished() throws Exception {
        final TestingJobManagerRunnerFactory runnerFactory = startDispatcherAndSubmitJob();
        final TestingJobManagerRunner runner = runnerFactory.takeCreatedJobManagerRunner();

        suspendJob(runner);

        assertLocalCleanupTriggered(jobId);
    }

    /**
     * When runner creation fails, the submission must fail with a JobSubmissionException in its
     * cause chain and the global cleanup must still be triggered.
     */
    @Test
    public void testGlobalCleanupWhenJobSubmissionFails() throws Exception {
        final FlinkException creationFailure = new FlinkException("Test exception");
        startDispatcher(new FailingJobManagerRunnerFactory(creationFailure));

        final CompletableFuture<Acknowledge> submission = submitJob();
        try {
            submission.get();
            Assert.fail("Job submission was expected to fail.");
        } catch (ExecutionException expected) {
            MatcherAssert.assertThat(expected, FlinkMatchers.containsCause(JobSubmissionException.class));
        }

        assertGlobalCleanupTriggered(jobId);
    }

    /** Closing the dispatcher while a job is still running must trigger only the local cleanup. */
    @Test
    public void testLocalCleanupWhenClosingDispatcher() throws Exception {
        startDispatcherAndSubmitJob();

        final CompletableFuture<?> dispatcherTermination = dispatcher.closeAsync();
        dispatcherTermination.get();

        assertLocalCleanupTriggered(jobId);
    }

    /**
     * If the job finishes after the dispatcher has begun closing its runner, the global cleanup
     * must still be triggered once the runner has terminated.
     */
    @Test
    public void testGlobalCleanupWhenJobFinishedWhileClosingDispatcher() throws Exception {
        final TestingJobManagerRunner blockingRunner =
                TestingJobManagerRunner.newBuilder()
                        .setBlockingTermination(true)
                        .setJobId(jobId)
                        .build();
        final Queue<TestingJobManagerRunner> availableRunners = new ArrayDeque<>(Arrays.asList(blockingRunner));
        startDispatcher(new QueueJobManagerRunnerFactory(availableRunners));
        submitJobAndWait();

        final CompletableFuture<?> dispatcherTermination = dispatcher.closeAsync();
        // Wait until the dispatcher has asked the runner to close before finishing the job.
        blockingRunner.getCloseAsyncCalledLatch().await();

        final ArchivedExecutionGraph finishedGraph =
                new ArchivedExecutionGraphBuilder()
                        .setJobID(jobId)
                        .setState(JobStatus.FINISHED)
                        .build();
        blockingRunner.completeResultFuture(new ExecutionGraphInfo(finishedGraph));
        blockingRunner.completeTerminationFuture();

        dispatcherTermination.get();
        assertGlobalCleanupTriggered(jobId);
    }

    /**
     * No cleanup may start while the dirty-result entry is still being written to the job result
     * store; once that write is released, the global cleanup must run.
     */
    @Test
    public void testJobBeingMarkedAsDirtyBeforeCleanup() throws Exception {
        final OneShotLatch markAsDirtyLatch = new OneShotLatch();
        final TestingJobResultStore blockingJobResultStore =
                TestingJobResultStore.builder()
                        .withCreateDirtyResultConsumer(jobResultEntry -> {
                            try {
                                markAsDirtyLatch.await();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                return FutureUtils.completedExceptionally(e);
                            }
                            return FutureUtils.completedVoidFuture();
                        })
                        .build();
        final TestingDispatcher.Builder dispatcherBuilder =
                createTestingDispatcherBuilder().setJobResultStore(blockingJobResultStore);

        final TestingJobManagerRunnerFactory runnerFactory = startDispatcherAndSubmitJob(dispatcherBuilder, 0);
        finishJob(runnerFactory.takeCreatedJobManagerRunner());

        assertThatNoCleanupWasTriggered();

        markAsDirtyLatch.trigger();

        assertGlobalCleanupTriggered(jobId);
    }

    /**
     * The job result may only be marked as clean after both the local and the global cleanup
     * resource have finished: the mark-as-clean future must stay incomplete until both latches
     * have been triggered.
     */
    @Test
    public void testJobBeingMarkedAsCleanAfterCleanup() throws Exception {
        final CompletableFuture<JobID> markAsCleanFuture = new CompletableFuture<>();
        final TestingJobResultStore jobResultStore =
                TestingJobResultStore.builder()
                        .withMarkResultAsCleanConsumer(jobID -> {
                            markAsCleanFuture.complete(jobID);
                            return FutureUtils.completedVoidFuture();
                        })
                        .build();
        final OneShotLatch localCleanupLatch = new OneShotLatch();
        final OneShotLatch globalCleanupLatch = new OneShotLatch();
        final TestingDispatcher.Builder dispatcherBuilder =
                createTestingDispatcherBuilder()
                        .setJobResultStore(jobResultStore)
                        .setResourceCleanerFactory(
                                TestingResourceCleanerFactory.builder()
                                        .withLocallyCleanableResource((jobID2, executor) -> {
                                            try {
                                                localCleanupLatch.await();
                                                return FutureUtils.completedVoidFuture();
                                            } catch (InterruptedException e) {
                                                // Keep the interrupt visible to the calling thread.
                                                Thread.currentThread().interrupt();
                                                throw new RuntimeException(e);
                                            }
                                        })
                                        .withGloballyCleanableResource((jobID3, executor2) -> {
                                            try {
                                                globalCleanupLatch.await();
                                                return FutureUtils.completedVoidFuture();
                                            } catch (InterruptedException e) {
                                                // Keep the interrupt visible to the calling thread.
                                                Thread.currentThread().interrupt();
                                                throw new RuntimeException(e);
                                            }
                                        })
                                        .build());

        finishJob(startDispatcherAndSubmitJob(dispatcherBuilder, 0).takeCreatedJobManagerRunner());

        MatcherAssert.assertThat(markAsCleanFuture.isDone(), Matchers.is(false));

        localCleanupLatch.trigger();
        MatcherAssert.assertThat(markAsCleanFuture.isDone(), Matchers.is(false));

        globalCleanupLatch.trigger();
        MatcherAssert.assertThat(markAsCleanFuture.get(), Matchers.is(this.jobId));
    }

    /**
     * Re-submitting a job under the same id must not complete before the previous runner has
     * terminated, and must succeed afterwards.
     */
    @Test
    public void testJobSubmissionUnderSameJobId() throws Exception {
        final TestingJobManagerRunner previousRunner = startDispatcherAndSubmitJob(1).takeCreatedJobManagerRunner();
        suspendJob(previousRunner);
        previousRunner.getCloseAsyncCalledLatch().await();

        final CompletableFuture<Acknowledge> resubmission = dispatcherGateway.submitJob(jobGraph, timeout);
        try {
            resubmission.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail("The job submission future should not complete until the previous JobManager termination future has been completed.");
        } catch (TimeoutException ignored) {
            // Expected: the submission is still waiting for the previous runner to terminate.
        } finally {
            previousRunner.completeTerminationFuture();
        }

        MatcherAssert.assertThat(resubmission.get(), Matchers.equalTo(Acknowledge.get()));
    }

    /**
     * A duplicate submission of a running job must be rejected with a
     * DuplicateJobSubmissionException and must not trigger any cleanup. Only finishing the
     * original job triggers the global cleanup.
     */
    @Test
    public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exception {
        final TestingJobManagerRunnerFactory runnerFactory = startDispatcherAndSubmitJob();
        try {
            try {
                this.dispatcherGateway.submitJob(this.jobGraph, timeout).get();
                Assert.fail("Expected a DuplicateJobSubmissionFailure.");
            } catch (ExecutionException e) {
                MatcherAssert.assertThat(
                        ExceptionUtils.findThrowable(e, DuplicateJobSubmissionException.class).isPresent(),
                        Matchers.is(true));
            }
            assertThatNoCleanupWasTriggered();
        } finally {
            // The runner is taken from the factory exactly once, on both the success and the
            // failure path; a later assertion failure must not request it a second time.
            finishJob(runnerFactory.takeCreatedJobManagerRunner());
        }
        assertGlobalCleanupTriggered(this.jobId);
    }

    /** Completes the runner's result with an execution graph in state FINISHED. */
    private void finishJob(TestingJobManagerRunner testingJobManagerRunner) {
        final JobStatus targetState = JobStatus.FINISHED;
        terminateJobWithState(testingJobManagerRunner, targetState);
    }

    /** Completes the runner's result with an execution graph in state SUSPENDED. */
    private void suspendJob(TestingJobManagerRunner testingJobManagerRunner) {
        final JobStatus targetState = JobStatus.SUSPENDED;
        terminateJobWithState(testingJobManagerRunner, targetState);
    }

    /** Completes the runner's result with an execution graph in state CANCELED. */
    private void cancelJob(TestingJobManagerRunner testingJobManagerRunner) {
        final JobStatus targetState = JobStatus.CANCELED;
        terminateJobWithState(testingJobManagerRunner, targetState);
    }

    /**
     * Completes the runner's result future with an archived execution graph that carries the
     * test job id and the given state.
     */
    private void terminateJobWithState(TestingJobManagerRunner testingJobManagerRunner, JobStatus jobStatus) {
        final ArchivedExecutionGraph archivedGraph =
                new ArchivedExecutionGraphBuilder()
                        .setJobID(jobId)
                        .setState(jobStatus)
                        .build();
        final ExecutionGraphInfo jobResult = new ExecutionGraphInfo(archivedGraph);
        testingJobManagerRunner.completeResultFuture(jobResult);
    }

    /** Asserts that neither the global nor the local cleanup callback has run so far. */
    private void assertThatNoCleanupWasTriggered() {
        final boolean globalCleanupRan = globalCleanupFuture.isDone();
        MatcherAssert.assertThat(globalCleanupRan, Matchers.is(false));

        final boolean localCleanupRan = localCleanupFuture.isDone();
        MatcherAssert.assertThat(localCleanupRan, Matchers.is(false));
    }

    /** After the dispatcher has closed, the runner of the still-running job must be terminated. */
    @Test
    public void testDispatcherTerminationTerminatesRunningJobMasters() throws Exception {
        final TestingJobManagerRunnerFactory runnerFactory = startDispatcherAndSubmitJob();

        dispatcher.closeAsync().get();

        final TestingJobManagerRunner runner = runnerFactory.takeCreatedJobManagerRunner();
        final boolean runnerTerminated = runner.getTerminationFuture().isDone();
        MatcherAssert.assertThat(runnerTerminated, Matchers.is(true));
    }

    /**
     * The dispatcher's termination must not complete while a runner is still terminating; it
     * completes once that runner's termination future has been completed.
     */
    @Test
    public void testDispatcherTerminationWaitsForJobMasterTerminations() throws Exception {
        final TestingJobManagerRunnerFactory runnerFactory = startDispatcherAndSubmitJob(1);
        final CompletableFuture<?> dispatcherTermination = dispatcher.closeAsync();

        try {
            dispatcherTermination.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail("We should not terminate before all running JobMasters have terminated.");
        } catch (TimeoutException ignored) {
            // Expected: the dispatcher is still waiting for the runner to terminate.
        } finally {
            runnerFactory.takeCreatedJobManagerRunner().completeTerminationFuture();
        }

        dispatcherTermination.get();
    }

    /**
     * Waits for the local cleanup callback and asserts that it ran for the given job while the
     * global cleanup callback did not run.
     */
    private void assertLocalCleanupTriggered(JobID jobID) throws ExecutionException, InterruptedException, TimeoutException {
        final JobID locallyCleanedJob = localCleanupFuture.get();
        MatcherAssert.assertThat(locallyCleanedJob, Matchers.equalTo(jobID));

        final boolean globalCleanupRan = globalCleanupFuture.isDone();
        MatcherAssert.assertThat(globalCleanupRan, Matchers.is(false));
    }

    /**
     * Asserts that the local cleanup callback has not run, then waits for the global cleanup
     * callback and asserts that it ran for the given job.
     */
    private void assertGlobalCleanupTriggered(JobID jobID) throws ExecutionException, InterruptedException, TimeoutException {
        final boolean localCleanupRan = localCleanupFuture.isDone();
        MatcherAssert.assertThat(localCleanupRan, Matchers.is(false));

        final JobID globallyCleanedJob = globalCleanupFuture.get();
        MatcherAssert.assertThat(globallyCleanedJob, Matchers.equalTo(jobID));
    }

    /**
     * A failure to write the dirty-result entry must be reported to the fatal error handler as a
     * FlinkException.
     */
    @Test
    public void testFatalErrorIfJobCannotBeMarkedDirtyInJobResultStore() throws Exception {
        final TestingJobResultStore failingJobResultStore =
                TestingJobResultStore.builder()
                        .withCreateDirtyResultConsumer(
                                jobResultEntry -> FutureUtils.completedExceptionally(new IOException("Expected IOException.")))
                        .build();
        final TestingDispatcher.Builder dispatcherBuilder =
                createTestingDispatcherBuilder().setJobResultStore(failingJobResultStore);
        final TestingJobManagerRunner runner =
                startDispatcherAndSubmitJob(dispatcherBuilder, 0).takeCreatedJobManagerRunner();

        final ArchivedExecutionGraph finishedGraph =
                new ArchivedExecutionGraphBuilder()
                        .setJobID(jobId)
                        .setState(JobStatus.FINISHED)
                        .build();
        runner.completeResultFuture(new ExecutionGraphInfo(finishedGraph));

        MatcherAssert.assertThat(
                testingFatalErrorHandlerResource.getFatalErrorHandler().getErrorFuture().get(),
                IsInstanceOf.instanceOf(FlinkException.class));
        // Clear the recorded error after it has been verified.
        testingFatalErrorHandlerResource.getFatalErrorHandler().clearError();
    }

    /**
     * A failure to mark the job result as clean must not reach the fatal error handler; the dirty
     * entry must still have been written for the job.
     */
    @Test
    public void testErrorHandlingIfJobCannotBeMarkedAsCleanInJobResultStore() throws Exception {
        final CompletableFuture<JobResultEntry> dirtyJobFuture = new CompletableFuture<>();
        final TestingJobResultStore jobResultStore =
                TestingJobResultStore.builder()
                        .withCreateDirtyResultConsumer(jobResultEntry -> {
                            dirtyJobFuture.complete(jobResultEntry);
                            return FutureUtils.completedVoidFuture();
                        })
                        .withMarkResultAsCleanConsumer(
                                jobID -> FutureUtils.completedExceptionally(new IOException("Expected IOException.")))
                        .build();
        final TestingDispatcher.Builder dispatcherBuilder =
                createTestingDispatcherBuilder().setJobResultStore(jobResultStore);
        final TestingJobManagerRunner runner =
                startDispatcherAndSubmitJob(dispatcherBuilder, 0).takeCreatedJobManagerRunner();

        final ArchivedExecutionGraph finishedGraph =
                new ArchivedExecutionGraphBuilder()
                        .setJobID(jobId)
                        .setState(JobStatus.FINISHED)
                        .build();
        runner.completeResultFuture(new ExecutionGraphInfo(finishedGraph));

        try {
            final Object unexpectedError =
                    testingFatalErrorHandlerResource
                            .getFatalErrorHandler()
                            .getErrorFuture()
                            .get(100L, TimeUnit.MILLISECONDS);
            Assert.fail("No error should have been reported but an " + unexpectedError.getClass() + " was handled.");
        } catch (TimeoutException ignored) {
            // Expected: no fatal error was reported within the waiting period.
        }

        MatcherAssert.assertThat(dirtyJobFuture.get().getJobId(), Matchers.is(jobId));
    }

    /**
     * A submission whose runner creation fails must fail with the creation exception and trigger
     * the global cleanup; a second submission under the same job id must then succeed and reach
     * RUNNING.
     *
     * <p>The queue drives the runner factory's creation hook: each creation takes one entry and
     * throws its exception if one is present.
     */
    @Test
    public void testFailingJobManagerRunnerCleanup() throws Exception {
        final FlinkException testException = new FlinkException("Test exception.");
        final ArrayBlockingQueue<Optional<Exception>> creationOutcomes = new ArrayBlockingQueue<>(2);
        final BlockingJobManagerRunnerFactory blockingJobManagerRunnerFactory =
                new BlockingJobManagerRunnerFactory(() -> {
                    final Optional<Exception> maybeFailure = creationOutcomes.take();
                    if (maybeFailure.isPresent()) {
                        throw maybeFailure.get();
                    }
                });
        startDispatcher(blockingJobManagerRunnerFactory);
        final DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);

        // First submission: runner creation fails with the test exception.
        creationOutcomes.offer(Optional.of(testException));
        try {
            selfGateway.submitJob(this.jobGraph, Time.minutes(1L)).get();
            Assert.fail("A FlinkException is expected");
        } catch (ExecutionException expected) {
            // Only the submission failure is caught, so the AssertionError raised by
            // Assert.fail above propagates with its own message.
            MatcherAssert.assertThat(expected, FlinkMatchers.containsCause(FlinkException.class));
            MatcherAssert.assertThat(expected, FlinkMatchers.containsMessage(testException.getMessage()));
            assertGlobalCleanupTriggered(this.jobId);
        }

        // Second submission: runner creation succeeds.
        creationOutcomes.offer(Optional.empty());
        selfGateway.submitJob(this.jobGraph, Time.minutes(1L)).get();
        blockingJobManagerRunnerFactory.setJobStatus(JobStatus.RUNNING);
        AbstractDispatcherTest.awaitStatus(selfGateway, this.jobId, JobStatus.RUNNING);
    }

    /**
     * While archiving a finished job is still pending, no cleanup may run and the job termination
     * future must stay incomplete; both proceed once archiving completes.
     */
    @Test
    public void testArchivingFinishedJobToHistoryServer() throws Exception {
        final CompletableFuture<Acknowledge> archiveFuture = new CompletableFuture<>();
        final TestingDispatcher.Builder dispatcherBuilder =
                createTestingDispatcherBuilder()
                        .setHistoryServerArchivist(executionGraphInfo -> archiveFuture);

        final TestingJobManagerRunnerFactory runnerFactory = startDispatcherAndSubmitJob(dispatcherBuilder, 0);
        finishJob(runnerFactory.takeCreatedJobManagerRunner());

        assertThatNoCleanupWasTriggered();
        final CompletableFuture<Void> jobTerminationFuture = dispatcher.getJobTerminationFuture(jobId, Time.hours(1L));
        Assert.assertFalse(jobTerminationFuture.isDone());

        archiveFuture.complete(Acknowledge.get());

        assertGlobalCleanupTriggered(jobId);
        jobTerminationFuture.join();
    }

    /** A suspended job must be cleaned up locally without being sent to the history server. */
    @Test
    public void testNotArchivingSuspendedJobToHistoryServer() throws Exception {
        final AtomicBoolean archivistCalled = new AtomicBoolean(false);
        final TestingDispatcher.Builder dispatcherBuilder =
                createTestingDispatcherBuilder()
                        .setHistoryServerArchivist(executionGraphInfo -> {
                            archivistCalled.set(true);
                            return CompletableFuture.completedFuture(Acknowledge.get());
                        });

        final TestingJobManagerRunnerFactory runnerFactory = startDispatcherAndSubmitJob(dispatcherBuilder, 0);
        suspendJob(runnerFactory.takeCreatedJobManagerRunner());

        assertLocalCleanupTriggered(jobId);
        dispatcher.getJobTerminationFuture(jobId, Time.hours(1L)).join();

        Assert.assertFalse(archivistCalled.get());
    }
}
