/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.OptionalLong;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class SynchronousCheckpointTest {
    private StreamTaskUnderTest streamTaskUnderTest;
    private CompletableFuture<Void> taskInvocation;
    private LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue();

    SynchronousCheckpointTest() {
    }

    @BeforeEach
    void setupTestEnvironment() throws InterruptedException {
        this.taskInvocation = CompletableFuture.runAsync(() -> {
            try {
                this.streamTaskUnderTest = SynchronousCheckpointTest.createTask(this.eventQueue);
                this.streamTaskUnderTest.invoke();
            }
            catch (RuntimeException e) {
                throw e;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, Executors.newSingleThreadExecutor());
        Assertions.assertThat((Comparable)((Object)this.eventQueue.take())).isEqualTo((Object)Event.TASK_INITIALIZED);
    }

    @Timeout(value=10000L, unit=TimeUnit.MILLISECONDS)
    @Test
    void cancelShouldAlsoCancelPendingSynchronousCheckpoint() throws Throwable {
        this.launchSynchronousSavepointAndWaitForSyncSavepointIdToBeSet();
        Assertions.assertThat((OptionalLong)this.streamTaskUnderTest.getSynchronousSavepointId()).isPresent();
        this.streamTaskUnderTest.cancel();
        this.waitUntilMainExecutionThreadIsFinished();
        Assertions.assertThat((boolean)this.streamTaskUnderTest.isCanceled()).isTrue();
    }

    private void launchSynchronousSavepointAndWaitForSyncSavepointIdToBeSet() throws InterruptedException {
        this.streamTaskUnderTest.triggerCheckpointAsync(new CheckpointMetaData(42L, System.currentTimeMillis()), new CheckpointOptions((SnapshotType)SavepointType.suspend((SavepointFormatType)SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault()));
        this.waitForSyncSavepointIdToBeSet(this.streamTaskUnderTest);
    }

    private void waitUntilMainExecutionThreadIsFinished() {
        Assertions.assertThatThrownBy(this.taskInvocation::get).hasCauseInstanceOf(CancelTaskException.class);
    }

    private void waitForSyncSavepointIdToBeSet(StreamTask streamTaskUnderTest) throws InterruptedException {
        while (!streamTaskUnderTest.getSynchronousSavepointId().isPresent()) {
            Thread.sleep(10L);
            ((CompletableFutureAssert)Assertions.assertThat(this.taskInvocation).as("Task has been terminated too early", new Object[0])).isNotDone();
        }
    }

    private static StreamTaskUnderTest createTask(Queue<Event> eventQueue) throws Exception {
        DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
        return new StreamTaskUnderTest((Environment)environment, eventQueue);
    }

    private static class StreamTaskUnderTest
    extends StreamTaskITCase.NoOpStreamTask {
        private Queue<Event> eventQueue;
        private volatile boolean stopped;

        StreamTaskUnderTest(Environment env, Queue<Event> eventQueue) throws Exception {
            super(env);
            this.eventQueue = (Queue)Preconditions.checkNotNull(eventQueue);
        }

        @Override
        protected void init() {
            this.eventQueue.add(Event.TASK_INITIALIZED);
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (this.stopped || this.isCanceled()) {
                controller.suspendDefaultAction();
                this.mailboxProcessor.suspend();
            }
        }

        void stopTask() {
            this.stopped = true;
        }
    }

    private static enum Event {
        TASK_INITIALIZED;

    }
}

