/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.io.IOException;
import java.net.URL;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteChannelStateChecker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.runtime.taskmanager.TestingTaskManagerActions;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class TaskTest
extends TestLogger {
    private static final String RESTORE_EXCEPTION_MSG = "TestExceptionInRestore";
    private ShuffleEnvironment<?, ?> shuffleEnvironment;
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private static boolean wasCleanedUp = false;

    @Before
    public void setup() {
        this.shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
        wasCleanedUp = false;
    }

    @After
    public void teardown() throws Exception {
        if (this.shuffleEnvironment != null) {
            this.shuffleEnvironment.close();
        }
    }

    @Test
    public void testCleanupWhenRestoreFails() throws Exception {
        this.createTaskBuilder().setInvokable(InvokableWithExceptionInRestore.class).build(Executors.directExecutor()).run();
        Assert.assertTrue((boolean)wasCleanedUp);
    }

    @Test
    public void testCleanupWhenInvokeFails() throws Exception {
        this.createTaskBuilder().setInvokable(InvokableWithExceptionInInvoke.class).build(Executors.directExecutor()).run();
        Assert.assertTrue((boolean)wasCleanedUp);
    }

    @Test
    public void testCleanupWhenCancelledAfterRestore() throws Exception {
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInRestore.class).build(Executors.directExecutor());
        task.startTaskThread();
        this.awaitInvokableLatch(task);
        task.cancelExecution();
        task.getExecutingThread().join();
        Assert.assertTrue((boolean)wasCleanedUp);
    }

    @Test
    public void testCleanupWhenAfterInvokeSucceeded() throws Exception {
        Task task = this.createTaskBuilder().setInvokable(TestInvokableCorrect.class).build(Executors.directExecutor());
        task.run();
        Assert.assertTrue((boolean)wasCleanedUp);
        Assert.assertFalse((boolean)task.isCanceledOrFailed());
    }

    @Test
    public void testCleanupWhenSwitchToInitializationFails() throws Exception {
        Task task = this.createTaskBuilder().setInvokable(TestInvokableCorrect.class).setTaskManagerActions(new NoOpTaskManagerActions(){

            @Override
            public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
                if (taskExecutionState.getExecutionState() == ExecutionState.INITIALIZING) {
                    throw new ExpectedTestException();
                }
            }
        }).build(Executors.directExecutor());
        task.run();
        Assert.assertTrue((boolean)wasCleanedUp);
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
    }

    @Test
    public void testRegularExecution() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = this.createTaskBuilder().setInvokable(TestInvokableCorrect.class).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        Assert.assertEquals((Object)ExecutionState.CREATED, (Object)task.getExecutionState());
        Assert.assertFalse((boolean)task.isCanceledOrFailed());
        Assert.assertNull((Object)task.getFailureCause());
        task.run();
        Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)task.getExecutionState());
        Assert.assertFalse((boolean)task.isCanceledOrFailed());
        Assert.assertNull((Object)task.getFailureCause());
        Assert.assertNull((Object)task.getInvokable());
        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.FINISHED, task, null);
    }

    @Test
    public void testCancelRightAway() throws Exception {
        Task task = this.createTaskBuilder().build(Executors.directExecutor());
        task.cancelExecution();
        Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)task.getExecutionState());
        task.run();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
        Assert.assertNull((Object)task.getInvokable());
    }

    @Test
    public void testFailExternallyRightAway() throws Exception {
        Task task = this.createTaskBuilder().build(Executors.directExecutor());
        task.failExternally((Throwable)new Exception("fail externally"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        task.run();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
    }

    @Test
    public void testLibraryCacheRegistrationFailed() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        IOException testException = new IOException("Could not load classloader");
        Task task = this.createTaskBuilder().setTaskManagerActions(taskManagerActions).setClassLoaderHandle((LibraryCacheManager.ClassLoaderHandle)TestingClassLoaderLease.newBuilder().setGetOrResolveClassLoaderFunction((BiFunctionWithException<Collection<PermanentBlobKey>, Collection<URL>, UserCodeClassLoader, IOException>)((BiFunctionWithException)(permanentBlobKeys, urls) -> {
            throw testException;
        })).build()).build(Executors.directExecutor());
        Assert.assertEquals((Object)ExecutionState.CREATED, (Object)task.getExecutionState());
        Assert.assertFalse((boolean)task.isCanceledOrFailed());
        Assert.assertNull((Object)task.getFailureCause());
        task.run();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        MatcherAssert.assertThat((Object)task.getFailureCause(), (Matcher)CoreMatchers.is((Object)testException));
        Assert.assertNull((Object)task.getInvokable());
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, testException);
    }

    @Test
    public void testExecutionFailsInNetworkRegistrationForPartitions() throws Exception {
        PartitionDescriptor partitionDescriptor = PartitionDescriptorBuilder.newBuilder().build();
        NettyShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
        ResultPartitionDeploymentDescriptor dummyPartition = new ResultPartitionDeploymentDescriptor(partitionDescriptor, (ShuffleDescriptor)shuffleDescriptor, 1);
        this.testExecutionFailsInNetworkRegistration(Collections.singletonList(dummyPartition), Collections.emptyList());
    }

    @Test
    public void testExecutionFailsInNetworkRegistrationForGates() throws Exception {
        NettyShuffleDescriptor dummyChannel = NettyShuffleDescriptorBuilder.newBuilder().buildRemote();
        InputGateDeploymentDescriptor dummyGate = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex[]{new TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex((ShuffleDescriptor)dummyChannel, 0)});
        this.testExecutionFailsInNetworkRegistration(Collections.emptyList(), Collections.singletonList(dummyGate));
    }

    private void testExecutionFailsInNetworkRegistration(List<ResultPartitionDeploymentDescriptor> resultPartitions, List<InputGateDeploymentDescriptor> inputGates) throws Exception {
        String errorMessage = "Network buffer pool has already been destroyed.";
        PartitionProducerStateChecker partitionProducerStateChecker = (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class);
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = new TestTaskBuilder(this.shuffleEnvironment).setTaskManagerActions(taskManagerActions).setPartitionProducerStateChecker(partitionProducerStateChecker).setResultPartitions(resultPartitions).setInputGates(inputGates).build(EXECUTOR_RESOURCE.getExecutor());
        this.shuffleEnvironment.close();
        task.run();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("Network buffer pool has already been destroyed."));
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new IllegalStateException("Network buffer pool has already been destroyed."));
    }

    @Test
    public void testInvokableInstantiationFailed() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = this.createTaskBuilder().setTaskManagerActions(taskManagerActions).setInvokable(InvokableNonInstantiable.class).build(Executors.directExecutor());
        task.run();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("instantiate"));
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, (Throwable)new FlinkException("Could not instantiate the task's invokable class."));
    }

    @Test
    public void testExecutionFailsInRestore() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = this.createTaskBuilder().setInvokable(InvokableWithExceptionInRestore.class).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        task.run();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertNotNull((Object)task.getFailureCause());
        Assert.assertNotNull((Object)task.getFailureCause().getMessage());
        MatcherAssert.assertThat((Object)task.getFailureCause().getMessage(), (Matcher)CoreMatchers.containsString((String)RESTORE_EXCEPTION_MSG));
        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new Exception(RESTORE_EXCEPTION_MSG));
    }

    @Test
    public void testExecutionFailsInInvoke() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = this.createTaskBuilder().setInvokable(InvokableWithExceptionInInvoke.class).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        task.run();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertNotNull((Object)task.getFailureCause());
        Assert.assertNotNull((Object)task.getFailureCause().getMessage());
        Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("test"));
        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("test"));
    }

    @Test
    public void testFailWithWrappedException() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = this.createTaskBuilder().setInvokable(FailingInvokableWithChainedException.class).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        task.run();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Throwable cause = task.getFailureCause();
        Assert.assertTrue((boolean)(cause instanceof IOException));
        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new IOException("test"));
    }

    @Test
    public void testCancelDuringRestore() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInRestore.class).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        task.startTaskThread();
        this.awaitInvokableLatch(task);
        task.cancelExecution();
        Assert.assertTrue((task.getExecutionState() == ExecutionState.CANCELING || task.getExecutionState() == ExecutionState.CANCELED ? 1 : 0) != 0);
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertNull((Object)task.getFailureCause());
        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.CANCELED, task, null);
    }

    @Test
    public void testCancelDuringInvoke() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        task.startTaskThread();
        this.awaitInvokableLatch(task);
        task.cancelExecution();
        Assert.assertTrue((task.getExecutionState() == ExecutionState.CANCELING || task.getExecutionState() == ExecutionState.CANCELED ? 1 : 0) != 0);
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertNull((Object)task.getFailureCause());
        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.CANCELED, task, null);
    }

    @Test
    public void testFailExternallyDuringRestore() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInRestore.class).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        task.startTaskThread();
        this.awaitInvokableLatch(task);
        task.failExternally((Throwable)new Exception(RESTORE_EXCEPTION_MSG));
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        MatcherAssert.assertThat((Object)task.getFailureCause().getMessage(), (Matcher)CoreMatchers.containsString((String)RESTORE_EXCEPTION_MSG));
        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new Exception(RESTORE_EXCEPTION_MSG));
    }

    @Test
    public void testFailExternallyDuringInvoke() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        task.startTaskThread();
        this.awaitInvokableLatch(task);
        task.failExternally((Throwable)new Exception("test"));
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("test"));
        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("test"));
    }

    @Test
    public void testCanceledAfterExecutionFailedInInvoke() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = this.createTaskBuilder().setInvokable(InvokableWithExceptionInInvoke.class).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        task.run();
        task.cancelExecution();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("test"));
        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("test"));
    }

    @Test
    public void testExecutionFailsAfterCanceling() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = this.createTaskBuilder().setInvokable(InvokableWithExceptionOnTrigger.class).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        task.startTaskThread();
        this.awaitInvokableLatch(task);
        task.cancelExecution();
        Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)task.getExecutionState());
        this.triggerInvokableLatch(task);
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertNull((Object)task.getFailureCause());
        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.CANCELED, task, null);
    }

    @Test
    public void testExecutionFailsAfterTaskMarkedFailed() throws Exception {
        QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions();
        Task task = this.createTaskBuilder().setInvokable(InvokableWithExceptionOnTrigger.class).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        task.startTaskThread();
        this.awaitInvokableLatch(task);
        task.failExternally((Throwable)new Exception("external"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        this.triggerInvokableLatch(task);
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("external"));
        taskManagerActions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        taskManagerActions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("external"));
    }

    @Test
    public void testCancelTaskException() throws Exception {
        Task task = this.createTaskBuilder().setInvokable(InvokableWithCancelTaskExceptionInInvoke.class).build(Executors.directExecutor());
        task.startTaskThread();
        this.triggerInvokableLatch(task);
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
    }

    @Test
    public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
        Task task = this.createTaskBuilder().setInvokable(InvokableWithCancelTaskExceptionInInvoke.class).build(Executors.directExecutor());
        task.startTaskThread();
        this.awaitInvokableLatch(task);
        task.failExternally((Throwable)new Exception("external"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        this.triggerInvokableLatch(task);
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("external"));
    }

    @Test
    public void testOnPartitionStateUpdateWhileRunning() throws Exception {
        this.testOnPartitionStateUpdate(ExecutionState.RUNNING);
    }

    @Test
    public void testOnPartitionStateUpdateWhileDeploying() throws Exception {
        this.testOnPartitionStateUpdate(ExecutionState.DEPLOYING);
    }

    public void testOnPartitionStateUpdate(ExecutionState initialTaskState) throws Exception {
        ResultPartitionID partitionId = new ResultPartitionID();
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).build(Executors.directExecutor());
        RemoteChannelStateChecker checker = new RemoteChannelStateChecker(partitionId, "test task");
        HashMap<ExecutionState, ExecutionState> expected = new HashMap<ExecutionState, ExecutionState>(ExecutionState.values().length);
        for (ExecutionState state : ExecutionState.values()) {
            expected.put(state, ExecutionState.FAILED);
        }
        expected.put(ExecutionState.INITIALIZING, initialTaskState);
        expected.put(ExecutionState.RUNNING, initialTaskState);
        expected.put(ExecutionState.SCHEDULED, initialTaskState);
        expected.put(ExecutionState.DEPLOYING, initialTaskState);
        expected.put(ExecutionState.FINISHED, initialTaskState);
        expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
        expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
        expected.put(ExecutionState.FAILED, ExecutionState.CANCELING);
        int producingStateCounter = 0;
        for (ExecutionState state : ExecutionState.values()) {
            TestTaskBuilder.setTaskState(task, initialTaskState);
            Task task2 = task;
            Objects.requireNonNull(task2);
            if (checker.isProducerReadyOrAbortConsumption((PartitionProducerStateProvider.ResponseHandle)new Task.PartitionProducerStateResponseHandle(task2, state, null))) {
                ++producingStateCounter;
            }
            ExecutionState newTaskState = task.getExecutionState();
            Assert.assertEquals(expected.get(state), (Object)newTaskState);
        }
        Assert.assertEquals((long)5L, (long)producingStateCounter);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTriggerPartitionStateUpdate() throws Exception {
        IntermediateDataSetID resultId = new IntermediateDataSetID();
        ResultPartitionID partitionId = new ResultPartitionID();
        PartitionProducerStateChecker partitionChecker = (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class);
        AtomicInteger callCount = new AtomicInteger(0);
        RemoteChannelStateChecker remoteChannelStateChecker = new RemoteChannelStateChecker(partitionId, "test task");
        this.setup();
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).setPartitionProducerStateChecker(partitionChecker).build(Executors.directExecutor());
        TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
        CompletableFuture<ExecutionState> promise = new CompletableFuture<ExecutionState>();
        Mockito.when((Object)partitionChecker.requestPartitionProducerState((JobID)ArgumentMatchers.eq((Object)task.getJobID()), (IntermediateDataSetID)ArgumentMatchers.eq((Object)resultId), (ResultPartitionID)ArgumentMatchers.eq((Object)partitionId))).thenReturn(promise);
        task.requestPartitionProducerState(resultId, partitionId, checkResult -> MatcherAssert.assertThat((Object)remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult), (Matcher)CoreMatchers.is((Object)false)));
        promise.completeExceptionally((Throwable)new PartitionProducerDisposedException(partitionId));
        Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)task.getExecutionState());
        this.setup();
        task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).setPartitionProducerStateChecker(partitionChecker).build(Executors.directExecutor());
        TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
        promise = new CompletableFuture();
        Mockito.when((Object)partitionChecker.requestPartitionProducerState((JobID)ArgumentMatchers.eq((Object)task.getJobID()), (IntermediateDataSetID)ArgumentMatchers.eq((Object)resultId), (ResultPartitionID)ArgumentMatchers.eq((Object)partitionId))).thenReturn(promise);
        task.requestPartitionProducerState(resultId, partitionId, checkResult -> MatcherAssert.assertThat((Object)remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult), (Matcher)CoreMatchers.is((Object)false)));
        promise.completeExceptionally(new RuntimeException("Any other exception"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        callCount.set(0);
        this.setup();
        task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).setPartitionProducerStateChecker(partitionChecker).build(Executors.directExecutor());
        try {
            task.startTaskThread();
            this.awaitInvokableLatch(task);
            promise = new CompletableFuture();
            Mockito.when((Object)partitionChecker.requestPartitionProducerState((JobID)ArgumentMatchers.eq((Object)task.getJobID()), (IntermediateDataSetID)ArgumentMatchers.eq((Object)resultId), (ResultPartitionID)ArgumentMatchers.eq((Object)partitionId))).thenReturn(promise);
            task.requestPartitionProducerState(resultId, partitionId, checkResult -> {
                if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) {
                    callCount.incrementAndGet();
                }
            });
            promise.completeExceptionally(new TimeoutException());
            Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)task.getExecutionState());
            Assert.assertEquals((long)1L, (long)callCount.get());
        }
        finally {
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
        callCount.set(0);
        this.setup();
        task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).setPartitionProducerStateChecker(partitionChecker).build(Executors.directExecutor());
        try {
            task.startTaskThread();
            this.awaitInvokableLatch(task);
            promise = new CompletableFuture();
            Mockito.when((Object)partitionChecker.requestPartitionProducerState((JobID)ArgumentMatchers.eq((Object)task.getJobID()), (IntermediateDataSetID)ArgumentMatchers.eq((Object)resultId), (ResultPartitionID)ArgumentMatchers.eq((Object)partitionId))).thenReturn(promise);
            task.requestPartitionProducerState(resultId, partitionId, checkResult -> {
                if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) {
                    callCount.incrementAndGet();
                }
            });
            promise.complete(ExecutionState.RUNNING);
            Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)task.getExecutionState());
            Assert.assertEquals((long)1L, (long)callCount.get());
        }
        finally {
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
    }

    @Test
    public void testWatchDogInterruptsTask() throws Exception {
        ProhibitFatalErrorTaskManagerActions taskManagerActions = new ProhibitFatalErrorTaskManagerActions();
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, (Object)Duration.ofMillis(5L));
        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, (Object)Duration.ofMillis(60000L));
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInCancel.class).setTaskManagerConfig(config).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        task.startTaskThread();
        this.awaitInvokableLatch(task);
        task.cancelExecution();
        task.getExecutingThread().join();
    }

    @Test
    public void testWatchDogThrowFatalErrorOnTaskStuckInInstantiation() throws Exception {
        InterruptOnFatalErrorTaskManagerActions taskManagerActions = new InterruptOnFatalErrorTaskManagerActions();
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, (Object)Duration.ofMillis(5L));
        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, (Object)Duration.ofMillis(1000L));
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInInstantiation.class).setTaskManagerConfig(config).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        taskManagerActions.setExecutingThread(task.getExecutingThread());
        task.startTaskThread();
        InvokableBlockingInInstantiation.await();
        task.cancelExecution();
        task.getExecutingThread().join();
        Assert.assertTrue((boolean)taskManagerActions.hasFatalError());
    }

    @Test
    public void testInterruptibleSharedLockInInvokeAndCancel() throws Exception {
        ProhibitFatalErrorTaskManagerActions taskManagerActions = new ProhibitFatalErrorTaskManagerActions();
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, (Object)Duration.ofMillis(5L));
        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, (Object)Duration.ofMillis(1000L));
        Task task = this.createTaskBuilder().setInvokable(InvokableInterruptibleSharedLockInInvokeAndCancel.class).setTaskManagerConfig(config).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        task.startTaskThread();
        this.awaitInvokableLatch(task);
        task.cancelExecution();
        task.getExecutingThread().join();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFatalErrorAfterUnInterruptibleInvoke() throws Exception {
        CompletableFuture fatalErrorFuture = new CompletableFuture();
        TestingTaskManagerActions taskManagerActions = TestingTaskManagerActions.newBuilder().setNotifyFatalErrorConsumer((s, throwable) -> fatalErrorFuture.complete(throwable)).build();
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, (Object)Duration.ofMillis(10L));
        Task task = this.createTaskBuilder().setInvokable(InvokableUnInterruptibleBlockingInvoke.class).setTaskManagerConfig(config).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        try {
            task.startTaskThread();
            this.awaitInvokableLatch(task);
            task.cancelExecution();
            Throwable fatalError = (Throwable)fatalErrorFuture.join();
            MatcherAssert.assertThat((Object)fatalError, (Matcher)CoreMatchers.is((Matcher)CoreMatchers.notNullValue()));
        }
        finally {
            this.triggerInvokableLatch(task);
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFatalErrorOnCanceling() throws Exception {
        CompletableFuture fatalErrorFuture = new CompletableFuture();
        TestingTaskManagerActions taskManagerActions = TestingTaskManagerActions.newBuilder().setNotifyFatalErrorConsumer((s, throwable) -> fatalErrorFuture.complete(throwable)).build();
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, (Object)Duration.ofMillis(5L));
        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, (Object)Duration.ofMillis(50L));
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingWithTrigger.class).setTaskManagerConfig(config).setTaskManagerActions(taskManagerActions).build(Executors.directExecutor());
        Task spyTask = (Task)Mockito.spy((Object)task);
        Class<OutOfMemoryError> fatalErrorType = OutOfMemoryError.class;
        ((Task)Mockito.doThrow(fatalErrorType).when((Object)spyTask)).cancelOrFailAndCancelInvokableInternal((ExecutionState)ArgumentMatchers.eq((Object)ExecutionState.CANCELING), (Throwable)ArgumentMatchers.eq(null));
        try {
            spyTask.startTaskThread();
            this.awaitInvokableLatch(task);
            spyTask.cancelExecution();
            Throwable fatalError = (Throwable)fatalErrorFuture.join();
            MatcherAssert.assertThat((Object)fatalError, (Matcher)CoreMatchers.instanceOf(fatalErrorType));
        }
        finally {
            this.triggerInvokableLatch(task);
        }
    }

    @Test
    public void testTaskConfig() throws Exception {
        long interval = 28218123L;
        long timeout = interval + 19292L;
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, (Object)Duration.ofMillis(interval));
        config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, (Object)Duration.ofMillis(timeout));
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setTaskCancellationInterval(interval + 1337L);
        executionConfig.setTaskCancellationTimeout(timeout - 1337L);
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).setTaskManagerConfig(config).setExecutionConfig(executionConfig).build(Executors.directExecutor());
        Assert.assertEquals((long)interval, (long)task.getTaskCancellationInterval());
        Assert.assertEquals((long)timeout, (long)task.getTaskCancellationTimeout());
        task.startTaskThread();
        this.awaitInvokableLatch(task);
        Assert.assertEquals((long)executionConfig.getTaskCancellationInterval(), (long)task.getTaskCancellationInterval());
        Assert.assertEquals((long)executionConfig.getTaskCancellationTimeout(), (long)task.getTaskCancellationTimeout());
        task.getExecutingThread().interrupt();
        task.getExecutingThread().join();
    }

    @Test
    public void testTerminationFutureCompletesOnNormalExecution() throws Exception {
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingWithTrigger.class).setTaskManagerActions(new NoOpTaskManagerActions()).build(Executors.directExecutor());
        task.startTaskThread();
        this.awaitInvokableLatch(task);
        Assert.assertFalse((boolean)task.getTerminationFuture().isDone());
        this.triggerInvokableLatch(task);
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.FINISHED, task.getTerminationFuture().getNow(null));
    }

    @Test
    public void testTerminationFutureCompletesOnImmediateCancellation() throws Exception {
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).setTaskManagerActions(new NoOpTaskManagerActions()).build(Executors.directExecutor());
        task.cancelExecution();
        Assert.assertFalse((boolean)task.getTerminationFuture().isDone());
        task.startTaskThread();
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.CANCELED, task.getTerminationFuture().getNow(null));
    }

    @Test
    public void testTerminationFutureCompletesOnErrorInInvoke() throws Exception {
        Task task = this.createTaskBuilder().setInvokable(InvokableWithExceptionInInvoke.class).setTaskManagerActions(new NoOpTaskManagerActions()).build(Executors.directExecutor());
        task.startTaskThread();
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.FAILED, task.getTerminationFuture().getNow(null));
    }

    @Test
    public void testNoBackPressureIfTaskNotStarted() throws Exception {
        Task task = this.createTaskBuilder().build(Executors.directExecutor());
        Assert.assertFalse((boolean)task.isBackPressured());
    }

    @Test
    public void testDeclineCheckpoint() throws Exception {
        TestCheckpointResponder testCheckpointResponder = new TestCheckpointResponder();
        Task task = this.createTaskBuilder().setInvokable(InvokableDecliningCheckpoints.class).setCheckpointResponder(testCheckpointResponder).build(Executors.directExecutor());
        this.assertCheckpointDeclined(task, testCheckpointResponder, 1L, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY);
        task.startTaskThread();
        try {
            this.awaitInvokableLatch(task);
            Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)task.getExecutionState());
            this.assertCheckpointDeclined(task, testCheckpointResponder, 2L, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING);
            this.assertCheckpointDeclined(task, testCheckpointResponder, 3L, CheckpointFailureReason.TASK_FAILURE);
            this.assertCheckpointDeclined(task, testCheckpointResponder, 4L, CheckpointFailureReason.TASK_FAILURE);
        }
        finally {
            this.triggerInvokableLatch(task);
            task.getExecutingThread().join();
        }
        Assert.assertEquals((Object)ExecutionState.FINISHED, task.getTerminationFuture().getNow(null));
    }

    private void assertCheckpointDeclined(Task task, TestCheckpointResponder testCheckpointResponder, long checkpointId, CheckpointFailureReason failureReason) {
        CheckpointOptions checkpointOptions = CheckpointOptions.alignedNoTimeout((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault());
        task.triggerCheckpointBarrier(checkpointId, 1L, checkpointOptions);
        Assert.assertEquals((long)1L, (long)testCheckpointResponder.getDeclineReports().size());
        Assert.assertEquals((long)checkpointId, (long)testCheckpointResponder.getDeclineReports().get(0).getCheckpointId());
        Assert.assertEquals((Object)failureReason, (Object)testCheckpointResponder.getDeclineReports().get(0).getCause().getCheckpointFailureReason());
        testCheckpointResponder.clear();
    }

    private TaskInvokable waitForInvokable(Task task) throws Exception {
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> task.getInvokable() != null), 10L);
        return task.getInvokable();
    }

    private void awaitInvokableLatch(Task task) throws Exception {
        TaskInvokable taskInvokable = this.waitForInvokable(task);
        if (!(taskInvokable instanceof AwaitLatchInvokable)) {
            throw new Exception("Invokable doesn't implement class - " + AwaitLatchInvokable.class.getName());
        }
        ((AwaitLatchInvokable)taskInvokable).await();
    }

    private void triggerInvokableLatch(Task task) throws Exception {
        TaskInvokable taskInvokable = this.waitForInvokable(task);
        if (!(taskInvokable instanceof TriggerLatchInvokable)) {
            throw new Exception("Invokable doesn't implement class - " + TriggerLatchInvokable.class.getName());
        }
        ((TriggerLatchInvokable)taskInvokable).trigger();
    }

    private TestTaskBuilder createTaskBuilder() {
        return new TestTaskBuilder(this.shuffleEnvironment);
    }

    private static abstract class TriggerLatchInvokable
    extends AwaitLatchInvokable {
        final OneShotLatch triggerLatch = new OneShotLatch();

        public TriggerLatchInvokable(Environment environment) {
            super(environment);
        }

        void trigger() {
            this.triggerLatch.trigger();
        }

        void awaitTriggerLatch() {
            this.awaitLatch.trigger();
            while (true) {
                try {
                    this.triggerLatch.await();
                }
                catch (InterruptedException interruptedException) {
                    continue;
                }
                break;
            }
        }
    }

    private static abstract class AwaitLatchInvokable
    extends AbstractInvokable {
        final OneShotLatch awaitLatch = new OneShotLatch();

        public AwaitLatchInvokable(Environment environment) {
            super(environment);
        }

        void await() throws InterruptedException {
            this.awaitLatch.await();
        }
    }

    private static class TestWrappedException
    extends WrappingRuntimeException {
        private static final long serialVersionUID = 1L;

        TestWrappedException(@Nonnull Throwable cause) {
            super(cause);
        }
    }

    public static final class InvokableBlockingInInstantiation
    extends AbstractInvokable {
        static final OneShotLatch AWAIT_LATCH = new OneShotLatch();

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public InvokableBlockingInInstantiation(Environment environment) throws InterruptedException {
            super(environment);
            while (true) {
                InvokableBlockingInInstantiation invokableBlockingInInstantiation = this;
                synchronized (invokableBlockingInInstantiation) {
                    AWAIT_LATCH.trigger();
                    ((Object)((Object)this)).wait();
                }
            }
        }

        public void invoke() {
        }

        static void await() throws InterruptedException {
            AWAIT_LATCH.await();
        }
    }

    public static final class InvokableUnInterruptibleBlockingInvoke
    extends TriggerLatchInvokable {
        public InvokableUnInterruptibleBlockingInvoke(Environment environment) {
            super(environment);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() {
            while (!this.triggerLatch.isTriggered()) {
                try {
                    InvokableUnInterruptibleBlockingInvoke invokableUnInterruptibleBlockingInvoke = this;
                    synchronized (invokableUnInterruptibleBlockingInvoke) {
                        this.awaitLatch.trigger();
                        ((Object)((Object)this)).wait();
                    }
                }
                catch (InterruptedException interruptedException) {
                }
            }
        }

        public void cancel() {
        }
    }

    public static final class InvokableInterruptibleSharedLockInInvokeAndCancel
    extends TriggerLatchInvokable {
        private final Object lock = new Object();

        public InvokableInterruptibleSharedLockInInvokeAndCancel(Environment environment) {
            super(environment);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            while (!this.triggerLatch.isTriggered()) {
                Object object = this.lock;
                synchronized (object) {
                    this.awaitLatch.trigger();
                    this.lock.wait();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cancel() {
            Object object = this.lock;
            synchronized (object) {
                this.triggerLatch.trigger();
            }
        }
    }

    public static final class InvokableBlockingInCancel
    extends TriggerLatchInvokable {
        public InvokableBlockingInCancel(Environment environment) {
            super(environment);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() {
            this.awaitLatch.trigger();
            try {
                this.triggerLatch.await();
                InvokableBlockingInCancel invokableBlockingInCancel = this;
                synchronized (invokableBlockingInCancel) {
                    ((Object)((Object)this)).wait();
                }
            }
            catch (InterruptedException ignored) {
                InvokableBlockingInCancel invokableBlockingInCancel = this;
                synchronized (invokableBlockingInCancel) {
                    ((Object)((Object)this)).notifyAll();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cancel() throws Exception {
            InvokableBlockingInCancel invokableBlockingInCancel = this;
            synchronized (invokableBlockingInCancel) {
                this.triggerLatch.trigger();
                ((Object)((Object)this)).wait();
            }
        }
    }

    public static final class InvokableWithCancelTaskExceptionInInvoke
    extends TriggerLatchInvokable {
        public InvokableWithCancelTaskExceptionInInvoke(Environment environment) {
            super(environment);
        }

        public void invoke() {
            this.awaitTriggerLatch();
            throw new CancelTaskException();
        }
    }

    public static final class InvokableWithExceptionOnTrigger
    extends TriggerLatchInvokable {
        public InvokableWithExceptionOnTrigger(Environment environment) {
            super(environment);
        }

        public void invoke() {
            this.awaitTriggerLatch();
            throw new RuntimeException("test");
        }
    }

    private static final class InvokableBlockingInRestore
    extends AwaitLatchInvokable {
        public InvokableBlockingInRestore(Environment environment) {
            super(environment);
        }

        public void restore() throws Exception {
            this.awaitLatch.trigger();
            InvokableBlockingInRestore invokableBlockingInRestore = this;
            synchronized (invokableBlockingInRestore) {
                while (true) {
                    ((Object)((Object)this)).wait();
                }
            }
        }

        public void invoke() {
        }

        public void cleanUp(Throwable throwable) throws Exception {
            wasCleanedUp = true;
            super.cleanUp(throwable);
        }
    }

    private static final class InvokableBlockingInInvoke
    extends AwaitLatchInvokable {
        public InvokableBlockingInInvoke(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            this.awaitLatch.trigger();
            InvokableBlockingInInvoke invokableBlockingInInvoke = this;
            synchronized (invokableBlockingInInvoke) {
                while (true) {
                    ((Object)((Object)this)).wait();
                }
            }
        }
    }

    private static class InvokableDecliningCheckpoints
    extends InvokableBlockingWithTrigger {
        public static final int REJECTED_EXECUTION_CHECKPOINT_ID = 2;
        public static final int THROWING_CHECKPOINT_ID = 3;
        public static final int TRIGGERING_FAILED_CHECKPOINT_ID = 4;

        public InvokableDecliningCheckpoints(Environment environment) {
            super(environment);
        }

        public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            long checkpointId = checkpointMetaData.getCheckpointId();
            switch (Math.toIntExact(checkpointId)) {
                case 2: {
                    throw new RejectedExecutionException();
                }
                case 3: {
                    CompletableFuture<Boolean> result = new CompletableFuture<Boolean>();
                    result.completeExceptionally(new ExpectedTestException());
                    return result;
                }
                case 4: {
                    return CompletableFuture.completedFuture(false);
                }
            }
            throw new UnsupportedOperationException("Unsupported checkpointId: " + checkpointId);
        }
    }

    private static class InvokableBlockingWithTrigger
    extends TriggerLatchInvokable {
        public InvokableBlockingWithTrigger(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            this.awaitLatch.trigger();
            this.triggerLatch.await();
        }
    }

    private static final class FailingInvokableWithChainedException
    extends AbstractInvokable {
        public FailingInvokableWithChainedException(Environment environment) {
            super(environment);
        }

        public void invoke() {
            throw new TestWrappedException(new IOException("test"));
        }

        public void cancel() {
        }
    }

    static final class InvokableWithExceptionInRestore
    extends AbstractInvokable {
        public InvokableWithExceptionInRestore(Environment environment) {
            super(environment);
        }

        public void restore() throws Exception {
            throw new Exception(TaskTest.RESTORE_EXCEPTION_MSG);
        }

        public void invoke() {
        }

        public void cleanUp(Throwable throwable) throws Exception {
            wasCleanedUp = true;
            super.cleanUp(throwable);
        }
    }

    private static final class InvokableWithExceptionInInvoke
    extends AbstractInvokable {
        public InvokableWithExceptionInInvoke(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            throw new Exception("test");
        }

        public void cleanUp(Throwable throwable) throws Exception {
            wasCleanedUp = true;
            super.cleanUp(throwable);
        }
    }

    private static abstract class InvokableNonInstantiable
    extends AbstractInvokable {
        public InvokableNonInstantiable(Environment environment) {
            super(environment);
        }
    }

    private static final class TestInvokableCorrect
    extends AbstractInvokable {
        public TestInvokableCorrect(Environment environment) {
            super(environment);
        }

        public void invoke() {
        }

        public void cancel() {
        }

        public void cleanUp(Throwable throwable) throws Exception {
            wasCleanedUp = true;
            super.cleanUp(throwable);
        }
    }

    private static class InterruptOnFatalErrorTaskManagerActions
    extends NoOpTaskManagerActions {
        private boolean fatalError = false;
        private Thread executingThread;

        private InterruptOnFatalErrorTaskManagerActions() {
        }

        @Override
        public void notifyFatalError(String message, Throwable cause) {
            this.fatalError = true;
            this.executingThread.interrupt();
        }

        public boolean hasFatalError() {
            return this.fatalError;
        }

        public void setExecutingThread(Thread executingThread) {
            this.executingThread = executingThread;
        }
    }

    private static class ProhibitFatalErrorTaskManagerActions
    extends NoOpTaskManagerActions {
        private ProhibitFatalErrorTaskManagerActions() {
        }

        @Override
        public void notifyFatalError(String message, Throwable cause) {
            throw new RuntimeException("Unexpected FatalError notification");
        }
    }

    private static class QueuedNoOpTaskManagerActions
    extends NoOpTaskManagerActions {
        private final BlockingQueue<TaskExecutionState> queue = new LinkedBlockingDeque<TaskExecutionState>();

        private QueuedNoOpTaskManagerActions() {
        }

        @Override
        public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
            Assert.assertTrue((boolean)this.queue.offer(taskExecutionState));
        }

        private void validateListenerMessage(ExecutionState state, Task task, Throwable error) {
            try {
                TaskExecutionState taskState = this.queue.take();
                Assert.assertNotNull((String)"There is no additional listener message", (Object)state);
                Assert.assertEquals((Object)task.getExecutionId(), (Object)taskState.getID());
                Assert.assertEquals((Object)state, (Object)taskState.getExecutionState());
                Throwable t = taskState.getError(this.getClass().getClassLoader());
                if (error == null) {
                    Assert.assertNull((Object)t);
                } else {
                    Assert.assertEquals((Object)error.toString(), (Object)t.toString());
                }
            }
            catch (InterruptedException e) {
                Assert.fail((String)"interrupted");
            }
        }
    }
}

