/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotState;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotStatusSyncer;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerInfo;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerSlotInformation;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerTracker;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultSlotStatusSyncer
implements SlotStatusSyncer {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultSlotStatusSyncer.class);
    private final Set<AllocationID> pendingSlotAllocations = new HashSet<AllocationID>();
    private final Duration taskManagerRequestTimeout;
    @Nullable
    private TaskManagerTracker taskManagerTracker;
    @Nullable
    private ResourceTracker resourceTracker;
    @Nullable
    private Executor mainThreadExecutor;
    @Nullable
    private ResourceManagerId resourceManagerId;
    private boolean started = false;

    public DefaultSlotStatusSyncer(Duration taskManagerRequestTimeout) {
        this.taskManagerRequestTimeout = (Duration)Preconditions.checkNotNull((Object)taskManagerRequestTimeout);
    }

    @Override
    public void initialize(TaskManagerTracker taskManagerTracker, ResourceTracker resourceTracker, ResourceManagerId resourceManagerId, Executor mainThreadExecutor) {
        this.taskManagerTracker = (TaskManagerTracker)Preconditions.checkNotNull((Object)taskManagerTracker);
        this.resourceTracker = (ResourceTracker)Preconditions.checkNotNull((Object)resourceTracker);
        this.mainThreadExecutor = (Executor)Preconditions.checkNotNull((Object)mainThreadExecutor);
        this.resourceManagerId = (ResourceManagerId)((Object)Preconditions.checkNotNull((Object)((Object)resourceManagerId)));
        this.pendingSlotAllocations.clear();
        this.started = true;
    }

    @Override
    public void close() {
        this.taskManagerTracker = null;
        this.resourceTracker = null;
        this.mainThreadExecutor = null;
        this.resourceManagerId = null;
        this.pendingSlotAllocations.clear();
        this.started = false;
    }

    @Override
    public CompletableFuture<Void> allocateSlot(InstanceID instanceId, JobID jobId, String targetAddress, ResourceProfile resourceProfile) {
        Preconditions.checkNotNull((Object)((Object)instanceId));
        Preconditions.checkNotNull((Object)jobId);
        Preconditions.checkNotNull((Object)targetAddress);
        Preconditions.checkNotNull((Object)resourceProfile);
        this.checkStarted();
        Optional<TaskManagerInfo> taskManager = this.taskManagerTracker.getRegisteredTaskManager(instanceId);
        Preconditions.checkState((boolean)taskManager.isPresent(), (Object)("Could not find a registered task manager for instance id " + String.valueOf((Object)instanceId) + "."));
        try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext((Map)MdcUtils.asContextData((JobID)jobId));){
            AllocationID allocationId = new AllocationID();
            TaskExecutorGateway gateway = taskManager.get().getTaskExecutorConnection().getTaskExecutorGateway();
            ResourceID resourceId = taskManager.get().getTaskExecutorConnection().getResourceID();
            LOG.info("Starting allocation of slot {} from {} for job {} with resource profile {}.", new Object[]{allocationId, resourceId, jobId, resourceProfile});
            this.taskManagerTracker.notifySlotStatus(allocationId, jobId, instanceId, resourceProfile, SlotState.PENDING);
            this.resourceTracker.notifyAcquiredResource(jobId, resourceProfile);
            this.pendingSlotAllocations.add(allocationId);
            CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(SlotID.getDynamicSlotID(resourceId), jobId, allocationId, resourceProfile, targetAddress, this.resourceManagerId, this.taskManagerRequestTimeout);
            CompletableFuture<Void> returnedFuture = new CompletableFuture<Void>();
            FutureUtils.assertNoException((CompletableFuture)requestFuture.handleAsync(this.handleSlotAllocation(instanceId, jobId, resourceProfile, allocationId, returnedFuture), this.mainThreadExecutor));
            CompletableFuture<Void> completableFuture = returnedFuture;
            return completableFuture;
        }
    }

    private BiFunction<Acknowledge, Throwable, Object> handleSlotAllocation(InstanceID instanceId, JobID jobId, ResourceProfile resourceProfile, AllocationID allocationId, CompletableFuture<Void> returnedFuture) {
        return (acknowledge, throwable) -> {
            try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext((Map)MdcUtils.asContextData((JobID)jobId));){
                if (!this.pendingSlotAllocations.remove((Object)allocationId)) {
                    LOG.debug("Ignoring slot allocation update from task manager {} for allocation {} and job {}, because the allocation was already completed or cancelled.", new Object[]{instanceId, allocationId, jobId});
                    returnedFuture.complete(null);
                    Object var9_9 = null;
                    return var9_9;
                }
                if (!this.taskManagerTracker.getAllocatedOrPendingSlot(allocationId).isPresent()) {
                    LOG.debug("The slot {} has been removed before. Ignore the future.", (Object)allocationId);
                    returnedFuture.complete(null);
                    Object var9_10 = null;
                    return var9_10;
                }
                if (acknowledge != null) {
                    LOG.trace("Completed allocation of allocation {} for job {}.", (Object)allocationId, (Object)jobId);
                    this.taskManagerTracker.notifySlotStatus(allocationId, jobId, instanceId, resourceProfile, SlotState.ALLOCATED);
                    returnedFuture.complete(null);
                } else {
                    if (throwable instanceof SlotOccupiedException) {
                        LOG.error("Should not get this exception.", throwable);
                    } else {
                        LOG.warn("Slot allocation for allocation {} for job {} failed.", new Object[]{allocationId, jobId, throwable});
                        this.resourceTracker.notifyLostResource(jobId, resourceProfile);
                        this.taskManagerTracker.notifySlotStatus(allocationId, jobId, instanceId, resourceProfile, SlotState.FREE);
                    }
                    returnedFuture.completeExceptionally((Throwable)throwable);
                }
                Object var9_11 = null;
                return var9_11;
            }
        };
    }

    @Override
    public void freeSlot(AllocationID allocationId) {
        Preconditions.checkNotNull((Object)((Object)allocationId));
        this.checkStarted();
        Optional<TaskManagerSlotInformation> slotOptional = this.taskManagerTracker.getAllocatedOrPendingSlot(allocationId);
        if (!slotOptional.isPresent()) {
            LOG.warn("Try to free unknown slot {}.", (Object)allocationId);
            return;
        }
        TaskManagerSlotInformation slot = slotOptional.get();
        try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext((Map)MdcUtils.asContextData((JobID)slot.getJobId()));){
            LOG.info("Freeing slot {}.", (Object)allocationId);
            if (slot.getState() == SlotState.PENDING) {
                this.pendingSlotAllocations.remove((Object)allocationId);
            }
            this.resourceTracker.notifyLostResource(slot.getJobId(), slot.getResourceProfile());
            this.taskManagerTracker.notifySlotStatus(allocationId, slot.getJobId(), slot.getInstanceId(), slot.getResourceProfile(), SlotState.FREE);
        }
    }

    @Override
    public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
        Preconditions.checkNotNull((Object)slotReport);
        Preconditions.checkNotNull((Object)((Object)instanceId));
        this.checkStarted();
        Optional<TaskManagerInfo> taskManager = this.taskManagerTracker.getRegisteredTaskManager(instanceId);
        if (!taskManager.isPresent()) {
            LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring this report.", (Object)instanceId);
            return false;
        }
        LOG.debug("Received slot report from instance {}: {}.", (Object)instanceId, (Object)slotReport);
        boolean canApplyPreviousAllocations = true;
        HashSet reportedAllocationIds = new HashSet();
        slotReport.iterator().forEachRemaining(slotStatus -> reportedAllocationIds.add(slotStatus.getAllocationID()));
        for (TaskManagerSlotInformation slot : new HashSet<TaskManagerSlotInformation>(taskManager.get().getAllocatedSlots().values())) {
            if (reportedAllocationIds.contains((Object)slot.getAllocationId()) || slot.getState() != SlotState.ALLOCATED) continue;
            MdcUtils.MdcCloseable ignored = MdcUtils.withContext((Map)MdcUtils.asContextData((JobID)slot.getJobId()));
            try {
                LOG.info("Freeing slot {} by slot report.", (Object)slot.getAllocationId());
                this.taskManagerTracker.notifySlotStatus(slot.getAllocationId(), slot.getJobId(), slot.getInstanceId(), slot.getResourceProfile(), SlotState.FREE);
                this.resourceTracker.notifyLostResource(slot.getJobId(), slot.getResourceProfile());
                canApplyPreviousAllocations = false;
            }
            finally {
                if (ignored == null) continue;
                ignored.close();
            }
        }
        for (SlotStatus slotStatus2 : slotReport) {
            if (slotStatus2.getAllocationID() == null || this.syncAllocatedSlotStatus(slotStatus2, taskManager.get())) continue;
            canApplyPreviousAllocations = false;
        }
        return canApplyPreviousAllocations;
    }

    private boolean syncAllocatedSlotStatus(SlotStatus slotStatus, TaskManagerInfo taskManager) {
        AllocationID allocationId = (AllocationID)((Object)Preconditions.checkNotNull((Object)((Object)slotStatus.getAllocationID())));
        JobID jobId = (JobID)Preconditions.checkNotNull((Object)slotStatus.getJobID());
        try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext((Map)MdcUtils.asContextData((JobID)jobId));){
            ResourceProfile resourceProfile = (ResourceProfile)Preconditions.checkNotNull((Object)slotStatus.getResourceProfile());
            if (taskManager.getAllocatedSlots().containsKey((Object)allocationId)) {
                if (taskManager.getAllocatedSlots().get((Object)allocationId).getState() == SlotState.PENDING) {
                    TaskManagerSlotInformation slot = taskManager.getAllocatedSlots().get((Object)allocationId);
                    this.pendingSlotAllocations.remove((Object)slot.getAllocationId());
                    this.taskManagerTracker.notifySlotStatus(slot.getAllocationId(), slot.getJobId(), slot.getInstanceId(), slot.getResourceProfile(), SlotState.ALLOCATED);
                }
                boolean bl = true;
                return bl;
            }
            Preconditions.checkState((!this.taskManagerTracker.getAllocatedOrPendingSlot(allocationId).isPresent() ? 1 : 0) != 0, (Object)("Duplicated allocation for " + String.valueOf((Object)allocationId)));
            this.taskManagerTracker.notifySlotStatus(allocationId, jobId, taskManager.getInstanceId(), resourceProfile, SlotState.ALLOCATED);
            this.resourceTracker.notifyAcquiredResource(jobId, resourceProfile);
            boolean bl = false;
            return bl;
        }
    }

    @Override
    public void freeInactiveSlots(JobID jobId) {
        this.checkStarted();
        for (TaskManagerInfo taskManagerInfo : this.taskManagerTracker.getTaskManagersWithAllocatedSlotsForJob(jobId)) {
            taskManagerInfo.getTaskExecutorConnection().getTaskExecutorGateway().freeInactiveSlots(jobId, this.taskManagerRequestTimeout);
        }
    }

    private void checkStarted() {
        Preconditions.checkState((boolean)this.started);
        Preconditions.checkNotNull((Object)this.taskManagerTracker);
        Preconditions.checkNotNull((Object)this.resourceTracker);
        Preconditions.checkNotNull((Object)this.mainThreadExecutor);
        Preconditions.checkNotNull((Object)((Object)this.resourceManagerId));
    }
}

