package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtilTest.class */
class EdgeManagerBuildUtilTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    EdgeManagerBuildUtilTest() {
    }

    @Test
    void testGetMaxNumEdgesToTargetInPointwiseConnection() throws Exception {
        testGetMaxNumEdgesToTarget(17, 17, DistributionPattern.POINTWISE);
        testGetMaxNumEdgesToTarget(17, 23, DistributionPattern.POINTWISE);
        testGetMaxNumEdgesToTarget(17, 34, DistributionPattern.POINTWISE);
        testGetMaxNumEdgesToTarget(34, 17, DistributionPattern.POINTWISE);
        testGetMaxNumEdgesToTarget(23, 17, DistributionPattern.POINTWISE);
    }

    @Test
    void testGetMaxNumEdgesToTargetInAllToAllConnection() throws Exception {
        testGetMaxNumEdgesToTarget(17, 17, DistributionPattern.ALL_TO_ALL);
        testGetMaxNumEdgesToTarget(17, 23, DistributionPattern.ALL_TO_ALL);
        testGetMaxNumEdgesToTarget(17, 34, DistributionPattern.ALL_TO_ALL);
        testGetMaxNumEdgesToTarget(34, 17, DistributionPattern.ALL_TO_ALL);
        testGetMaxNumEdgesToTarget(23, 17, DistributionPattern.ALL_TO_ALL);
    }

    @Test
    void testConnectAllToAll() throws Exception {
        ExecutionGraph executionGraph = setupExecutionGraph(3, 2, DistributionPattern.POINTWISE, true);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 2; i++) {
            arrayList.add(new ExecutionVertexInputInfo(i, new IndexRange(0, 3 - 1), new IndexRange(0, 0)));
        }
        JobVertexInputInfo jobVertexInputInfo = new JobVertexInputInfo(arrayList);
        Iterator it = executionGraph.getVerticesTopologically().iterator();
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) it.next();
        ExecutionJobVertex executionJobVertex2 = (ExecutionJobVertex) it.next();
        executionGraph.initializeJobVertex(executionJobVertex, 1L, Collections.emptyMap());
        executionGraph.initializeJobVertex(executionJobVertex2, 1L, Collections.singletonMap(executionJobVertex.getProducedDataSets()[0].getId(), jobVertexInputInfo));
        IntermediateResult intermediateResult = ((ExecutionJobVertex) Objects.requireNonNull(executionGraph.getJobVertex(executionJobVertex.getJobVertexId()))).getProducedDataSets()[0];
        IntermediateResultPartition intermediateResultPartition = intermediateResult.getPartitions()[0];
        IntermediateResultPartition intermediateResultPartition2 = intermediateResult.getPartitions()[1];
        IntermediateResultPartition intermediateResultPartition3 = intermediateResult.getPartitions()[2];
        ExecutionVertex executionVertex = executionJobVertex2.getTaskVertices()[0];
        ExecutionVertex executionVertex2 = executionJobVertex2.getTaskVertices()[1];
        ConsumerVertexGroup consumerVertexGroup = (ConsumerVertexGroup) intermediateResultPartition.getConsumerVertexGroups().get(0);
        Assertions.assertThat(consumerVertexGroup).containsExactlyInAnyOrder(new ExecutionVertexID[]{executionVertex.getID(), executionVertex2.getID()});
        Assertions.assertThat((Iterable) intermediateResultPartition2.getConsumerVertexGroups().get(0)).isEqualTo(consumerVertexGroup);
        Assertions.assertThat((Iterable) intermediateResultPartition3.getConsumerVertexGroups().get(0)).isEqualTo(consumerVertexGroup);
        ConsumedPartitionGroup consumedPartitionGroup = executionVertex.getConsumedPartitionGroup(0);
        Assertions.assertThat(consumedPartitionGroup).containsExactlyInAnyOrder(new IntermediateResultPartitionID[]{intermediateResultPartition.getPartitionId(), intermediateResultPartition2.getPartitionId(), intermediateResultPartition3.getPartitionId()});
        Assertions.assertThat(executionVertex2.getConsumedPartitionGroup(0)).isEqualTo(consumedPartitionGroup);
        Assertions.assertThat(consumerVertexGroup.getConsumedPartitionGroup()).isEqualTo(consumedPartitionGroup);
        Assertions.assertThat(consumedPartitionGroup.getConsumerVertexGroup()).isEqualTo(consumerVertexGroup);
    }

    @Test
    void testConnectPointwise() throws Exception {
        ExecutionGraph executionGraph = setupExecutionGraph(4, 4, DistributionPattern.POINTWISE, true);
        List asList = Arrays.asList(new IndexRange(0, 0), new IndexRange(0, 0), new IndexRange(1, 2), new IndexRange(3, 3));
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            arrayList.add(new ExecutionVertexInputInfo(i, (IndexRange) asList.get(i), new IndexRange(0, 0)));
        }
        JobVertexInputInfo jobVertexInputInfo = new JobVertexInputInfo(arrayList);
        Iterator it = executionGraph.getVerticesTopologically().iterator();
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) it.next();
        ExecutionJobVertex executionJobVertex2 = (ExecutionJobVertex) it.next();
        executionGraph.initializeJobVertex(executionJobVertex, 1L, Collections.emptyMap());
        executionGraph.initializeJobVertex(executionJobVertex2, 1L, Collections.singletonMap(executionJobVertex.getProducedDataSets()[0].getId(), jobVertexInputInfo));
        IntermediateResult intermediateResult = ((ExecutionJobVertex) Objects.requireNonNull(executionGraph.getJobVertex(executionJobVertex.getJobVertexId()))).getProducedDataSets()[0];
        IntermediateResultPartition intermediateResultPartition = intermediateResult.getPartitions()[0];
        IntermediateResultPartition intermediateResultPartition2 = intermediateResult.getPartitions()[1];
        IntermediateResultPartition intermediateResultPartition3 = intermediateResult.getPartitions()[2];
        IntermediateResultPartition intermediateResultPartition4 = intermediateResult.getPartitions()[3];
        ExecutionVertex executionVertex = executionJobVertex2.getTaskVertices()[0];
        ExecutionVertex executionVertex2 = executionJobVertex2.getTaskVertices()[1];
        ExecutionVertex executionVertex3 = executionJobVertex2.getTaskVertices()[2];
        ExecutionVertex executionVertex4 = executionJobVertex2.getTaskVertices()[3];
        ConsumerVertexGroup consumerVertexGroup = (ConsumerVertexGroup) intermediateResultPartition.getConsumerVertexGroups().get(0);
        ConsumerVertexGroup consumerVertexGroup2 = (ConsumerVertexGroup) intermediateResultPartition2.getConsumerVertexGroups().get(0);
        ConsumerVertexGroup consumerVertexGroup3 = (ConsumerVertexGroup) intermediateResultPartition4.getConsumerVertexGroups().get(0);
        Assertions.assertThat(consumerVertexGroup).containsExactlyInAnyOrder(new ExecutionVertexID[]{executionVertex.getID(), executionVertex2.getID()});
        Assertions.assertThat(consumerVertexGroup2).containsExactlyInAnyOrder(new ExecutionVertexID[]{executionVertex3.getID()});
        Assertions.assertThat((Iterable) intermediateResultPartition3.getConsumerVertexGroups().get(0)).isEqualTo(consumerVertexGroup2);
        Assertions.assertThat(consumerVertexGroup3).containsExactlyInAnyOrder(new ExecutionVertexID[]{executionVertex4.getID()});
        ConsumedPartitionGroup consumedPartitionGroup = executionVertex.getConsumedPartitionGroup(0);
        ConsumedPartitionGroup consumedPartitionGroup2 = executionVertex3.getConsumedPartitionGroup(0);
        ConsumedPartitionGroup consumedPartitionGroup3 = executionVertex4.getConsumedPartitionGroup(0);
        Assertions.assertThat(consumedPartitionGroup).containsExactlyInAnyOrder(new IntermediateResultPartitionID[]{intermediateResultPartition.getPartitionId()});
        Assertions.assertThat(executionVertex2.getConsumedPartitionGroup(0)).isEqualTo(consumedPartitionGroup);
        Assertions.assertThat(consumedPartitionGroup2).containsExactlyInAnyOrder(new IntermediateResultPartitionID[]{intermediateResultPartition2.getPartitionId(), intermediateResultPartition3.getPartitionId()});
        Assertions.assertThat(consumedPartitionGroup3).containsExactlyInAnyOrder(new IntermediateResultPartitionID[]{intermediateResultPartition4.getPartitionId()});
        Assertions.assertThat(consumerVertexGroup.getConsumedPartitionGroup()).isEqualTo(consumedPartitionGroup);
        Assertions.assertThat(consumedPartitionGroup.getConsumerVertexGroup()).isEqualTo(consumerVertexGroup);
        Assertions.assertThat(consumerVertexGroup2.getConsumedPartitionGroup()).isEqualTo(consumedPartitionGroup2);
        Assertions.assertThat(consumedPartitionGroup2.getConsumerVertexGroup()).isEqualTo(consumerVertexGroup2);
        Assertions.assertThat(consumerVertexGroup3.getConsumedPartitionGroup()).isEqualTo(consumedPartitionGroup3);
        Assertions.assertThat(consumedPartitionGroup3.getConsumerVertexGroup()).isEqualTo(consumerVertexGroup3);
    }

    private void testGetMaxNumEdgesToTarget(int i, int i2, DistributionPattern distributionPattern) throws Exception {
        Pair<ExecutionJobVertex, ExecutionJobVertex> pair = setupExecutionGraph(i, i2, distributionPattern);
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) pair.getLeft();
        ExecutionJobVertex executionJobVertex2 = (ExecutionJobVertex) pair.getRight();
        int computeMaxEdgesToTargetExecutionVertex = EdgeManagerBuildUtil.computeMaxEdgesToTargetExecutionVertex(i, i2, distributionPattern);
        int i3 = -1;
        for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
            Assertions.assertThat(executionVertex.getProducedPartitions()).hasSize(1);
            int size = ((ConsumerVertexGroup) ((IntermediateResultPartition) executionVertex.getProducedPartitions().values().iterator().next()).getConsumerVertexGroups().get(0)).size();
            if (size > i3) {
                i3 = size;
            }
        }
        Assertions.assertThat(i3).isEqualTo(computeMaxEdgesToTargetExecutionVertex);
        int computeMaxEdgesToTargetExecutionVertex2 = EdgeManagerBuildUtil.computeMaxEdgesToTargetExecutionVertex(i2, i, distributionPattern);
        int i4 = -1;
        for (ExecutionVertex executionVertex2 : executionJobVertex2.getTaskVertices()) {
            Assertions.assertThat(executionVertex2.getNumberOfInputs()).isOne();
            int size2 = executionVertex2.getConsumedPartitionGroup(0).size();
            if (size2 > i4) {
                i4 = size2;
            }
        }
        Assertions.assertThat(i4).isEqualTo(computeMaxEdgesToTargetExecutionVertex2);
    }

    private Pair<ExecutionJobVertex, ExecutionJobVertex> setupExecutionGraph(int i, int i2, DistributionPattern distributionPattern) throws Exception {
        Iterator it = setupExecutionGraph(i, i2, distributionPattern, false).getVerticesTopologically().iterator();
        return Pair.of(it.next(), it.next());
    }

    private ExecutionGraph setupExecutionGraph(int i, int i2, DistributionPattern distributionPattern, boolean z) throws Exception {
        JobVertex jobVertex = new JobVertex("vertex1");
        JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex.setParallelism(i);
        jobVertex2.setParallelism(i2);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        jobVertex2.setInvokableClass(AbstractInvokable.class);
        jobVertex2.connectNewDataSetAsInput(jobVertex, distributionPattern, ResultPartitionType.PIPELINED);
        ArrayList arrayList = new ArrayList(Arrays.asList(jobVertex, jobVertex2));
        TestingDefaultExecutionGraphBuilder vertexParallelismStore = TestingDefaultExecutionGraphBuilder.newBuilder().setVertexParallelismStore(IntermediateResultPartitionTest.computeVertexParallelismStoreConsideringDynamicGraph(arrayList, z, 128));
        DefaultExecutionGraph buildDynamicGraph = z ? vertexParallelismStore.buildDynamicGraph((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()) : vertexParallelismStore.build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        buildDynamicGraph.attachJobGraph(arrayList, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
        return buildDynamicGraph;
    }
}
