package org.apache.flink.runtime.executiongraph.failover.partitionrelease;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.executiongraph.failover.partitionrelease.PartitionGroupReleaseStrategy;
import org.apache.flink.runtime.scheduler.SchedulingTopologyListener;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/partitionrelease/RegionPartitionGroupReleaseStrategy.class */
/**
 * {@link PartitionGroupReleaseStrategy} that tracks execution progress per pipelined region.
 *
 * <p>A consumed partition group is reported as releasable once the execution view of every
 * region registered as its consumer reports finished and the group's result partition type is
 * not persistent.
 *
 * <p>The strategy registers itself as a {@link SchedulingTopologyListener} on the topology it is
 * created with, so regions reported through later topology updates are tracked as well.
 */
public class RegionPartitionGroupReleaseStrategy implements PartitionGroupReleaseStrategy, SchedulingTopologyListener {
    /** Topology used to look up the pipelined region an execution vertex belongs to. */
    private final SchedulingTopology schedulingTopology;

    /** Execution view of the owning region, keyed by each vertex of that region. */
    private final Map<ExecutionVertexID, PipelinedRegionExecutionView> regionExecutionViewByVertex = new HashMap<>();

    /** Execution view over the consumer regions of each consumed partition group. */
    private final Map<ConsumedPartitionGroup, ConsumerRegionGroupExecutionView> partitionGroupConsumerRegions = new HashMap<>();

    /** Receives region finished/unfinished notifications and newly created consumer group views. */
    private final ConsumerRegionGroupExecutionViewMaintainer consumerRegionGroupExecutionViewMaintainer = new ConsumerRegionGroupExecutionViewMaintainer();

    /** Factory creating {@link RegionPartitionGroupReleaseStrategy} instances. */
    public static class Factory implements PartitionGroupReleaseStrategy.Factory {
        /**
         * Creates a strategy bound to the given topology.
         *
         * @param schedulingTopology topology the new strategy operates on
         * @return a new {@link RegionPartitionGroupReleaseStrategy}
         */
        @Override // org.apache.flink.runtime.executiongraph.failover.partitionrelease.PartitionGroupReleaseStrategy.Factory
        public PartitionGroupReleaseStrategy createInstance(SchedulingTopology schedulingTopology) {
            return new RegionPartitionGroupReleaseStrategy(schedulingTopology);
        }
    }

    /**
     * Creates the strategy, registers it as a listener on the topology and indexes all pipelined
     * regions the topology currently exposes.
     *
     * @param schedulingTopology topology to track; must not be {@code null}
     */
    public RegionPartitionGroupReleaseStrategy(SchedulingTopology schedulingTopology) {
        this.schedulingTopology = Preconditions.checkNotNull(schedulingTopology);
        schedulingTopology.registerSchedulingTopologyListener(this);
        notifySchedulingTopologyUpdatedInternal(schedulingTopology.getAllPipelinedRegions());
    }

    /**
     * Creates one execution view per region and maps every vertex of the region to that view.
     *
     * @param iterable regions to index
     */
    private void initRegionExecutionViewByVertex(Iterable<? extends SchedulingPipelinedRegion> iterable) {
        for (SchedulingPipelinedRegion region : iterable) {
            final PipelinedRegionExecutionView regionView = new PipelinedRegionExecutionView(region);
            for (SchedulingExecutionVertex vertex : region.getVertices()) {
                this.regionExecutionViewByVertex.put(vertex.getId(), regionView);
            }
        }
    }

    /**
     * Adds each region to the consumer view of every partition group returned by
     * {@code getAllReleaseBySchedulerConsumedPartitionGroups()}, creating the view on first use.
     *
     * @param iterable regions to register as consumers
     * @return the consumer group views that were newly created by this call
     */
    private Iterable<ConsumerRegionGroupExecutionView> initPartitionGroupConsumerRegions(Iterable<? extends SchedulingPipelinedRegion> iterable) {
        final List<ConsumerRegionGroupExecutionView> newViews = new ArrayList<>();
        for (SchedulingPipelinedRegion region : iterable) {
            for (ConsumedPartitionGroup partitionGroup : region.getAllReleaseBySchedulerConsumedPartitionGroups()) {
                this.partitionGroupConsumerRegions
                        .computeIfAbsent(
                                partitionGroup,
                                ignored -> {
                                    // Only views created here are reported back to the caller.
                                    final ConsumerRegionGroupExecutionView view = new ConsumerRegionGroupExecutionView();
                                    newViews.add(view);
                                    return view;
                                })
                        .add(region);
            }
        }
        return newViews;
    }

    /**
     * Marks the vertex as finished in its region's execution view.
     *
     * <p>If the region is not finished afterwards, nothing can be released. Otherwise the
     * maintainer is told the region finished and the region's consumed partition groups are
     * filtered for releasable ones.
     *
     * @param executionVertexID the vertex that finished
     * @return the releasable consumed partition groups, or an empty list if the region is not
     *     finished yet
     */
    @Override // org.apache.flink.runtime.executiongraph.failover.partitionrelease.PartitionGroupReleaseStrategy
    public List<ConsumedPartitionGroup> vertexFinished(ExecutionVertexID executionVertexID) {
        final PipelinedRegionExecutionView regionView = getPipelinedRegionExecutionViewForVertex(executionVertexID);
        regionView.vertexFinished(executionVertexID);
        if (!regionView.isFinished()) {
            return Collections.emptyList();
        }
        final SchedulingPipelinedRegion region = this.schedulingTopology.getPipelinedRegionOfVertex(executionVertexID);
        this.consumerRegionGroupExecutionViewMaintainer.regionFinished(region);
        return filterReleasablePartitionGroups(region.getAllReleaseBySchedulerConsumedPartitionGroups());
    }

    /**
     * Marks the vertex as unfinished in its region's execution view and tells the maintainer
     * that the vertex's region is unfinished.
     *
     * @param executionVertexID the vertex that is no longer finished
     */
    @Override // org.apache.flink.runtime.executiongraph.failover.partitionrelease.PartitionGroupReleaseStrategy
    public void vertexUnfinished(ExecutionVertexID executionVertexID) {
        final PipelinedRegionExecutionView regionView = getPipelinedRegionExecutionViewForVertex(executionVertexID);
        regionView.vertexUnfinished(executionVertexID);
        final SchedulingPipelinedRegion region = this.schedulingTopology.getPipelinedRegionOfVertex(executionVertexID);
        this.consumerRegionGroupExecutionViewMaintainer.regionUnfinished(region);
    }

    /**
     * Looks up the region execution view registered for a vertex.
     *
     * @param executionVertexID vertex whose region view is requested
     * @return the view of the vertex's region; never {@code null}
     */
    private PipelinedRegionExecutionView getPipelinedRegionExecutionViewForVertex(ExecutionVertexID executionVertexID) {
        final PipelinedRegionExecutionView regionView = this.regionExecutionViewByVertex.get(executionVertexID);
        // A missing view means the vertex was never indexed; the state check rejects that.
        Preconditions.checkState(regionView != null, "PipelinedRegionExecutionView not found for execution vertex %s", executionVertexID);
        return regionView;
    }

    /**
     * Keeps the partition groups whose consumer view reports finished and whose result partition
     * type is not persistent.
     *
     * @param iterable candidate partition groups; each is expected to have a registered consumer
     *     view
     * @return the releasable groups, in iteration order
     */
    private List<ConsumedPartitionGroup> filterReleasablePartitionGroups(Iterable<ConsumedPartitionGroup> iterable) {
        final List<ConsumedPartitionGroup> releasable = new ArrayList<>();
        for (ConsumedPartitionGroup partitionGroup : iterable) {
            final ConsumerRegionGroupExecutionView consumerView = this.partitionGroupConsumerRegions.get(partitionGroup);
            if (consumerView.isFinished() && !partitionGroup.getResultPartitionType().isPersistent()) {
                releasable.add(partitionGroup);
            }
        }
        return releasable;
    }

    /**
     * Indexes the given regions and hands the newly created consumer group views to the
     * maintainer.
     *
     * @param iterable regions to start tracking
     */
    private void notifySchedulingTopologyUpdatedInternal(Iterable<? extends SchedulingPipelinedRegion> iterable) {
        initRegionExecutionViewByVertex(iterable);
        final Iterable<ConsumerRegionGroupExecutionView> newViews = initPartitionGroupConsumerRegions(iterable);
        this.consumerRegionGroupExecutionViewMaintainer.notifyNewRegionGroupExecutionViews(newViews);
    }

    /**
     * Reports whether the region containing the given vertex is finished.
     *
     * @param executionVertexID vertex whose region is queried
     * @return {@code true} if the region's execution view reports finished
     */
    @VisibleForTesting
    public boolean isRegionOfVertexFinished(ExecutionVertexID executionVertexID) {
        return getPipelinedRegionExecutionViewForVertex(executionVertexID).isFinished();
    }

    /**
     * Starts tracking the distinct pipelined regions that contain the given vertices.
     *
     * @param schedulingTopology topology used to resolve the region of each vertex
     * @param list the execution vertices reported by the topology update
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulingTopologyListener
    public void notifySchedulingTopologyUpdated(SchedulingTopology schedulingTopology, List<ExecutionVertexID> list) {
        // Binding the method reference evaluates the receiver eagerly and rejects null,
        // so no separate null probe on the topology is needed.
        final Set<SchedulingPipelinedRegion> newRegions =
                list.stream()
                        .map(schedulingTopology::getPipelinedRegionOfVertex)
                        .collect(Collectors.toSet());
        notifySchedulingTopologyUpdatedInternal(newRegions);
    }
}
