/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.AbstractSlotSharingStrategy;
import org.apache.flink.runtime.scheduler.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.SlotSharingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TaskBalancedPreferredSlotSharingStrategy
extends AbstractSlotSharingStrategy {
    public static final Logger LOG = LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);

    TaskBalancedPreferredSlotSharingStrategy(SchedulingTopology topology, Set<SlotSharingGroup> slotSharingGroups, Set<CoLocationGroup> coLocationGroups) {
        super(topology, slotSharingGroups, coLocationGroups);
    }

    @Override
    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> computeExecutionSlotSharingGroups(SchedulingTopology schedulingTopology) {
        return new TaskBalancedExecutionSlotSharingGroupBuilder(schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups).build();
    }

    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
        private final SchedulingTopology topology;
        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> paralleledExecutionSlotSharingGroupsMap;
        private final Map<SlotSharingGroup, Integer> slotSharingGroupIndexMap;
        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroupMap;
        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> constraintToExecutionSlotSharingGroupMap;

        private TaskBalancedExecutionSlotSharingGroupBuilder(SchedulingTopology topology, Set<SlotSharingGroup> slotSharingGroups, Set<CoLocationGroup> coLocationGroups) {
            this.topology = (SchedulingTopology)Preconditions.checkNotNull((Object)topology);
            this.coLocationGroupMap = new HashMap<JobVertexID, CoLocationGroup>();
            for (CoLocationGroup coLocationGroup : coLocationGroups) {
                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) {
                    this.coLocationGroupMap.put(jobVertexId, coLocationGroup);
                }
            }
            this.constraintToExecutionSlotSharingGroupMap = new HashMap<CoLocationConstraint, ExecutionSlotSharingGroup>();
            this.paralleledExecutionSlotSharingGroupsMap = new HashMap<SlotSharingGroup, List<ExecutionSlotSharingGroup>>(slotSharingGroups.size());
            this.slotSharingGroupIndexMap = new HashMap<SlotSharingGroup, Integer>(slotSharingGroups.size());
            this.slotSharingGroupMap = new HashMap<JobVertexID, SlotSharingGroup>();
            this.executionSlotSharingGroupMap = new HashMap<ExecutionVertexID, ExecutionSlotSharingGroup>();
            for (SlotSharingGroup slotSharingGroup : slotSharingGroups) {
                for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) {
                    this.slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
                }
            }
        }

        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
            LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> allVertices = AbstractSlotSharingStrategy.getExecutionVertices(this.topology);
            this.initParalleledExecutionSlotSharingGroupsMap(allVertices);
            for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> executionVertexInfos : allVertices.entrySet()) {
                JobVertexID jobVertexID = executionVertexInfos.getKey();
                List<SchedulingExecutionVertex> executionVertices = executionVertexInfos.getValue();
                SlotSharingGroup slotSharingGroup = this.slotSharingGroupMap.get(jobVertexID);
                if (!this.coLocationGroupMap.containsKey(jobVertexID)) {
                    this.allocateNonCoLocatedVertices(slotSharingGroup, executionVertices);
                    continue;
                }
                this.allocateCoLocatedVertices(slotSharingGroup, executionVertices);
            }
            return this.executionSlotSharingGroupMap;
        }

        private void initParalleledExecutionSlotSharingGroupsMap(LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> allVertices) {
            allVertices.entrySet().stream().map(jobVertexExecutionVertices -> Tuple2.of((Object)this.slotSharingGroupMap.get(jobVertexExecutionVertices.getKey()), (Object)((List)jobVertexExecutionVertices.getValue()).size())).collect(Collectors.groupingBy(tuple -> (SlotSharingGroup)tuple.f0, Collectors.summarizingInt(tuple -> (Integer)tuple.f1))).forEach((slotSharingGroup, statistics) -> {
                int slotNum = statistics.getMax();
                this.paralleledExecutionSlotSharingGroupsMap.put((SlotSharingGroup)slotSharingGroup, this.createExecutionSlotSharingGroups((SlotSharingGroup)slotSharingGroup, slotNum));
            });
        }

        private List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(SlotSharingGroup slotSharingGroup, int slotNum) {
            ArrayList<ExecutionSlotSharingGroup> executionSlotSharingGroups = new ArrayList<ExecutionSlotSharingGroup>(slotNum);
            for (int i = 0; i < slotNum; ++i) {
                ExecutionSlotSharingGroup executionSlotSharingGroup = new ExecutionSlotSharingGroup(slotSharingGroup);
                executionSlotSharingGroups.add(i, executionSlotSharingGroup);
                LOG.debug("Create {}th executionSlotSharingGroup {}.", (Object)i, (Object)executionSlotSharingGroup);
            }
            return executionSlotSharingGroups;
        }

        private void allocateCoLocatedVertices(SlotSharingGroup slotSharingGroup, List<SchedulingExecutionVertex> executionVertices) {
            List<ExecutionSlotSharingGroup> executionSlotSharingGroups = this.paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
            for (SchedulingExecutionVertex executionVertex : executionVertices) {
                CoLocationConstraint coLocationConstraint = this.getCoLocationConstraint(executionVertex);
                ExecutionSlotSharingGroup executionSlotSharingGroup = this.constraintToExecutionSlotSharingGroupMap.get(coLocationConstraint);
                if (Objects.isNull(executionSlotSharingGroup)) {
                    executionSlotSharingGroup = executionSlotSharingGroups.get(this.getLeastUtilizeSlotIndex(executionSlotSharingGroups, executionVertex));
                    this.constraintToExecutionSlotSharingGroupMap.put(coLocationConstraint, executionSlotSharingGroup);
                }
                this.addVertexToExecutionSlotSharingGroup(executionSlotSharingGroup, executionVertex);
            }
            int jobVertexParallel = executionVertices.size();
            if (!this.isMaxParallelism(jobVertexParallel, slotSharingGroup)) {
                int index = this.getLeastUtilizeSlotIndex(executionSlotSharingGroups, null);
                this.updateSlotRoundRobinIndexIfNeeded(jobVertexParallel, slotSharingGroup, index);
            }
        }

        private void allocateNonCoLocatedVertices(SlotSharingGroup slotSharingGroup, List<SchedulingExecutionVertex> executionVertices) {
            int jobVertexParallel = executionVertices.size();
            int index = this.getSlotRoundRobinIndex(jobVertexParallel, slotSharingGroup);
            List<ExecutionSlotSharingGroup> executionSlotSharingGroups = this.paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
            for (SchedulingExecutionVertex executionVertex : executionVertices) {
                this.addVertexToExecutionSlotSharingGroup(executionSlotSharingGroups.get(index), executionVertex);
                ++index;
                index %= executionSlotSharingGroups.size();
            }
            this.updateSlotRoundRobinIndexIfNeeded(executionVertices.size(), slotSharingGroup, index);
        }

        private void addVertexToExecutionSlotSharingGroup(ExecutionSlotSharingGroup executionSlotSharingGroup, SchedulingExecutionVertex executionVertex) {
            ExecutionVertexID executionVertexId = (ExecutionVertexID)executionVertex.getId();
            executionSlotSharingGroup.addVertex(executionVertexId);
            this.executionSlotSharingGroupMap.put(executionVertexId, executionSlotSharingGroup);
        }

        private CoLocationConstraint getCoLocationConstraint(SchedulingExecutionVertex sev) {
            JobVertexID jobVertexID = ((ExecutionVertexID)sev.getId()).getJobVertexId();
            int subtaskIndex = ((ExecutionVertexID)sev.getId()).getSubtaskIndex();
            return this.coLocationGroupMap.get(jobVertexID).getLocationConstraint(subtaskIndex);
        }

        private int getSlotRoundRobinIndex(int jobVertexParallelism, SlotSharingGroup slotSharingGroup) {
            boolean maxParallel = this.isMaxParallelism(jobVertexParallelism, slotSharingGroup);
            return maxParallel ? 0 : this.slotSharingGroupIndexMap.getOrDefault(slotSharingGroup, 0);
        }

        private void updateSlotRoundRobinIndexIfNeeded(int jobVertexParallelism, SlotSharingGroup slotSharingGroup, int nextIndex) {
            if (!this.isMaxParallelism(jobVertexParallelism, slotSharingGroup)) {
                this.slotSharingGroupIndexMap.put(slotSharingGroup, nextIndex);
            }
        }

        private boolean isMaxParallelism(int jobVertexParallelism, SlotSharingGroup slotSharingGroup) {
            List<ExecutionSlotSharingGroup> executionSlotSharingGroups = this.paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
            return jobVertexParallelism == executionSlotSharingGroups.size();
        }

        private int getLeastUtilizeSlotIndex(List<ExecutionSlotSharingGroup> executionSlotSharingGroups, @Nullable SchedulingExecutionVertex executionVertex) {
            int indexWithLeastExecutionVertices = 0;
            int leastExecutionVertices = Integer.MAX_VALUE;
            for (int index = 0; index < executionSlotSharingGroups.size(); ++index) {
                ExecutionSlotSharingGroup executionSlotSharingGroup = executionSlotSharingGroups.get(index);
                int executionVertices = executionSlotSharingGroup.getExecutionVertexIds().size();
                if (leastExecutionVertices <= executionVertices || !Objects.isNull(executionVertex) && !this.allocatable(executionSlotSharingGroup, executionVertex)) continue;
                indexWithLeastExecutionVertices = index;
                leastExecutionVertices = executionVertices;
            }
            return indexWithLeastExecutionVertices;
        }

        private boolean allocatable(ExecutionSlotSharingGroup executionSlotSharingGroup, @Nonnull SchedulingExecutionVertex executionVertex) {
            ExecutionVertexID executionVertexId = (ExecutionVertexID)executionVertex.getId();
            JobVertexID jobVertexId = executionVertexId.getJobVertexId();
            Set allocatedJobVertices = executionSlotSharingGroup.getExecutionVertexIds().stream().map(ExecutionVertexID::getJobVertexId).collect(Collectors.toSet());
            return !allocatedJobVertices.contains(jobVertexId);
        }
    }

    static class Factory
    implements SlotSharingStrategy.Factory {
        Factory() {
        }

        @Override
        public TaskBalancedPreferredSlotSharingStrategy create(SchedulingTopology topology, Set<SlotSharingGroup> slotSharingGroups, Set<CoLocationGroup> coLocationGroups) {
            return new TaskBalancedPreferredSlotSharingStrategy(topology, slotSharingGroups, coLocationGroups);
        }
    }
}

