/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.TestUtils;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.clock.SystemClock;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DefaultExecutionGraphFactoryTest {
    private static final Logger log = LoggerFactory.getLogger(DefaultExecutionGraphFactoryTest.class);
    @TempDir
    private File tempDir;
    private File temporaryFile;
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();

    DefaultExecutionGraphFactoryTest() {
    }

    @BeforeEach
    private void setup() {
        this.temporaryFile = new File(this.tempDir.getAbsolutePath(), "stateFile");
    }

    @Test
    void testRestoringModifiedJobFromSavepointFails() throws Exception {
        JobGraph jobGraphWithNewOperator = this.createJobGraphWithSavepoint(false, 42L, 1);
        ExecutionGraphFactory executionGraphFactory = this.createExecutionGraphFactory();
        ((AbstractThrowableAssert)((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> executionGraphFactory.createAndRestoreExecutionGraph(jobGraphWithNewOperator, (CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(1), new CheckpointsCleaner(), (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, 0L, (VertexAttemptNumberStore)new DefaultVertexAttemptNumberStore(), SchedulerBase.computeVertexParallelismStore((JobGraph)jobGraphWithNewOperator), (execution, previousState, newState) -> {}, rp -> false, log)).withFailMessage("Expected ExecutionGraph creation to fail because of non restored state.", new Object[0])).isInstanceOf(Exception.class)).hasMessageContaining("Failed to rollback to checkpoint/savepoint");
    }

    @Test
    void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds() throws Exception {
        long savepointId = 42L;
        JobGraph jobGraphWithNewOperator = this.createJobGraphWithSavepoint(true, 42L, 1);
        ExecutionGraphFactory executionGraphFactory = this.createExecutionGraphFactory();
        StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        executionGraphFactory.createAndRestoreExecutionGraph(jobGraphWithNewOperator, (CompletedCheckpointStore)completedCheckpointStore, new CheckpointsCleaner(), (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, 0L, (VertexAttemptNumberStore)new DefaultVertexAttemptNumberStore(), SchedulerBase.computeVertexParallelismStore((JobGraph)jobGraphWithNewOperator), (execution, previousState, newState) -> {}, rp -> false, log);
        CompletedCheckpoint savepoint = completedCheckpointStore.getLatestCheckpoint();
        Assertions.assertThat((Object)savepoint).isNotNull();
        Assertions.assertThat((long)savepoint.getCheckpointID()).isEqualTo(42L);
    }

    @Test
    void testCheckpointStatsTrackerUpdatedWithNewParallelism() throws Exception {
        long savepointId = 42L;
        JobGraph jobGraphWithParallelism2 = this.createJobGraphWithSavepoint(true, 42L, 2);
        final ArrayList spans = new ArrayList();
        ExecutionGraphFactory executionGraphFactory = this.createExecutionGraphFactory((JobManagerJobMetricGroup)new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup(){

            public void addSpan(SpanBuilder spanBuilder) {
                spans.add(spanBuilder.build());
            }
        });
        StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        ExecutionGraph executionGraph = executionGraphFactory.createAndRestoreExecutionGraph(jobGraphWithParallelism2, (CompletedCheckpointStore)completedCheckpointStore, new CheckpointsCleaner(), (CheckpointIDCounter)new StandaloneCheckpointIDCounter(), TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, 0L, (VertexAttemptNumberStore)new DefaultVertexAttemptNumberStore(), vertexId -> new DefaultVertexParallelismInfo(1, 1337, integer -> Optional.empty()), (execution, previousState, newState) -> {}, rp -> false, log);
        CheckpointStatsTracker checkpointStatsTracker = executionGraph.getCheckpointStatsTracker();
        checkpointStatsTracker.reportInitializationStartTs(SystemClock.getInstance().absoluteTimeMillis());
        checkpointStatsTracker.reportRestoredCheckpoint(42L, CheckpointProperties.forSavepoint((boolean)false, (SavepointFormatType)SavepointFormatType.NATIVE), "foo", 1337L);
        checkpointStatsTracker.reportInitializationMetrics(new SubTaskInitializationMetricsBuilder(SystemClock.getInstance().absoluteTimeMillis()).build());
        Assertions.assertThat(spans).hasSize(1);
    }

    @Nonnull
    private ExecutionGraphFactory createExecutionGraphFactory() {
        return this.createExecutionGraphFactory(UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
    }

    @Nonnull
    private ExecutionGraphFactory createExecutionGraphFactory(JobManagerJobMetricGroup metricGroup) {
        DefaultExecutionGraphFactory executionGraphFactory = new DefaultExecutionGraphFactory(new Configuration(), ClassLoader.getSystemClassLoader(), (ExecutionDeploymentTracker)new DefaultExecutionDeploymentTracker(), (ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor(), (Executor)EXECUTOR_EXTENSION.getExecutor(), Time.milliseconds((long)0L), metricGroup, (BlobWriter)VoidBlobWriter.getInstance(), ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER, (JobMasterPartitionTracker)NoOpJobMasterPartitionTracker.INSTANCE);
        return executionGraphFactory;
    }

    @Nonnull
    private JobGraph createJobGraphWithSavepoint(boolean allowNonRestoredState, long savepointId, int parallelism) throws IOException {
        OperatorID operatorID = new OperatorID();
        File savepointFile = TestUtils.createSavepointWithOperatorState(this.temporaryFile, savepointId, operatorID);
        SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath((String)savepointFile.getAbsolutePath(), (boolean)allowNonRestoredState);
        JobVertex jobVertex = new JobVertex("New operator");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(parallelism);
        return TestUtils.createJobGraphFromJobVerticesWithCheckpointing(savepointRestoreSettings, jobVertex);
    }
}

