package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.SpeculativeExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest.class */
public class SpeculativeExecutionVertexTest {

    /** JUnit extension whose executor is passed to the execution graph builder in {@code createExecutionGraph}. */
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    /** Failure listener installed on every graph built by this test; replaced with a fresh instance in {@code setUp}. */
    private TestingInternalFailuresListener internalFailuresListener;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertexTest$TestInputSource.class */
    private class TestInputSource extends GenericInputFormat<Integer> {
        private GenericInputSplit[] splits;

        private TestInputSource() {
        }

        /* renamed from: createInputSplits, reason: merged with bridge method [inline-methods] */
        public GenericInputSplit[] m127createInputSplits(int i) {
            int i2 = i * 10;
            this.splits = new GenericInputSplit[i2];
            for (int i3 = 0; i3 < i2; i3++) {
                this.splits[i3] = new GenericInputSplit(i3, i2);
            }
            return this.splits;
        }

        public boolean reachedEnd() throws IOException {
            return false;
        }

        public Integer nextRecord(Integer num) throws IOException {
            return null;
        }
    }

    /** Package-private no-argument constructor; all per-test state is initialised in {@code setUp}. */
    SpeculativeExecutionVertexTest() {
    }

    /** Gives every test a fresh failure listener so recorded failures do not carry over between tests. */
    @BeforeEach
    void setUp() {
        internalFailuresListener = new TestingInternalFailuresListener();
    }

    /** A new vertex starts with one current execution; creating a speculative one brings it to two. */
    @Test
    void testCreateSpeculativeExecution() throws Exception {
        final SpeculativeExecutionVertex vertex = createSpeculativeExecutionVertex();
        Assertions.assertThat(vertex.getCurrentExecutions()).hasSize(1);

        final long now = System.currentTimeMillis();
        vertex.createNewSpeculativeExecution(now);
        Assertions.assertThat(vertex.getCurrentExecutions()).hasSize(2);
    }

    /**
     * After the original attempt finishes and the speculative one is cancelled, resetting the
     * vertex must archive both attempts in order and leave a single new attempt with number 2.
     */
    @Test
    void testResetExecutionVertex() throws Exception {
        final SpeculativeExecutionVertex vertex = createSpeculativeExecutionVertex();
        final Execution original = vertex.getCurrentExecutionAttempt();
        final Execution speculative = vertex.createNewSpeculativeExecution(System.currentTimeMillis());

        original.transitionState(ExecutionState.RUNNING);
        original.markFinished();
        speculative.cancel();
        vertex.resetForNewExecution();

        // Attempts 0 and 1 must be retrievable from the history under their original ids.
        final ArchivedExecution firstArchived = (ArchivedExecution) vertex.getExecutionHistory().getHistoricalExecution(0).get();
        Assertions.assertThat(firstArchived.getAttemptId()).isEqualTo(original.getAttemptId());
        final ArchivedExecution secondArchived = (ArchivedExecution) vertex.getExecutionHistory().getHistoricalExecution(1).get();
        Assertions.assertThat(secondArchived.getAttemptId()).isEqualTo(speculative.getAttemptId());

        Assertions.assertThat(vertex.getCurrentExecutions()).hasSize(1);
        final int newAttemptNumber = vertex.getCurrentExecutionAttempt().getAttemptNumber();
        Assertions.assertThat(newAttemptNumber).isEqualTo(2);
    }

    /** Cancelling the vertex must move both the original and the speculative execution to CANCELED. */
    @Test
    void testCancel() throws Exception {
        final SpeculativeExecutionVertex vertex = createSpeculativeExecutionVertex();
        final Execution original = vertex.getCurrentExecutionAttempt();
        final Execution speculative = vertex.createNewSpeculativeExecution(System.currentTimeMillis());

        vertex.cancel();

        final ExecutionState expected = ExecutionState.CANCELED;
        Assertions.assertThat(original.getState()).isSameAs(expected);
        Assertions.assertThat(speculative.getState()).isSameAs(expected);
    }

    /** Suspending the vertex must leave both the original and the speculative execution in CANCELED. */
    @Test
    void testSuspend() throws Exception {
        final SpeculativeExecutionVertex vertex = createSpeculativeExecutionVertex();
        final Execution original = vertex.getCurrentExecutionAttempt();
        final Execution speculative = vertex.createNewSpeculativeExecution(System.currentTimeMillis());

        vertex.suspend();

        final ExecutionState expected = ExecutionState.CANCELED;
        Assertions.assertThat(original.getState()).isSameAs(expected);
        Assertions.assertThat(speculative.getState()).isSameAs(expected);
    }

    /**
     * Failing the vertex must report both current executions to the failure listener,
     * the original attempt first and the speculative attempt second.
     */
    @Test
    void testFail() throws Exception {
        final SpeculativeExecutionVertex vertex = createSpeculativeExecutionVertex();
        final Execution original = vertex.getCurrentExecutionAttempt();
        final Execution speculative = vertex.createNewSpeculativeExecution(System.currentTimeMillis());

        vertex.fail(new Exception("Forced test failure."));

        Assertions.assertThat(this.internalFailuresListener.getFailedTasks())
                .containsExactly(original.getAttemptId(), speculative.getAttemptId());
    }

    /**
     * Marking the vertex as failed must report both current executions to the failure listener,
     * the original attempt first and the speculative attempt second.
     */
    @Test
    void testMarkFailed() throws Exception {
        final SpeculativeExecutionVertex vertex = createSpeculativeExecutionVertex();
        final Execution original = vertex.getCurrentExecutionAttempt();
        final Execution speculative = vertex.createNewSpeculativeExecution(System.currentTimeMillis());

        vertex.markFailed(new Exception("Forced test failure."));

        Assertions.assertThat(this.internalFailuresListener.getFailedTasks())
                .containsExactly(original.getAttemptId(), speculative.getAttemptId());
    }

    /**
     * The vertex termination future must stay incomplete, and the job must stay RUNNING, while
     * any current execution is still alive. Once the last one (the speculative attempt) is
     * cancelled, the future completes and the job reaches FINISHED.
     */
    @Test
    void testVertexTerminationAndJobTermination() throws Exception {
        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        final ExecutionGraph executionGraph = createExecutionGraph(JobGraphTestUtils.batchJobGraph(jobVertex));
        executionGraph.transitionToRunning();

        // getTaskVertices() is typed as ExecutionVertex[]; the graph was built with the
        // speculative factory, so the downcast is required and expected to succeed.
        final SpeculativeExecutionVertex vertex =
                (SpeculativeExecutionVertex) executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0];
        final Execution original = vertex.getCurrentExecutionAttempt();
        final Execution speculative = vertex.createNewSpeculativeExecution(System.currentTimeMillis());
        final CompletableFuture<?> terminationFuture = vertex.getTerminationFuture();

        original.transitionState(ExecutionState.RUNNING);
        original.markFinished();
        // One attempt finished, but the speculative attempt is still pending.
        Assertions.assertThat(terminationFuture).isNotDone();
        Assertions.assertThat(executionGraph.getState()).isSameAs(JobStatus.RUNNING);

        speculative.cancel();
        Assertions.assertThat(terminationFuture).isDone();
        Assertions.assertThat(executionGraph.getState()).isSameAs(JobStatus.FINISHED);
    }

    /**
     * Archiving a failed execution removes it from the current executions. If the archived
     * execution was the vertex's {@code currentExecution}, the remaining one takes its place.
     */
    @Test
    void testArchiveFailedExecutions() throws Exception {
        final SpeculativeExecutionVertex vertex = createSpeculativeExecutionVertex();

        final Execution first = vertex.getCurrentExecutionAttempt();
        first.transitionState(ExecutionState.RUNNING);

        // Case 1: the failed execution is the speculative one; the original stays current.
        final Execution second = vertex.createNewSpeculativeExecution(0L);
        second.transitionState(ExecutionState.FAILED);
        vertex.archiveFailedExecution(second.getAttemptId());
        Assertions.assertThat(vertex.getCurrentExecutions()).hasSize(1);
        Assertions.assertThat(vertex.currentExecution).isSameAs(first);

        // Case 2: the failed execution is the current one; the new speculative one takes over.
        final Execution third = vertex.createNewSpeculativeExecution(0L);
        third.transitionState(ExecutionState.RUNNING);
        first.transitionState(ExecutionState.FAILED);
        vertex.archiveFailedExecution(first.getAttemptId());
        Assertions.assertThat(vertex.getCurrentExecutions()).hasSize(1);
        Assertions.assertThat(vertex.currentExecution).isSameAs(third);
    }

    /** Archiving the sole current execution must leave it in place, even though it has failed. */
    @Test
    void testArchiveTheOnlyCurrentExecution() throws Exception {
        final SpeculativeExecutionVertex vertex = createSpeculativeExecutionVertex();
        final Execution onlyExecution = vertex.getCurrentExecutionAttempt();

        onlyExecution.transitionState(ExecutionState.FAILED);
        vertex.archiveFailedExecution(onlyExecution.getAttemptId());

        Assertions.assertThat(vertex.getCurrentExecutions()).hasSize(1);
        Assertions.assertThat(vertex.currentExecution).isSameAs(onlyExecution);
    }

    /**
     * Calling {@code archiveFailedExecution} for an execution that is RUNNING (not FAILED) must
     * throw an {@link IllegalStateException}.
     *
     * <p>Only the archive call sits inside the exception assertion, so an unexpected
     * {@code IllegalStateException} thrown while preparing the vertex fails the test instead of
     * being mistaken for the expected outcome.
     */
    @Test
    void testArchiveNonFailedExecutionWithArchiveFailedExecutionMethod() throws Exception {
        final SpeculativeExecutionVertex vertex = createSpeculativeExecutionVertex();
        vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.FAILED);

        final Execution runningExecution = vertex.createNewSpeculativeExecution(0L);
        runningExecution.transitionState(ExecutionState.RUNNING);

        Assertions.assertThatThrownBy(() -> vertex.archiveFailedExecution(runningExecution.getAttemptId()))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Checks the vertex-level state reported while several executions coexist.
     *
     * <p>Starting from a single CANCELED execution, a speculative execution is added for each
     * state in the list below, in order; after each addition the vertex must report the state
     * that was just added.
     */
    @Test
    void testGetExecutionState() throws Exception {
        final SpeculativeExecutionVertex vertex = createSpeculativeExecutionVertex();

        vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.CANCELED);
        Assertions.assertThat(vertex.getExecutionState()).isSameAs(ExecutionState.CANCELED);

        // Order matters: each state is expected to take over from all states added before it.
        final ExecutionState[] statesInExpectedOrder = {
            ExecutionState.FAILED,
            ExecutionState.CANCELING,
            ExecutionState.CREATED,
            ExecutionState.SCHEDULED,
            ExecutionState.DEPLOYING,
            ExecutionState.INITIALIZING,
            ExecutionState.RUNNING,
            ExecutionState.FINISHED
        };

        for (ExecutionState state : statesInExpectedOrder) {
            vertex.createNewSpeculativeExecution(0L).transitionState(state);
            Assertions.assertThat(vertex.getExecutionState()).isSameAs(state);
        }
    }

    /**
     * Every execution of a vertex must be handed the full set of input splits, in the same
     * order, regardless of how requests from the different executions interleave.
     *
     * <p>Three executions (one original, two speculative) are polled in random order until each
     * of them runs out of splits. The splits seen per attempt number are then compared.
     */
    @Test
    void testGetNextInputSplit() throws Exception {
        final TestInputSource source = new TestInputSource();
        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        jobVertex.setInputSplitSource(source);

        final SpeculativeExecutionVertex vertex = createSpeculativeExecutionVertex(jobVertex);

        // The vertex already owns one execution, so only numExecutions - 1 more are created.
        final int numExecutions = 3;
        for (int i = 0; i < numExecutions - 1; i++) {
            vertex.createNewSpeculativeExecution(0L);
        }

        final List<Execution> pending = new ArrayList<>(vertex.getCurrentExecutions());
        final Map<Integer, List<InputSplit>> splitsByAttempt = new HashMap<>();
        final Random random = new Random();
        while (!pending.isEmpty()) {
            final int index = random.nextInt(pending.size());
            final Execution execution = pending.get(index);
            final Optional<InputSplit> split = execution.getNextInputSplit();
            if (split.isPresent()) {
                splitsByAttempt
                        .computeIfAbsent(execution.getAttemptNumber(), ignored -> new ArrayList<>())
                        .add(split.get());
            } else {
                // This execution has consumed everything; stop polling it.
                pending.remove(index);
            }
        }

        Assertions.assertThat(splitsByAttempt).hasSize(numExecutions);
        Assertions.assertThat(splitsByAttempt.get(0)).containsExactlyInAnyOrder(source.splits);
        Assertions.assertThat(splitsByAttempt.get(1)).isEqualTo(splitsByAttempt.get(0));
        Assertions.assertThat(splitsByAttempt.get(2)).isEqualTo(splitsByAttempt.get(0));
    }

    /** Builds a speculative execution vertex backed by a no-op job vertex created with argument 1. */
    private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
        final JobVertex noOpVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        return createSpeculativeExecutionVertex(noOpVertex);
    }

    /**
     * Wraps the given job vertex in a batch job graph, builds an execution graph for it and
     * returns the first task vertex.
     *
     * @param jobVertex the job vertex to build the graph from
     * @return the first task vertex of {@code jobVertex}, as a {@link SpeculativeExecutionVertex}
     * @throws Exception if the execution graph cannot be built
     */
    private SpeculativeExecutionVertex createSpeculativeExecutionVertex(JobVertex jobVertex) throws Exception {
        final ExecutionGraph executionGraph = createExecutionGraph(JobGraphTestUtils.batchJobGraph(jobVertex));
        // getTaskVertices() is typed as ExecutionVertex[]; createExecutionGraph installs the
        // speculative factory, so the downcast is required and expected to succeed.
        return (SpeculativeExecutionVertex) executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0];
    }

    /**
     * Builds an execution graph for {@code jobGraph} using the speculative job vertex factory,
     * installs this test's failure listener and starts the graph on the calling thread's
     * main-thread executor adapter.
     *
     * @param jobGraph the job graph to build from
     * @return the started execution graph
     * @throws Exception if building the graph fails
     */
    private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws Exception {
        final TestingDefaultExecutionGraphBuilder builder = TestingDefaultExecutionGraphBuilder.newBuilder()
                .setJobGraph(jobGraph)
                .setExecutionJobVertexFactory(new SpeculativeExecutionJobVertex.Factory());
        final DefaultExecutionGraph executionGraph =
                builder.build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());

        executionGraph.setInternalTaskFailuresListener(this.internalFailuresListener);
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        return executionGraph;
    }
}
