package org.apache.flink.runtime.scheduler.strategy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategyTest.class */
/**
 * Unit tests for {@link VertexwiseSchedulingStrategy}.
 *
 * <p>The shared fixture is a three-stage topology (source, map, sink), each stage with
 * {@link #PARALLELISM} vertices. Source and map are connected pointwise, map and sink are
 * connected all-to-all; every result partition is BLOCKING and starts in the CREATED state.
 */
public class VertexwiseSchedulingStrategyTest {
    /** Number of execution vertices created for every stage of a test topology. */
    private static final int PARALLELISM = 2;

    private TestingSchedulerOperations testingSchedulerOperation;
    private TestingSchedulingTopology testingSchedulingTopology;
    private List<TestingSchedulingExecutionVertex> source;
    private List<TestingSchedulingExecutionVertex> map;
    private List<TestingSchedulingExecutionVertex> sink;

    /** Creates a fresh scheduler-operations recorder and rebuilds the shared topology. */
    @Before
    public void setUp() {
        this.testingSchedulerOperation = new TestingSchedulerOperations();
        buildTopology();
    }

    /** Builds the source -> map (pointwise) -> sink (all-to-all) fixture topology. */
    private void buildTopology() {
        this.testingSchedulingTopology = new TestingSchedulingTopology();
        this.source = this.testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        this.map = this.testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        this.sink = this.testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        this.testingSchedulingTopology
                .connectPointwise(this.source, this.map)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.BLOCKING)
                .finish();
        this.testingSchedulingTopology
                .connectAllToAll(this.map, this.sink)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.BLOCKING)
                .finish();
    }

    /** After start, the latest scheduled vertices must be the two sources, one per group. */
    @Test
    public void testStartScheduling() {
        startScheduling(this.testingSchedulingTopology);

        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(
                oneVertexPerGroup(this.source.get(0), this.source.get(1)),
                this.testingSchedulerOperation);
    }

    /**
     * Restarting every vertex of the topology must again yield the two sources, one per group,
     * as the latest scheduled vertices.
     */
    @Test
    public void testRestartTasks() {
        VertexwiseSchedulingStrategy strategy = startScheduling(this.testingSchedulingTopology);

        // Typed pipeline: the argument type is inferred from restartTasks' parameter, so no
        // raw-type cast is required.
        strategy.restartTasks(
                Stream.of(this.source, this.map, this.sink)
                        .flatMap(List::stream)
                        .map(TestingSchedulingExecutionVertex::m550getId)
                        .collect(Collectors.toSet()));

        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(
                oneVertexPerGroup(this.source.get(0), this.source.get(1)),
                this.testingSchedulerOperation);
    }

    /**
     * Finishes the sources and maps one by one and checks the number of scheduling calls
     * recorded after each step: 2 (start), 3, 4 (one more per finished source), still 4 after the
     * first map finishes, and 6 once the second map finishes.
     */
    @Test
    public void testOnExecutionStateChangeToFinished() {
        VertexwiseSchedulingStrategy strategy = startScheduling(this.testingSchedulingTopology);
        assertScheduledCount(2);

        markFinished(strategy, this.source.get(0));
        assertScheduledCount(3);

        markFinished(strategy, this.source.get(1));
        assertScheduledCount(4);

        // The sinks consume from both maps (all-to-all), so nothing new is expected yet.
        markFinished(strategy, this.map.get(0));
        assertScheduledCount(4);

        markFinished(strategy, this.map.get(1));
        assertScheduledCount(6);

        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(
                oneVertexPerGroup(
                        this.source.get(0),
                        this.source.get(1),
                        this.map.get(0),
                        this.map.get(1),
                        this.sink.get(0),
                        this.sink.get(1)),
                this.testingSchedulerOperation);
    }

    /** Runs the topology-update scenario with an all-to-all connection. */
    @Test
    public void testUpdateStrategyWithAllToAll() {
        testUpdateStrategyOnTopologyUpdate(true);
    }

    /** Runs the topology-update scenario with a pointwise connection. */
    @Test
    public void testUpdateStrategyWithPointWise() {
        testUpdateStrategyOnTopologyUpdate(false);
    }

    /**
     * Shared scenario: scheduling starts on a topology that only contains producers; consumers
     * are added and connected afterwards and the strategy is notified about the update. Once the
     * second producer finishes, producers and consumers must all have been scheduled, one vertex
     * per group.
     *
     * @param z {@code true} to connect producers and consumers all-to-all, {@code false} to
     *     connect them pointwise
     */
    public void testUpdateStrategyOnTopologyUpdate(boolean z) {
        final boolean allToAll = z;
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers =
                topology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        VertexwiseSchedulingStrategy strategy = startScheduling(topology);

        // Consumers are created only after scheduling has started.
        List<TestingSchedulingExecutionVertex> consumers =
                topology.addExecutionVertices().withParallelism(PARALLELISM).finish();

        // The first producer finishes before the consumers are connected to it.
        strategy.onExecutionStateChange(producers.get(0).m550getId(), ExecutionState.FINISHED);

        if (allToAll) {
            topology.connectAllToAll(producers, consumers)
                    .withResultPartitionState(ResultPartitionState.CONSUMABLE)
                    .withResultPartitionType(ResultPartitionType.BLOCKING)
                    .finish();
        } else {
            topology.connectPointwise(producers, consumers)
                    .withResultPartitionState(ResultPartitionState.CONSUMABLE)
                    .withResultPartitionType(ResultPartitionType.BLOCKING)
                    .finish();
        }

        strategy.notifySchedulingTopologyUpdated(
                topology,
                consumers.stream()
                        .map(TestingSchedulingExecutionVertex::m550getId)
                        .collect(Collectors.toList()));
        strategy.onExecutionStateChange(producers.get(1).m550getId(), ExecutionState.FINISHED);

        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(
                oneVertexPerGroup(
                        producers.get(0), producers.get(1), consumers.get(0), consumers.get(1)),
                this.testingSchedulerOperation);
    }

    /**
     * Creates a strategy bound to the shared scheduler-operations recorder and the given
     * topology, starts scheduling, and returns it.
     *
     * @param schedulingTopology topology the strategy should schedule
     * @return the strategy, after {@code startScheduling()} has been invoked on it
     */
    VertexwiseSchedulingStrategy startScheduling(SchedulingTopology schedulingTopology) {
        VertexwiseSchedulingStrategy vertexwiseSchedulingStrategy =
                new VertexwiseSchedulingStrategy(this.testingSchedulerOperation, schedulingTopology);
        vertexwiseSchedulingStrategy.startScheduling();
        return vertexwiseSchedulingStrategy;
    }

    /**
     * Sets the first produced result of {@code vertex} to CONSUMABLE and then reports the vertex
     * as FINISHED to the strategy.
     */
    private static void markFinished(
            VertexwiseSchedulingStrategy strategy, TestingSchedulingExecutionVertex vertex) {
        vertex.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
        strategy.onExecutionStateChange(vertex.m550getId(), ExecutionState.FINISHED);
    }

    /** Asserts the number of entries recorded so far by the scheduler-operations recorder. */
    private void assertScheduledCount(int expectedCount) {
        MatcherAssert.assertThat(
                this.testingSchedulerOperation.getScheduledVertices(), Matchers.hasSize(expectedCount));
    }

    /** Wraps each given vertex in its own single-element list, preserving argument order. */
    private static List<List<TestingSchedulingExecutionVertex>> oneVertexPerGroup(
            TestingSchedulingExecutionVertex... vertices) {
        List<List<TestingSchedulingExecutionVertex>> groups = new ArrayList<>(vertices.length);
        for (TestingSchedulingExecutionVertex vertex : vertices) {
            groups.add(Arrays.asList(vertex));
        }
        return groups;
    }
}
