/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.strategy;

import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.InputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;

/**
 * {@link InputConsumableDecider} that judges every {@link ConsumedPartitionGroup} of a vertex
 * according to the group's result partition type.
 *
 * <ul>
 *   <li>If the type reports {@code canBePipelinedConsumed()}, the group is consumable once the
 *       producer of each partition is either in the set of vertices being scheduled or is
 *       reported as {@code true} by the scheduled-vertex retriever.
 *   <li>Otherwise the group is consumable once each partition is in state {@link
 *       ResultPartitionState#ALL_DATA_PRODUCED}.
 * </ul>
 */
public class DefaultInputConsumableDecider implements InputConsumableDecider {

    /**
     * Maps a vertex id to a flag; a {@code true} result is treated the same as the vertex being a
     * member of the set of vertices to schedule.
     */
    private final Function<ExecutionVertexID, Boolean> scheduledVertexRetriever;

    /** Resolves a partition id to the partition whose producer and state are inspected. */
    private final Function<IntermediateResultPartitionID, SchedulingResultPartition>
            resultPartitionRetriever;

    /**
     * Creates a decider backed by the two given lookup functions.
     *
     * @param scheduledVertexRetriever lookup consulted for producers that are not in the set of
     *     vertices to schedule
     * @param resultPartitionRetriever lookup from partition id to result partition
     */
    DefaultInputConsumableDecider(
            Function<ExecutionVertexID, Boolean> scheduledVertexRetriever,
            Function<IntermediateResultPartitionID, SchedulingResultPartition>
                    resultPartitionRetriever) {
        this.resultPartitionRetriever = resultPartitionRetriever;
        this.scheduledVertexRetriever = scheduledVertexRetriever;
    }

    /**
     * Checks whether every consumed partition group of the given vertex is consumable.
     *
     * <p>Per-group results are read from {@code consumableStatusCache}; a missing entry is
     * computed and stored in that map. Iteration stops at the first group that is not consumable,
     * so later groups are neither evaluated nor cached in that call.
     *
     * @param executionVertex vertex whose consumed partition groups are inspected
     * @param verticesToSchedule ids of vertices that count as scheduled for pipelined groups
     * @param consumableStatusCache mutable cache of per-group results, updated by this call
     * @return {@code true} if no consumed partition group is found to be non-consumable
     */
    @Override
    public boolean isInputConsumable(
            SchedulingExecutionVertex executionVertex,
            Set<ExecutionVertexID> verticesToSchedule,
            Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
        for (ConsumedPartitionGroup candidate : executionVertex.getConsumedPartitionGroups()) {
            boolean consumable =
                    consumableStatusCache.computeIfAbsent(
                            candidate,
                            group -> isConsumedPartitionGroupConsumable(group, verticesToSchedule));
            if (!consumable) {
                return false;
            }
        }
        return true;
    }

    /**
     * Decides consumability of a group purely from whether its partitions are finished.
     *
     * @param consumedPartitionGroup group to check
     * @return {@code false} whenever the group's partition type can be pipelined consumed;
     *     otherwise the value of {@code consumedPartitionGroup.areAllPartitionsFinished()}
     */
    @Override
    public boolean isConsumableBasedOnFinishedProducers(
            ConsumedPartitionGroup consumedPartitionGroup) {
        // Pipelined-consumable groups are never reported as consumable by this method;
        // the finished check is only reached for the remaining partition types.
        return !consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()
                && consumedPartitionGroup.areAllPartitionsFinished();
    }

    /** Picks the producer-based or the state-based check depending on the partition type. */
    private boolean isConsumedPartitionGroupConsumable(
            ConsumedPartitionGroup consumedPartitionGroup,
            Set<ExecutionVertexID> verticesToSchedule) {
        return consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()
                ? areAllProducersScheduled(consumedPartitionGroup, verticesToSchedule)
                : areAllPartitionsProduced(consumedPartitionGroup);
    }

    /**
     * Returns {@code true} if the producer of each partition in the group is contained in
     * {@code verticesToSchedule} or is reported as {@code true} by the scheduled-vertex retriever.
     */
    private boolean areAllProducersScheduled(
            ConsumedPartitionGroup consumedPartitionGroup,
            Set<ExecutionVertexID> verticesToSchedule) {
        for (IntermediateResultPartitionID id : consumedPartitionGroup) {
            SchedulingResultPartition partition = resultPartitionRetriever.apply(id);
            ExecutionVertexID producerId = partition.getProducer().getId();
            // The set lookup comes first so the retriever is only asked when it is needed.
            boolean scheduled =
                    verticesToSchedule.contains(producerId)
                            || scheduledVertexRetriever.apply(producerId);
            if (!scheduled) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns {@code true} if each partition in the group is in state {@link
     * ResultPartitionState#ALL_DATA_PRODUCED}.
     */
    private boolean areAllPartitionsProduced(ConsumedPartitionGroup consumedPartitionGroup) {
        for (IntermediateResultPartitionID id : consumedPartitionGroup) {
            ResultPartitionState state = resultPartitionRetriever.apply(id).getState();
            if (state != ResultPartitionState.ALL_DATA_PRODUCED) {
                return false;
            }
        }
        return true;
    }

    /** Factory producing {@link DefaultInputConsumableDecider} instances. */
    public static class Factory implements InputConsumableDecider.Factory {

        /** Shared factory instance; the constructor is private, so this is the access point. */
        public static final InputConsumableDecider.Factory INSTANCE = new Factory();

        private Factory() {
            // Instances are obtained through INSTANCE only.
        }

        /**
         * Builds a decider that resolves partitions through the given topology.
         *
         * @param schedulingTopology topology whose {@code getResultPartition} serves as the
         *     partition lookup
         * @param scheduledVertexRetriever lookup handed to the decider unchanged
         * @return a new {@link DefaultInputConsumableDecider}
         */
        @Override
        public InputConsumableDecider createInstance(
                SchedulingTopology schedulingTopology,
                Function<ExecutionVertexID, Boolean> scheduledVertexRetriever) {
            Function<IntermediateResultPartitionID, SchedulingResultPartition> partitionLookup =
                    schedulingTopology::getResultPartition;
            return new DefaultInputConsumableDecider(scheduledVertexRetriever, partitionLookup);
        }
    }
}

