/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.TaskBalancedExecutionSlotSharingGroupBuilder;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingResolver;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

/**
 * {@link SlotSharingResolver} singleton that computes the execution slot sharing groups of a job
 * by delegating the grouping itself to {@link TaskBalancedExecutionSlotSharingGroupBuilder}.
 */
@Internal
public enum TaskBalancedSlotSharingResolver implements SlotSharingResolver {
    INSTANCE;

    /**
     * Builds the execution slot sharing groups for the given job at the given parallelism.
     *
     * <p>All execution vertices are enumerated via {@link #getAllVertices}, handed to a
     * {@link TaskBalancedExecutionSlotSharingGroupBuilder} together with the job's slot sharing
     * groups and co-location groups, and every distinct group produced by the builder is
     * converted into a {@link SlotSharingSlotAllocator.ExecutionSlotSharingGroup}.
     *
     * @param jobInformation source of the slot sharing groups and co-location groups
     * @param vertexParallelism parallelism to use for each job vertex
     * @return one allocator-side group per distinct group returned by the builder
     */
    public List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> getExecutionSlotSharingGroups(JobInformation jobInformation, VertexParallelism vertexParallelism) {
        Map<JobVertexID, List<ExecutionVertexID>> allVertices = getAllVertices(jobInformation, vertexParallelism);
        TaskBalancedExecutionSlotSharingGroupBuilder groupBuilder =
                new TaskBalancedExecutionSlotSharingGroupBuilder(
                        allVertices,
                        jobInformation.getSlotSharingGroups(),
                        jobInformation.getCoLocationGroups());
        return groupBuilder
                .build()
                .values()
                .stream()
                // NOTE(review): the same group instance presumably appears as a value for several
                // keys of the built map, hence the de-duplication -- confirm against the builder.
                .distinct()
                .map(
                        fromGroup ->
                                new SlotSharingSlotAllocator.ExecutionSlotSharingGroup(
                                        fromGroup.getSlotSharingGroup(),
                                        fromGroup.getExecutionVertexIds()))
                .collect(Collectors.toList());
    }

    /**
     * Enumerates the execution vertices of every job vertex that belongs to one of the job's slot
     * sharing groups.
     *
     * <p>For each job vertex, subtask indices {@code 0 .. parallelism - 1} are added in ascending
     * order. A job vertex whose parallelism is not positive gets no entry in the returned map.
     *
     * @param jobInformation source of the slot sharing groups to walk
     * @param vertexParallelism parallelism to use for each job vertex
     * @return a new mutable map from job vertex id to its execution vertex ids
     */
    static Map<JobVertexID, List<ExecutionVertexID>> getAllVertices(JobInformation jobInformation, VertexParallelism vertexParallelism) {
        Map<JobVertexID, List<ExecutionVertexID>> jobVertexToExecutionVertices = new HashMap<>();
        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
            slotSharingGroup.getJobVertexIds().forEach(jobVertexId -> {
                int parallelism = vertexParallelism.getParallelism(jobVertexId);
                for (int subtaskIdx = 0; subtaskIdx < parallelism; ++subtaskIdx) {
                    // The list is created lazily inside the loop so that a vertex without any
                    // subtask never produces an (empty) map entry.
                    jobVertexToExecutionVertices
                            .computeIfAbsent(jobVertexId, ignored -> new ArrayList<>())
                            .add(new ExecutionVertexID(jobVertexId, subtaskIdx));
                }
            });
        }
        return jobVertexToExecutionVertices;
    }
}

