package org.apache.flink.streaming.runtime.tasks;

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.class */
/**
 * Verifies that cancelling a stream task also terminates a synchronous savepoint that is still
 * pending on that task.
 *
 * <p>Each test starts a {@link StreamTaskUnderTest} on a dedicated single-thread executor, waits
 * for the task to report {@link Event#TASK_INITIALIZED} and then drives it from the test thread.
 */
class SynchronousCheckpointTest {

    /** Checkpoint id used for the synchronous savepoint triggered by the tests. */
    private static final long CHECKPOINT_ID = 42L;

    /** Interval, in milliseconds, between two checks while waiting on the task. */
    private static final long POLL_INTERVAL_MILLIS = 10L;

    /** Channel through which the task thread reports lifecycle events to the test thread. */
    private final LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();

    /** Executor that runs the task; kept in a field so it can be shut down after each test. */
    private ExecutorService taskExecutor;

    /** Task instance; assigned on the executor thread before the task is invoked. */
    private StreamTaskUnderTest streamTaskUnderTest;

    /** Completes (normally or exceptionally) when the task invocation returns. */
    private CompletableFuture<Void> taskInvocation;

    /** Lifecycle events reported by {@link StreamTaskUnderTest}. */
    private enum Event {
        TASK_INITIALIZED
    }

    /**
     * No-op stream task that publishes {@link Event#TASK_INITIALIZED} from {@link #init()} and
     * suspends its default action once it has been stopped or cancelled.
     */
    public static class StreamTaskUnderTest extends StreamTaskITCase.NoOpStreamTask {
        private final Queue<Event> eventQueue;
        private volatile boolean stopped;

        /**
         * @param environment environment handed to the parent task
         * @param queue queue that receives the task's lifecycle events; must not be null
         * @throws Exception if the parent constructor fails
         */
        StreamTaskUnderTest(Environment environment, Queue<Event> queue) throws Exception {
            super(environment);
            this.eventQueue = Preconditions.checkNotNull(queue);
        }

        /** Signals the waiting test thread that initialization has been reached. */
        @Override
        public void init() {
            eventQueue.add(Event.TASK_INITIALIZED);
        }

        /**
         * Does no input processing; only suspends the default action and the mailbox processor
         * after the task was stopped or cancelled.
         */
        @Override
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (stopped || isCanceled()) {
                controller.suspendDefaultAction();
                mailboxProcessor.suspend();
            }
        }

        /** Requests that the task suspends itself on the next {@code processInput} call. */
        void stopTask() {
            stopped = true;
        }
    }

    /**
     * Starts the task on its own thread and waits until it has reported its initialization.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @BeforeEach
    void setupTestEnvironment() throws InterruptedException {
        taskExecutor = Executors.newSingleThreadExecutor();
        taskInvocation =
                CompletableFuture.runAsync(
                        () -> {
                            try {
                                streamTaskUnderTest = createTask(eventQueue);
                                streamTaskUnderTest.invoke();
                            } catch (RuntimeException e) {
                                throw e;
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        },
                        taskExecutor);

        // Poll instead of blocking indefinitely: if the task fails before it publishes its
        // first event, the failed future is reported here rather than hanging the setup.
        Event firstEvent = eventQueue.poll(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
        while (firstEvent == null) {
            Assertions.assertThat(taskInvocation)
                    .as("Task terminated before it was initialized")
                    .isNotDone();
            firstEvent = eventQueue.poll(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
        }
        Assertions.assertThat(firstEvent).isEqualTo(Event.TASK_INITIALIZED);
    }

    /** Shuts down the task executor so that its worker thread does not outlive the test. */
    @AfterEach
    void shutdownTaskExecutor() {
        if (taskExecutor != null) {
            taskExecutor.shutdownNow();
        }
    }

    @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
    @Test
    void cancelShouldAlsoCancelPendingSynchronousCheckpoint() throws Throwable {
        launchSynchronousSavepointAndWaitForSyncSavepointIdToBeSet();
        Assertions.assertThat(streamTaskUnderTest.getSynchronousSavepointId()).isPresent();

        streamTaskUnderTest.cancel();

        waitUntilMainExecutionThreadIsFinished();
        Assertions.assertThat(streamTaskUnderTest.isCanceled()).isTrue();
    }

    /**
     * Triggers a suspending savepoint in canonical format and blocks until the task exposes a
     * synchronous savepoint id.
     */
    private void launchSynchronousSavepointAndWaitForSyncSavepointIdToBeSet()
            throws InterruptedException {
        CheckpointMetaData metaData =
                new CheckpointMetaData(CHECKPOINT_ID, System.currentTimeMillis());
        CheckpointOptions options =
                new CheckpointOptions(
                        SavepointType.suspend(SavepointFormatType.CANONICAL),
                        CheckpointStorageLocationReference.getDefault());

        streamTaskUnderTest.triggerCheckpointAsync(metaData, options);
        waitForSyncSavepointIdToBeSet(streamTaskUnderTest);
    }

    /**
     * Waits for the task invocation to end and asserts that it failed with a
     * {@link CancelTaskException} as the cause.
     */
    private void waitUntilMainExecutionThreadIsFinished() {
        Assertions.assertThatThrownBy(taskInvocation::get)
                .hasCauseInstanceOf(CancelTaskException.class);
    }

    /**
     * Polls the given task until its synchronous savepoint id is present, failing if the task
     * invocation finishes in the meantime.
     */
    private void waitForSyncSavepointIdToBeSet(StreamTask<?, ?> streamTask)
            throws InterruptedException {
        while (!streamTask.getSynchronousSavepointId().isPresent()) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            Assertions.assertThat(taskInvocation)
                    .as("Task has been terminated too early")
                    .isNotDone();
        }
    }

    /** Creates the task under test on a dummy environment named "test". */
    private static StreamTaskUnderTest createTask(Queue<Event> queue) throws Exception {
        return new StreamTaskUnderTest(new DummyEnvironment("test", 1, 0), queue);
    }
}
