/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotMatchingResolver;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotTaskExecutorWeight;
import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;

public enum TasksBalancedSlotMatchingResolver implements SlotMatchingResolver
{
    INSTANCE;


    @Override
    public Collection<JobSchedulingPlan.SlotAssignment> matchSlotSharingGroupWithSlots(Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> requestGroups, Collection<PhysicalSlot> freeSlots) {
        ArrayList<JobSchedulingPlan.SlotAssignment> slotAssignments = new ArrayList<JobSchedulingPlan.SlotAssignment>(requestGroups.size());
        Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor = AllocatorUtil.getSlotsPerTaskExecutor(freeSlots);
        TreeMap<LoadingWeight, Set<PhysicalSlot>> loadingSlotsMap = TasksBalancedSlotMatchingResolver.getLoadingSlotsMap(freeSlots);
        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroup requestGroup : WeightLoadable.sortByLoadingDescend(requestGroups)) {
            SlotTaskExecutorWeight<LoadingWeight> best = TasksBalancedSlotMatchingResolver.getTheBestSlotTaskExecutorLoading(loadingSlotsMap);
            slotAssignments.add(new JobSchedulingPlan.SlotAssignment(best.physicalSlot, requestGroup));
            LoadingWeight newLoading = ((LoadingWeight)best.taskExecutorWeight).merge(requestGroup.getLoading());
            TasksBalancedSlotMatchingResolver.updateSlotsPerTaskExecutor(slotsPerTaskExecutor, best);
            Set<PhysicalSlot> physicalSlots = slotsPerTaskExecutor.get(best.getResourceID());
            TasksBalancedSlotMatchingResolver.updateLoadingSlotsMap(loadingSlotsMap, best, physicalSlots, newLoading);
        }
        return slotAssignments;
    }

    private static void updateLoadingSlotsMap(Map<LoadingWeight, Set<PhysicalSlot>> loadingSlotsMap, SlotTaskExecutorWeight<LoadingWeight> best, Set<PhysicalSlot> slotsToAdjust, LoadingWeight newLoading) {
        Set<PhysicalSlot> physicalSlots = loadingSlotsMap.get(best.taskExecutorWeight);
        if (!CollectionUtil.isNullOrEmpty(physicalSlots)) {
            physicalSlots.remove(best.physicalSlot);
        }
        if (!CollectionUtil.isNullOrEmpty(slotsToAdjust) && !CollectionUtil.isNullOrEmpty(physicalSlots)) {
            physicalSlots.removeAll(slotsToAdjust);
        }
        if (CollectionUtil.isNullOrEmpty(physicalSlots)) {
            loadingSlotsMap.remove(best.taskExecutorWeight);
        }
        if (!CollectionUtil.isNullOrEmpty(slotsToAdjust)) {
            Set slotsOfNewKey = loadingSlotsMap.computeIfAbsent(newLoading, ignored -> CollectionUtil.newHashSetWithExpectedSize(slotsToAdjust.size()));
            slotsOfNewKey.addAll(slotsToAdjust);
        }
    }

    private static void updateSlotsPerTaskExecutor(Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor, SlotTaskExecutorWeight<LoadingWeight> best) {
        Set<PhysicalSlot> slots = slotsPerTaskExecutor.get(best.getResourceID());
        if (Objects.nonNull(slots)) {
            slots.remove(best.physicalSlot);
        }
        if (CollectionUtil.isNullOrEmpty(slots)) {
            slotsPerTaskExecutor.remove(best.getResourceID());
        }
    }

    private static TreeMap<LoadingWeight, Set<PhysicalSlot>> getLoadingSlotsMap(final Collection<PhysicalSlot> slots) {
        return new TreeMap<LoadingWeight, Set<PhysicalSlot>>(){
            {
                HashSet slotsValue = CollectionUtil.newHashSetWithExpectedSize(slots.size());
                slotsValue.addAll(slots);
                this.put(DefaultLoadingWeight.EMPTY, slotsValue);
            }
        };
    }

    private static SlotTaskExecutorWeight<LoadingWeight> getTheBestSlotTaskExecutorLoading(TreeMap<LoadingWeight, Set<PhysicalSlot>> slotsByLoading) {
        Map.Entry<LoadingWeight, Set<PhysicalSlot>> firstEntry = slotsByLoading.firstEntry();
        if (firstEntry == null || firstEntry.getKey() == null || CollectionUtil.isNullOrEmpty((Collection)firstEntry.getValue())) {
            throw (FlinkRuntimeException)NO_SLOTS_EXCEPTION_GETTER.get();
        }
        return new SlotTaskExecutorWeight<LoadingWeight>(firstEntry.getKey(), firstEntry.getValue().iterator().next());
    }
}

