package org.apache.flink.runtime.executiongraph;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.RegionPartitionGroupReleaseStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/RegionPartitionGroupReleaseStrategyTest.class */
/**
 * Tests for {@link RegionPartitionGroupReleaseStrategy}.
 *
 * <p>Each test builds a small {@link TestingSchedulingTopology}, reports vertices as finished (or
 * unfinished) to the strategy, and checks which partition IDs the strategy hands back.
 */
public class RegionPartitionGroupReleaseStrategyTest extends TestLogger {
    /** Topology under construction; replaced with a fresh instance before every test. */
    private TestingSchedulingTopology testingSchedulingTopology;

    @Before
    public void setUp() throws Exception {
        testingSchedulingTopology = new TestingSchedulingTopology();
    }

    /**
     * Two vertex groups connected pointwise: reporting the first consumer as finished must yield
     * exactly the first partition created by that connection.
     */
    @Test
    public void releasePartitionsIfDownstreamRegionIsFinished() {
        final List<TestingSchedulingExecutionVertex> producers =
                testingSchedulingTopology.addExecutionVertices().finish();
        final List<TestingSchedulingExecutionVertex> consumers =
                testingSchedulingTopology.addExecutionVertices().finish();
        final List<TestingSchedulingResultPartition> resultPartitions =
                testingSchedulingTopology.connectPointwise(producers, consumers).finish();

        // The strategy is created only after the topology has been fully wired up.
        final RegionPartitionGroupReleaseStrategy releaseStrategy =
                new RegionPartitionGroupReleaseStrategy(testingSchedulingTopology);

        final List<IntermediateResultPartitionID> partitionsToRelease =
                getReleasablePartitions(releaseStrategy, consumers.get(0).m457getId());

        MatcherAssert.assertThat(
                partitionsToRelease, Matchers.contains(resultPartitions.get(0).m459getId()));
    }

    /**
     * Three vertex groups: producers feed the first consumer group all-to-all, and the first
     * consumer group feeds the second one all-to-all through {@code PIPELINED} partitions. After
     * the first vertex of each consumer group has been reported as finished, the first partition
     * of the producers' connection must be returned.
     */
    @Test
    public void releasePartitionsIfDownstreamRegionWithMultipleOperatorsIsFinished() {
        final List<TestingSchedulingExecutionVertex> producers =
                testingSchedulingTopology.addExecutionVertices().finish();
        final List<TestingSchedulingExecutionVertex> firstConsumers =
                testingSchedulingTopology.addExecutionVertices().finish();
        final List<TestingSchedulingExecutionVertex> secondConsumers =
                testingSchedulingTopology.addExecutionVertices().finish();

        final List<TestingSchedulingResultPartition> producerPartitions =
                testingSchedulingTopology.connectAllToAll(producers, firstConsumers).finish();
        testingSchedulingTopology
                .connectAllToAll(firstConsumers, secondConsumers)
                .withResultPartitionType(ResultPartitionType.PIPELINED)
                .finish();

        final ExecutionVertexID firstConsumerId = firstConsumers.get(0).m457getId();
        final ExecutionVertexID secondConsumerId = secondConsumers.get(0).m457getId();
        final IntermediateResultPartitionID expectedPartitionId =
                producerPartitions.get(0).m459getId();

        final RegionPartitionGroupReleaseStrategy releaseStrategy =
                new RegionPartitionGroupReleaseStrategy(testingSchedulingTopology);

        // Result of the first notification is deliberately ignored; only the outcome of the
        // second notification (issued by the helper) is asserted.
        releaseStrategy.vertexFinished(firstConsumerId);
        final List<IntermediateResultPartitionID> partitionsToRelease =
                getReleasablePartitions(releaseStrategy, secondConsumerId);

        MatcherAssert.assertThat(partitionsToRelease, Matchers.contains(expectedPartitionId));
    }

    /**
     * The consumer group has parallelism 2 but only its first vertex is reported as finished, so
     * nothing may be returned for release.
     */
    @Test
    public void notReleasePartitionsIfDownstreamRegionIsNotFinished() {
        final List<TestingSchedulingExecutionVertex> producers =
                testingSchedulingTopology.addExecutionVertices().finish();
        final List<TestingSchedulingExecutionVertex> consumers =
                testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        testingSchedulingTopology.connectAllToAll(producers, consumers).finish();

        final RegionPartitionGroupReleaseStrategy releaseStrategy =
                new RegionPartitionGroupReleaseStrategy(testingSchedulingTopology);

        final List<IntermediateResultPartitionID> partitionsToRelease =
                getReleasablePartitions(releaseStrategy, consumers.get(0).m457getId());

        MatcherAssert.assertThat(partitionsToRelease, Matchers.is(Matchers.empty()));
    }

    /**
     * Both consumer vertices are reported as finished, then the second one is reported as
     * unfinished again; reporting the first one as finished afterwards must return nothing.
     */
    @Test
    public void toggleVertexFinishedUnfinished() {
        final List<TestingSchedulingExecutionVertex> producers =
                testingSchedulingTopology.addExecutionVertices().finish();
        final List<TestingSchedulingExecutionVertex> consumers =
                testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        testingSchedulingTopology.connectAllToAll(producers, consumers).finish();

        final ExecutionVertexID firstConsumerId = consumers.get(0).m457getId();
        final ExecutionVertexID secondConsumerId = consumers.get(1).m457getId();

        final RegionPartitionGroupReleaseStrategy releaseStrategy =
                new RegionPartitionGroupReleaseStrategy(testingSchedulingTopology);

        releaseStrategy.vertexFinished(firstConsumerId);
        releaseStrategy.vertexFinished(secondConsumerId);
        releaseStrategy.vertexUnfinished(secondConsumerId);

        // The helper reports the first consumer as finished once more and returns that result.
        final List<IntermediateResultPartitionID> partitionsToRelease =
                getReleasablePartitions(releaseStrategy, firstConsumerId);

        MatcherAssert.assertThat(partitionsToRelease, Matchers.is(Matchers.empty()));
    }

    /**
     * Reports {@code executionVertexID} as finished to the strategy and flattens the groups it
     * returns into a single list of partition IDs.
     *
     * <p>Note that this is not a pure query: it invokes
     * {@code vertexFinished(executionVertexID)} on the strategy.
     *
     * @param regionPartitionGroupReleaseStrategy strategy under test
     * @param executionVertexID vertex to report as finished
     * @return all partition IDs contained in the groups returned by the strategy, in iteration
     *     order
     */
    private static List<IntermediateResultPartitionID> getReleasablePartitions(
            RegionPartitionGroupReleaseStrategy regionPartitionGroupReleaseStrategy,
            ExecutionVertexID executionVertexID) {
        // Typed pipeline: no raw List cast, so the element type is checked by the compiler.
        return regionPartitionGroupReleaseStrategy.vertexFinished(executionVertexID).stream()
                .flatMap(IterableUtils::toStream)
                .collect(Collectors.toList());
    }
}
