/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.scheduler.ExecutionGraphToInputsLocationsRetrieverAdapter;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.IterableUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests for {@link ExecutionGraphToInputsLocationsRetrieverAdapter}.
 *
 * <p>Each test builds a small execution graph from no-op job vertices, wraps it in the adapter
 * and checks what the adapter reports for individual execution vertices.
 */
class ExecutionGraphToInputsLocationsRetrieverAdapterTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();

    /**
     * Verifies that a consumer connected to two producers reports one consumed partition group
     * per producer, and that each group resolves back to the producer that owns its partition.
     */
    @Test
    void testGetConsumedPartitionGroupsAndProducers() throws Exception {
        JobVertex producer1 = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobVertex producer2 = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobVertex consumer = ExecutionGraphTestUtils.createNoOpVertex(1);
        IntermediateDataSet dataSet1 =
                consumer.connectNewDataSetAsInput(
                                producer1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED)
                        .getSource();
        IntermediateDataSet dataSet2 =
                consumer.connectNewDataSetAsInput(
                                producer2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED)
                        .getSource();

        DefaultExecutionGraph eg =
                ExecutionGraphTestUtils.createExecutionGraph(
                        EXECUTOR_EXTENSION.getExecutor(), producer1, producer2, consumer);
        ExecutionGraphToInputsLocationsRetrieverAdapter inputsLocationsRetriever =
                new ExecutionGraphToInputsLocationsRetrieverAdapter(eg);

        ExecutionVertexID evIdOfProducer1 = new ExecutionVertexID(producer1.getID(), 0);
        ExecutionVertexID evIdOfProducer2 = new ExecutionVertexID(producer2.getID(), 0);
        ExecutionVertexID evIdOfConsumer = new ExecutionVertexID(consumer.getID(), 0);

        Collection<ConsumedPartitionGroup> consumedPartitionGroupsOfProducer1 =
                inputsLocationsRetriever.getConsumedPartitionGroups(evIdOfProducer1);
        Collection<ConsumedPartitionGroup> consumedPartitionGroupsOfProducer2 =
                inputsLocationsRetriever.getConsumedPartitionGroups(evIdOfProducer2);
        Collection<ConsumedPartitionGroup> consumedPartitionGroupsOfConsumer =
                inputsLocationsRetriever.getConsumedPartitionGroups(evIdOfConsumer);

        IntermediateResultPartitionID partitionId1 = new IntermediateResultPartitionID(dataSet1.getId(), 0);
        IntermediateResultPartitionID partitionId2 = new IntermediateResultPartitionID(dataSet2.getId(), 0);

        // The producers have no inputs of their own; the consumer has one group per input.
        Assertions.assertThat(consumedPartitionGroupsOfProducer1).isEmpty();
        Assertions.assertThat(consumedPartitionGroupsOfProducer2).isEmpty();
        Assertions.assertThat(consumedPartitionGroupsOfConsumer).hasSize(2);
        Assertions.assertThat(
                        consumedPartitionGroupsOfConsumer.stream()
                                .flatMap(IterableUtils::toStream)
                                .collect(Collectors.toSet()))
                .containsExactlyInAnyOrder(partitionId1, partitionId2);

        // The order of the groups is not asserted, so pick the expected producer per group
        // based on the partition the group starts with.
        for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroupsOfConsumer) {
            ExecutionVertexID expectedProducer =
                    consumedPartitionGroup.getFirst().equals(partitionId1)
                            ? evIdOfProducer1
                            : evIdOfProducer2;
            Assertions.assertThat(
                            inputsLocationsRetriever.getProducersOfConsumedPartitionGroup(
                                    consumedPartitionGroup))
                    .containsExactly(expectedProducer);
        }
    }

    /**
     * Verifies that no task manager location is reported for a vertex of a freshly created
     * execution graph, i.e. before the vertex has been moved to {@code SCHEDULED} or deployed.
     */
    @Test
    void testGetEmptyTaskManagerLocationIfVertexNotScheduled() throws Exception {
        JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        DefaultExecutionGraph eg =
                ExecutionGraphTestUtils.createExecutionGraph(EXECUTOR_EXTENSION.getExecutor(), jobVertex);
        ExecutionGraphToInputsLocationsRetrieverAdapter inputsLocationsRetriever =
                new ExecutionGraphToInputsLocationsRetrieverAdapter(eg);

        ExecutionVertexID executionVertexId = new ExecutionVertexID(jobVertex.getID(), 0);

        Assertions.assertThat(inputsLocationsRetriever.getTaskManagerLocation(executionVertexId))
                .isNotPresent();
    }

    /**
     * Verifies that once the only execution vertex is transitioned to {@code SCHEDULED} and
     * deployed to a slot, the adapter returns a future that yields that slot's task manager
     * location.
     */
    @Test
    void testGetTaskManagerLocationWhenScheduled() throws Exception {
        JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        TestingLogicalSlot testingLogicalSlot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();
        DefaultExecutionGraph eg =
                ExecutionGraphTestUtils.createExecutionGraph(EXECUTOR_EXTENSION.getExecutor(), jobVertex);
        ExecutionGraphToInputsLocationsRetrieverAdapter inputsLocationsRetriever =
                new ExecutionGraphToInputsLocationsRetrieverAdapter(eg);

        ExecutionVertex onlyExecutionVertex = eg.getAllExecutionVertices().iterator().next();
        onlyExecutionVertex.getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
        onlyExecutionVertex.deployToSlot(testingLogicalSlot);

        ExecutionVertexID executionVertexId = new ExecutionVertexID(jobVertex.getID(), 0);
        // The future's element type is left as a wildcard: only equality with the slot's
        // location is asserted, so the concrete type does not need to be named here.
        Optional<? extends CompletableFuture<?>> taskManagerLocationOptional =
                inputsLocationsRetriever.getTaskManagerLocation(executionVertexId);

        Assertions.assertThat(taskManagerLocationOptional).isPresent();

        CompletableFuture<?> taskManagerLocationFuture = taskManagerLocationOptional.get();
        Object actualLocation = taskManagerLocationFuture.get();
        Assertions.assertThat(actualLocation).isEqualTo(testingLogicalSlot.getTaskManagerLocation());
    }

    /**
     * Verifies that asking for the location of an execution vertex whose job vertex is not part
     * of the graph fails with an {@link IllegalStateException}.
     */
    @Test
    void testGetNonExistingExecutionVertexWillThrowException() throws Exception {
        JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        DefaultExecutionGraph eg =
                ExecutionGraphTestUtils.createExecutionGraph(EXECUTOR_EXTENSION.getExecutor(), jobVertex);
        ExecutionGraphToInputsLocationsRetrieverAdapter inputsLocationsRetriever =
                new ExecutionGraphToInputsLocationsRetrieverAdapter(eg);

        // A freshly generated JobVertexID cannot belong to any vertex of the graph.
        ExecutionVertexID invalidExecutionVertexId = new ExecutionVertexID(new JobVertexID(), 0);

        Assertions.assertThatThrownBy(
                        () -> inputsLocationsRetriever.getTaskManagerLocation(invalidExecutionVertexId),
                        "Should throw exception if execution vertex doesn't exist!")
                .isInstanceOf(IllegalStateException.class);
    }
}

