/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService;
import org.apache.flink.runtime.jobmaster.slotpool.FreeSlotTracker;
import org.apache.flink.runtime.jobmaster.slotpool.PendingRequest;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.concurrent.FutureUtils;

/**
 * {@link SlotPool} implementation that translates individual slot requests into resource
 * requirement changes on the declarative slot pool returned by {@code getDeclarativeSlotPool()}.
 *
 * <p>Every request issued through {@link #requestNewAllocatedSlot} or {@link
 * #requestNewAllocatedBatchSlot} is remembered in {@link #pendingRequests} and increases the
 * declared resource requirements by one. Once a matching free slot shows up, the slot is reserved,
 * the request is moved to {@link #fulfilledRequests} and its future is completed.
 *
 * <p>The request-handling entry points assert that they are invoked from the component main thread
 * (see {@link #assertRunningInMainThread()}); the bookkeeping maps are plain, unsynchronized
 * collections.
 */
public class DeclarativeSlotPoolBridge
extends DeclarativeSlotPoolService
implements SlotPool {
    /** Requests that have not been fulfilled yet, kept in insertion order. */
    private final Map<SlotRequestId, PendingRequest> pendingRequests;

    /** Requests that have been served, mapped to the allocation id of the reserved slot. */
    private final Map<SlotRequestId, AllocationID> fulfilledRequests;

    /** Delay between two consecutive runs of {@link #checkIdleSlotTimeout()}. */
    private final Duration idleSlotTimeout;

    /** Strategy used to pair pending requests with candidate slots. */
    private final RequestSlotMatchingStrategy requestSlotMatchingStrategy;

    /**
     * Delay between two consecutive runs of {@link #checkBatchSlotTimeout()} and, at the same time,
     * the period a batch request may stay unfulfillable before it is timed out.
     */
    private final Duration batchSlotTimeout;

    /** Set by {@link #disableBatchSlotRequestTimeoutCheck()}; never reset afterwards. */
    private boolean isBatchSlotRequestTimeoutCheckDisabled;

    /** Selects between {@code registerSlots} and {@code offerSlots} in {@link #offerSlots}. */
    private boolean isJobRestarting;

    /**
     * If {@code true}, pending requests are only served once all of them can be matched at once;
     * otherwise newly arrived slots are matched and handed out immediately.
     */
    private final boolean slotBatchAllocatable;

    /**
     * Creates the bridge.
     *
     * @param jobId forwarded to the super class
     * @param declarativeSlotPoolFactory forwarded to the super class
     * @param clock forwarded to the super class
     * @param rpcTimeout forwarded to the super class
     * @param idleSlotTimeout forwarded to the super class and used as the idle-slot check interval
     * @param batchSlotTimeout interval and timeout of the batch slot request check, must not be
     *     {@code null}
     * @param requestSlotMatchingStrategy strategy matching pending requests with slots, must not
     *     be {@code null}
     * @param slotRequestMaxInterval forwarded to the super class
     * @param slotBatchAllocatable whether pending requests are only fulfilled all at once
     * @param componentMainThreadExecutor forwarded to the super class
     * @throws NullPointerException if {@code batchSlotTimeout} or {@code
     *     requestSlotMatchingStrategy} is {@code null}
     */
    public DeclarativeSlotPoolBridge(JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory, Clock clock, Duration rpcTimeout, Duration idleSlotTimeout, Duration batchSlotTimeout, RequestSlotMatchingStrategy requestSlotMatchingStrategy, Duration slotRequestMaxInterval, boolean slotBatchAllocatable, @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
        super(jobId, declarativeSlotPoolFactory, clock, idleSlotTimeout, rpcTimeout, slotRequestMaxInterval, componentMainThreadExecutor);
        this.idleSlotTimeout = idleSlotTimeout;
        this.batchSlotTimeout = Preconditions.checkNotNull(batchSlotTimeout);
        // Validate before the strategy is dereferenced for logging so that a missing strategy
        // fails with a descriptive message instead of a bare NullPointerException.
        this.requestSlotMatchingStrategy = Preconditions.checkNotNull(requestSlotMatchingStrategy, "The requestSlotMatchingStrategy must not be null.");
        this.log.debug("Using the request slot matching strategy: {}", requestSlotMatchingStrategy.getClass().getSimpleName());
        this.slotBatchAllocatable = slotBatchAllocatable;
        this.isBatchSlotRequestTimeoutCheckDisabled = false;
        this.pendingRequests = new LinkedHashMap<>();
        this.fulfilledRequests = new HashMap<>();
    }

    /**
     * Returns this instance cast to {@code clazz} if this object's class is assignable to it.
     *
     * @param clazz target type
     * @return this instance as {@code T}, or an empty optional if the types are incompatible
     */
    @Override
    public <T> Optional<T> castInto(Class<T> clazz) {
        if (clazz.isAssignableFrom(this.getClass())) {
            return Optional.of(clazz.cast(this));
        }
        return Optional.empty();
    }

    /**
     * Registers {@link #newSlotsAreAvailable} as new-slots listener and schedules the first run of
     * the idle-slot check and of the batch-slot timeout check.
     */
    @Override
    protected void onStart() {
        this.getDeclarativeSlotPool().registerNewSlotsListener(this::newSlotsAreAvailable);
        this.componentMainThreadExecutor.schedule(this::checkIdleSlotTimeout, this.idleSlotTimeout.toMillis(), TimeUnit.MILLISECONDS);
        this.componentMainThreadExecutor.schedule(this::checkBatchSlotTimeout, this.batchSlotTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /** Fails every pending request with a "Closing slot pool" {@link FlinkException}. */
    @Override
    protected void onClose() {
        FlinkException cause = new FlinkException("Closing slot pool");
        this.cancelPendingRequests(request -> true, cause);
    }

    /**
     * Records whether the job is currently restarting; the flag is consulted by {@link
     * #offerSlots}.
     *
     * @param isJobRestarting new value of the flag
     */
    @Override
    public void setIsJobRestarting(boolean isJobRestarting) {
        this.isJobRestarting = isJobRestarting;
    }

    /**
     * Hands the offered slots to the declarative slot pool.
     *
     * <p>Offers from a task manager that is not registered are ignored. While the job is
     * restarting the slots are passed to {@code registerSlots}, otherwise to {@code offerSlots}.
     *
     * @param taskManagerLocation location of the offering task manager
     * @param taskManagerGateway gateway of the offering task manager
     * @param offers slots being offered
     * @return whatever the declarative slot pool returns for the offers, or an empty collection
     *     if the task manager is unknown
     */
    @Override
    public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers) {
        this.assertHasBeenStarted();
        if (!this.isTaskManagerRegistered(taskManagerLocation.getResourceID())) {
            this.log.debug("Ignoring offered slots from unknown task manager {}.", taskManagerLocation.getResourceID());
            return Collections.emptyList();
        }
        if (this.isJobRestarting) {
            return this.getDeclarativeSlotPool().registerSlots(offers, taskManagerLocation, taskManagerGateway, this.getRelativeTimeMillis());
        }
        return this.getDeclarativeSlotPool().offerSlots(offers, taskManagerLocation, taskManagerGateway, this.getRelativeTimeMillis());
    }

    /**
     * Fails all pending requests accepted by {@code requestPredicate} and lowers the declared
     * resource requirements by the profiles of the failed requests.
     *
     * @param requestPredicate selects the requests to fail
     * @param cancelCause exception the selected requests are failed with
     */
    private void cancelPendingRequests(Predicate<PendingRequest> requestPredicate, FlinkException cancelCause) {
        ResourceCounter decreasedResourceRequirements = ResourceCounter.empty();
        // Work on a snapshot and empty the live map first: the map is mutated below while the
        // snapshot is iterated, and requests that are not selected are re-inserted afterwards.
        List<PendingRequest> pendingRequestsToFail = new ArrayList<>(this.pendingRequests.values());
        this.pendingRequests.clear();
        for (PendingRequest pendingRequest : pendingRequestsToFail) {
            if (requestPredicate.test(pendingRequest)) {
                pendingRequest.failRequest(cancelCause);
                decreasedResourceRequirements = decreasedResourceRequirements.add(pendingRequest.getResourceProfile(), 1);
            } else {
                this.pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);
            }
        }
        this.getDeclarativeSlotPool().decreaseResourceRequirementsBy(decreasedResourceRequirements);
    }

    /**
     * Lowers the declared resource requirements by the given amount.
     *
     * @param previouslyFulfilledRequirement requirements to subtract
     */
    @Override
    protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequirement) {
        this.getDeclarativeSlotPool().decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
    }

    /**
     * Listener callback for newly available slots. Does nothing when no request is pending;
     * otherwise dispatches to the batch or the direct allocation path depending on {@link
     * #slotBatchAllocatable}.
     *
     * @param newSlots slots that became available
     */
    @VisibleForTesting
    void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
        if (this.pendingRequests.isEmpty()) {
            return;
        }
        if (this.slotBatchAllocatable) {
            this.newSlotsAvailableForSlotBatchAllocatable(newSlots);
        } else {
            this.newSlotsAvailableForDirectlyAllocatable(newSlots);
        }
    }

    /**
     * Batch allocation path: pending requests are only served if every one of them can be matched
     * with a currently free slot; otherwise nothing is reserved and the method returns.
     *
     * @param newSlots slots that triggered the callback; only logged here, matching is done
     *     against all free slots known to the free slot tracker
     * @throws IllegalStateException if the strategy returns more matches than pending requests
     */
    private void newSlotsAvailableForSlotBatchAllocatable(Collection<? extends PhysicalSlot> newSlots) {
        this.log.debug("Received new available slots: {}", newSlots);
        FreeSlotTracker freeSlotInfoTracker = this.getDeclarativeSlotPool().getFreeSlotTracker();
        int availableSlotCount = freeSlotInfoTracker.getAvailableSlots().size();
        int pendingRequestCount = this.pendingRequests.size();
        if (availableSlotCount < pendingRequestCount) {
            this.log.debug("The number of available slots: {}, the required number of slots: {}, waiting for more available slots.", availableSlotCount, pendingRequestCount);
            return;
        }
        Collection<PhysicalSlot> availableSlots = freeSlotInfoTracker.getFreeSlotsInformation();
        Collection<RequestSlotMatchingStrategy.RequestSlotMatch> requestSlotMatches = this.requestSlotMatchingStrategy.matchRequestsAndSlots(availableSlots, this.pendingRequests.values());
        int matchCount = requestSlotMatches.size();
        if (matchCount == pendingRequestCount) {
            this.reserveAndFulfillMatchedFreeSlots(requestSlotMatches);
        } else if (matchCount < pendingRequestCount) {
            this.log.debug("Ignored the matched results: {}, pendingRequests: {}, waiting for more available slots.", requestSlotMatches, this.pendingRequests);
        } else {
            throw new IllegalStateException("The number of matched slots is not equals to the pendingRequests.");
        }
    }

    /**
     * Direct allocation path: matches the newly arrived slots against the pending requests and
     * serves every match right away.
     *
     * @param newSlots slots that became available
     */
    private void newSlotsAvailableForDirectlyAllocatable(Collection<? extends PhysicalSlot> newSlots) {
        Collection<RequestSlotMatchingStrategy.RequestSlotMatch> requestSlotMatches = this.requestSlotMatchingStrategy.matchRequestsAndSlots(newSlots, this.pendingRequests.values());
        this.reserveAndFulfillMatchedFreeSlots(requestSlotMatches);
    }

    /**
     * Serves the given matches in two passes: first every matched request is removed from {@link
     * #pendingRequests} and its slot reserved, then every request future is completed.
     *
     * @param requestSlotMatches request/slot pairs to serve
     * @throws NullPointerException if a matched request is not pending
     * @throws IllegalStateException if a request cannot be fulfilled with its slot
     */
    private void reserveAndFulfillMatchedFreeSlots(Collection<RequestSlotMatchingStrategy.RequestSlotMatch> requestSlotMatches) {
        // Pass 1: bookkeeping and reservation for all matches.
        for (RequestSlotMatchingStrategy.RequestSlotMatch match : requestSlotMatches) {
            PendingRequest pendingRequest = match.getPendingRequest();
            PhysicalSlot slot = match.getSlot();
            this.log.debug("Matched pending request {} with slot {}.", pendingRequest, slot);
            Preconditions.checkNotNull(this.pendingRequests.remove(pendingRequest.getSlotRequestId()), "Cannot fulfill a non existing pending slot request.");
            this.reserveFreeSlot(pendingRequest.getSlotRequestId(), slot.getAllocationId(), pendingRequest.getResourceProfile());
        }
        // Pass 2: complete the requests only after every reservation above has been made.
        for (RequestSlotMatchingStrategy.RequestSlotMatch match : requestSlotMatches) {
            PendingRequest pendingRequest = match.getPendingRequest();
            PhysicalSlot slot = match.getSlot();
            Preconditions.checkState(pendingRequest.fulfill(slot), "Pending requests must be fulfillable.");
        }
    }

    /**
     * Returns the free slot information of the declarative slot pool's free slot tracker.
     *
     * @return free slots as reported by the tracker
     */
    @VisibleForTesting
    Collection<PhysicalSlot> getFreeSlotsInformation() {
        return this.getDeclarativeSlotPool().getFreeSlotTracker().getFreeSlotsInformation();
    }

    /**
     * Reserves the free slot {@code allocationId} in the declarative slot pool and records the
     * request as fulfilled. Does not change the declared resource requirements.
     *
     * @param slotRequestId request being served
     * @param allocationId slot to reserve
     * @param resourceProfile profile the slot is reserved for
     */
    private void reserveFreeSlot(SlotRequestId slotRequestId, AllocationID allocationId, ResourceProfile resourceProfile) {
        this.log.debug("Reserve slot {} for slot request id {}", allocationId, slotRequestId);
        this.getDeclarativeSlotPool().reserveFreeSlot(allocationId, resourceProfile);
        this.fulfilledRequests.put(slotRequestId, allocationId);
    }

    /**
     * Reserves the free slot identified by {@code allocationID} for the given request and
     * increases the declared resource requirements accordingly.
     *
     * @param slotRequestId request the slot is reserved for
     * @param allocationID slot to reserve
     * @param requirementProfile profile the slot is reserved for, must not be {@code null}
     * @return the reserved slot, wrapped in an optional
     */
    @Override
    public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID, @Nonnull ResourceProfile requirementProfile) {
        this.assertRunningInMainThread();
        Preconditions.checkNotNull(requirementProfile, "The requiredSlotProfile must not be null.");
        this.log.debug("Reserving free slot {} for slot request id {} and profile {}.", allocationID, slotRequestId, requirementProfile);
        return Optional.of(this.reserveFreeSlotForResource(slotRequestId, allocationID, requirementProfile));
    }

    /**
     * Increases the declared resource requirements by one {@code requiredSlotProfile}, reserves
     * the slot and records the request as fulfilled.
     *
     * @param slotRequestId request being served
     * @param allocationId slot to reserve
     * @param requiredSlotProfile profile the slot is reserved for
     * @return the slot returned by the declarative slot pool
     */
    private PhysicalSlot reserveFreeSlotForResource(SlotRequestId slotRequestId, AllocationID allocationId, ResourceProfile requiredSlotProfile) {
        this.getDeclarativeSlotPool().increaseResourceRequirementsBy(ResourceCounter.withResource(requiredSlotProfile, 1));
        PhysicalSlot physicalSlot = this.getDeclarativeSlotPool().reserveFreeSlot(allocationId, requiredSlotProfile);
        this.fulfilledRequests.put(slotRequestId, allocationId);
        return physicalSlot;
    }

    /**
     * Registers a normal (non-batch) slot request.
     *
     * @param slotRequestId id of the request
     * @param resourceProfile requested resources
     * @param preferredAllocations passed on to the created {@link PendingRequest}
     * @param timeout optional timeout; {@code null} means the returned future never times out
     * @return future that is completed with the slot, or exceptionally on failure or timeout
     */
    @Override
    @Nonnull
    public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, @Nonnull Collection<AllocationID> preferredAllocations, @Nullable Duration timeout) {
        this.assertRunningInMainThread();
        this.log.debug("Request new allocated slot with slot request id {} and resource profile {}", slotRequestId, resourceProfile);
        PendingRequest pendingRequest = PendingRequest.createNormalRequest(slotRequestId, resourceProfile, preferredAllocations);
        return this.internalRequestNewSlot(pendingRequest, timeout);
    }

    /**
     * Registers a batch slot request. No per-request timeout is attached; batch requests are
     * instead subject to {@link #checkBatchSlotTimeout()}.
     *
     * @param slotRequestId id of the request
     * @param resourceProfile requested resources
     * @param preferredAllocations passed on to the created {@link PendingRequest}
     * @return future that is completed with the slot, or exceptionally on failure
     */
    @Override
    @Nonnull
    public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, @Nonnull Collection<AllocationID> preferredAllocations) {
        this.assertRunningInMainThread();
        this.log.debug("Request new allocated batch slot with slot request id {} and resource profile {}", slotRequestId, resourceProfile);
        PendingRequest pendingRequest = PendingRequest.createBatchRequest(slotRequestId, resourceProfile, preferredAllocations);
        return this.internalRequestNewSlot(pendingRequest, null);
    }

    /**
     * Registers {@code pendingRequest} and, if a timeout is given, arms it on the request future.
     * When the timeout fires, the request is released via {@link #timeoutPendingSlotRequest}.
     *
     * @param pendingRequest request to register
     * @param timeout optional timeout, {@code null} for none
     * @return the request's slot future
     */
    private CompletableFuture<PhysicalSlot> internalRequestNewSlot(PendingRequest pendingRequest, @Nullable Duration timeout) {
        this.internalRequestNewAllocatedSlot(pendingRequest);
        if (timeout == null) {
            return pendingRequest.getSlotFuture();
        }
        long timeoutMillis = timeout.toMillis();
        String timeoutMessage = String.format("Pending slot request %s timed out after %d ms.", pendingRequest.getSlotRequestId(), timeoutMillis);
        return FutureUtils.orTimeout(pendingRequest.getSlotFuture(), timeoutMillis, TimeUnit.MILLISECONDS, this.componentMainThreadExecutor, timeoutMessage).whenComplete((physicalSlot, throwable) -> {
            // Only a timeout requires cleanup here; other failures are raised by code paths that
            // already removed the request from the bookkeeping.
            if (throwable instanceof TimeoutException) {
                this.timeoutPendingSlotRequest(pendingRequest.getSlotRequestId());
            }
        });
    }

    /**
     * Releases the given request with a {@link TimeoutException} as cause.
     *
     * @param slotRequestId request that timed out
     */
    private void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
        this.releaseSlot(slotRequestId, new TimeoutException("Pending slot request timed out in slot pool."));
    }

    /**
     * Stores the request as pending and increases the declared resource requirements by one slot
     * of the request's profile.
     *
     * @param pendingRequest request to register
     */
    private void internalRequestNewAllocatedSlot(PendingRequest pendingRequest) {
        this.pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);
        this.getDeclarativeSlotPool().increaseResourceRequirementsBy(ResourceCounter.withResource(pendingRequest.getResourceProfile(), 1));
    }

    /**
     * Lowers the declared resource requirements by the given amount.
     *
     * @param previouslyFulfilledRequirements requirements to subtract
     */
    @Override
    protected void onFailAllocation(ResourceCounter previouslyFulfilledRequirements) {
        this.getDeclarativeSlotPool().decreaseResourceRequirementsBy(previouslyFulfilledRequirements);
    }

    /**
     * Releases the slot request with the given id.
     *
     * <p>A still pending request is removed, its requirement is withdrawn and its future is failed.
     * A fulfilled request has its reserved slot freed and the corresponding requirement withdrawn.
     * Unknown ids are ignored.
     *
     * @param slotRequestId request to release
     * @param cause optional reason for the release
     */
    @Override
    public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwable cause) {
        this.log.debug("Release slot with slot request id {}", slotRequestId);
        this.assertRunningInMainThread();
        PendingRequest pendingRequest = this.pendingRequests.remove(slotRequestId);
        if (pendingRequest != null) {
            this.getDeclarativeSlotPool().decreaseResourceRequirementsBy(ResourceCounter.withResource(pendingRequest.getResourceProfile(), 1));
            pendingRequest.failRequest(new FlinkException(String.format("Pending slot request with %s has been released.", pendingRequest.getSlotRequestId()), cause));
            return;
        }
        AllocationID allocationId = this.fulfilledRequests.remove(slotRequestId);
        if (allocationId == null) {
            this.log.debug("Could not find slot which has fulfilled slot request {}. Ignoring the release operation.", slotRequestId);
            return;
        }
        ResourceCounter previouslyFulfilledRequirement = this.getDeclarativeSlotPool().freeReservedSlot(allocationId, cause, this.getRelativeTimeMillis());
        this.getDeclarativeSlotPool().decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
    }

    /**
     * Fails all pending non-batch requests because the required resources could not be acquired.
     *
     * @param acquiredResources resources that were acquired; only used for logging
     */
    @Override
    public void notifyNotEnoughResourcesAvailable(Collection<ResourceRequirement> acquiredResources) {
        this.assertRunningInMainThread();
        this.failPendingRequests(acquiredResources);
    }

    /**
     * Fails every pending non-batch request with a {@link NoResourceAvailableException}. Batch
     * requests stay pending. Nothing is logged or changed if there is no non-batch request.
     *
     * @param acquiredResources resources that were acquired; only used for logging
     */
    private void failPendingRequests(Collection<ResourceRequirement> acquiredResources) {
        Predicate<PendingRequest> isNormalRequest = request -> !request.isBatchRequest();
        if (this.pendingRequests.values().stream().anyMatch(isNormalRequest)) {
            this.log.warn("Could not acquire the minimum required resources, failing slot requests. Acquired: {}. Current slot pool status: {}", acquiredResources, this.getSlotServiceStatus());
            this.cancelPendingRequests(isNormalRequest, NoResourceAvailableException.withoutStackTrace("Could not acquire the minimum required resources."));
        }
    }

    /**
     * Returns all slots known to the declarative slot pool whose allocation id is not listed as
     * available by the free slot tracker.
     *
     * @return information about the slots that are not free
     */
    @Override
    public Collection<SlotInfo> getAllocatedSlotsInformation() {
        this.assertRunningInMainThread();
        Collection<? extends SlotInfo> allSlotsInformation = this.getDeclarativeSlotPool().getAllSlotsInformation();
        Set<AllocationID> freeSlots = this.getDeclarativeSlotPool().getFreeSlotTracker().getAvailableSlots();
        return allSlotsInformation.stream().filter(slotInfo -> !freeSlots.contains(slotInfo.getAllocationId())).collect(Collectors.toList());
    }

    /**
     * Returns the free slot tracker of the declarative slot pool.
     *
     * @return the free slot tracker
     */
    @Override
    public FreeSlotTracker getFreeSlotTracker() {
        this.assertRunningInMainThread();
        return this.getDeclarativeSlotPool().getFreeSlotTracker();
    }

    /**
     * Disables the periodic batch slot request timeout check. The next run of {@link
     * #checkBatchSlotTimeout()} returns without rescheduling itself.
     */
    @Override
    public void disableBatchSlotRequestTimeoutCheck() {
        this.isBatchSlotRequestTimeoutCheckDisabled = true;
    }

    /**
     * Asserts that the caller runs in the component main thread.
     *
     * @throws IllegalStateException if no main thread executor is available
     */
    private void assertRunningInMainThread() {
        if (this.componentMainThreadExecutor == null) {
            throw new IllegalStateException("The DeclarativeSlotPoolBridge has not been started yet.");
        }
        this.componentMainThreadExecutor.assertRunningInMainThread();
    }

    /** Releases idle slots and reschedules itself after {@link #idleSlotTimeout}. */
    private void checkIdleSlotTimeout() {
        this.getDeclarativeSlotPool().releaseIdleSlots(this.getRelativeTimeMillis());
        if (this.componentMainThreadExecutor != null) {
            this.componentMainThreadExecutor.schedule(this::checkIdleSlotTimeout, this.idleSlotTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Periodic check of the pending batch requests.
     *
     * <p>A batch request counts as fulfillable if any slot known to the pool (free or allocated)
     * has a resource profile matching the request. Fulfillable requests are marked as such;
     * unfulfillable ones are marked with the current timestamp and timed out once they have been
     * unfulfillable for at least {@link #batchSlotTimeout}. Unless the check has been disabled,
     * the method reschedules itself after {@link #batchSlotTimeout}.
     */
    void checkBatchSlotTimeout() {
        this.assertRunningInMainThread();
        if (this.isBatchSlotRequestTimeoutCheckDisabled) {
            return;
        }
        long batchSlotTimeoutMillis = this.batchSlotTimeout.toMillis();
        Collection<PendingRequest> pendingBatchRequests = this.getPendingBatchRequests();
        if (!pendingBatchRequests.isEmpty()) {
            Set<ResourceProfile> allResourceProfiles = this.getResourceProfilesFromAllSlots();
            Map<Boolean, List<PendingRequest>> requestsByFulfillability = pendingBatchRequests.stream().collect(Collectors.partitioningBy(DeclarativeSlotPoolBridge.canBeFulfilledWithAnySlot(allResourceProfiles)));
            List<PendingRequest> fulfillableRequests = requestsByFulfillability.get(true);
            List<PendingRequest> unfulfillableRequests = requestsByFulfillability.get(false);
            long currentTimestamp = this.getRelativeTimeMillis();
            for (PendingRequest fulfillableRequest : fulfillableRequests) {
                fulfillableRequest.markFulfillable();
            }
            for (PendingRequest unfulfillableRequest : unfulfillableRequests) {
                unfulfillableRequest.markUnfulfillable(currentTimestamp);
                if (unfulfillableRequest.getUnfulfillableSince() + batchSlotTimeoutMillis <= currentTimestamp) {
                    this.timeoutPendingSlotRequest(unfulfillableRequest.getSlotRequestId());
                }
            }
        }
        if (this.componentMainThreadExecutor != null) {
            this.componentMainThreadExecutor.schedule(this::checkBatchSlotTimeout, batchSlotTimeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Collects the distinct resource profiles of all free and all allocated slots.
     *
     * @return set of resource profiles present in the pool
     */
    private Set<ResourceProfile> getResourceProfilesFromAllSlots() {
        return Stream.concat(this.getFreeSlotTracker().getFreeSlotsInformation().stream(), this.getAllocatedSlotsInformation().stream()).map(SlotInfo::getResourceProfile).collect(Collectors.toSet());
    }

    /**
     * Returns a snapshot of the pending requests that are batch requests.
     *
     * @return pending batch requests, in insertion order
     */
    private Collection<PendingRequest> getPendingBatchRequests() {
        return this.pendingRequests.values().stream().filter(PendingRequest::isBatchRequest).collect(Collectors.toList());
    }

    /**
     * Builds a predicate that accepts a pending request if at least one of the given profiles
     * matches the request's resource profile.
     *
     * @param allocatedResourceProfiles profiles to test requests against
     * @return predicate that is {@code true} for requests some profile can serve
     */
    private static Predicate<PendingRequest> canBeFulfilledWithAnySlot(Set<ResourceProfile> allocatedResourceProfiles) {
        return pendingRequest -> allocatedResourceProfiles.stream().anyMatch(allocatedResourceProfile -> allocatedResourceProfile.isMatching(pendingRequest.getResourceProfile()));
    }

    /**
     * Returns the number of requests that are still pending.
     *
     * @return size of the pending request map
     */
    @VisibleForTesting
    public int getNumPendingRequests() {
        return this.pendingRequests.size();
    }

    /**
     * Increases the declared resource requirements directly, bypassing request bookkeeping.
     *
     * @param increment requirements to add
     */
    @VisibleForTesting
    void increaseResourceRequirementsBy(ResourceCounter increment) {
        this.getDeclarativeSlotPool().increaseResourceRequirementsBy(increment);
    }

    /**
     * Tells whether the batch slot request timeout check is still enabled.
     *
     * @return {@code false} once {@link #disableBatchSlotRequestTimeoutCheck()} has been called
     */
    @VisibleForTesting
    boolean isBatchSlotRequestTimeoutCheckEnabled() {
        return !this.isBatchSlotRequestTimeoutCheckDisabled;
    }
}

