/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.scheduler.ExecutionDeployer;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.ExecutionSlotAssignment;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

public class DefaultExecutionDeployer
implements ExecutionDeployer {
    private final Logger log;
    private final ExecutionSlotAllocator executionSlotAllocator;
    private final ExecutionOperations executionOperations;
    private final ExecutionVertexVersioner executionVertexVersioner;
    private final Duration partitionRegistrationTimeout;
    private final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc;
    private final ComponentMainThreadExecutor mainThreadExecutor;

    private DefaultExecutionDeployer(Logger log, ExecutionSlotAllocator executionSlotAllocator, ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, Duration partitionRegistrationTimeout, BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc, ComponentMainThreadExecutor mainThreadExecutor) {
        this.log = Preconditions.checkNotNull(log);
        this.executionSlotAllocator = Preconditions.checkNotNull(executionSlotAllocator);
        this.executionOperations = Preconditions.checkNotNull(executionOperations);
        this.executionVertexVersioner = Preconditions.checkNotNull(executionVertexVersioner);
        this.partitionRegistrationTimeout = Preconditions.checkNotNull(partitionRegistrationTimeout);
        this.allocationReservationFunc = Preconditions.checkNotNull(allocationReservationFunc);
        this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
    }

    @Override
    public void allocateSlotsAndDeploy(List<Execution> executionsToDeploy, Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex) {
        this.validateExecutionStates(executionsToDeploy);
        this.transitionToScheduled(executionsToDeploy);
        Map<ExecutionAttemptID, ExecutionSlotAssignment> executionSlotAssignmentMap = this.allocateSlotsFor(executionsToDeploy);
        List<ExecutionDeploymentHandle> deploymentHandles = this.createDeploymentHandles(executionsToDeploy, requiredVersionByVertex, executionSlotAssignmentMap);
        this.waitForAllSlotsAndDeploy(deploymentHandles);
    }

    private void validateExecutionStates(Collection<Execution> executionsToDeploy) {
        executionsToDeploy.forEach(e -> Preconditions.checkState(e.getState() == ExecutionState.CREATED, "Expected execution %s to be in CREATED state, was: %s", new Object[]{e.getAttemptId(), e.getState()}));
    }

    private void transitionToScheduled(List<Execution> executionsToDeploy) {
        executionsToDeploy.forEach(e -> e.transitionState(ExecutionState.SCHEDULED));
    }

    private Map<ExecutionAttemptID, ExecutionSlotAssignment> allocateSlotsFor(List<Execution> executionsToDeploy) {
        List<ExecutionAttemptID> executionAttemptIds = executionsToDeploy.stream().map(Execution::getAttemptId).collect(Collectors.toList());
        return this.executionSlotAllocator.allocateSlotsFor(executionAttemptIds);
    }

    private List<ExecutionDeploymentHandle> createDeploymentHandles(List<Execution> executionsToDeploy, Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex, Map<ExecutionAttemptID, ExecutionSlotAssignment> executionSlotAssignmentMap) {
        Preconditions.checkState(executionsToDeploy.size() == executionSlotAssignmentMap.size());
        ArrayList<ExecutionDeploymentHandle> deploymentHandles = new ArrayList<ExecutionDeploymentHandle>(executionsToDeploy.size());
        for (Execution execution : executionsToDeploy) {
            ExecutionSlotAssignment assignment = Preconditions.checkNotNull(executionSlotAssignmentMap.get(execution.getAttemptId()));
            ExecutionVertexID executionVertexId = execution.getVertex().getID();
            ExecutionDeploymentHandle deploymentHandle = new ExecutionDeploymentHandle(execution, assignment, requiredVersionByVertex.get(executionVertexId));
            deploymentHandles.add(deploymentHandle);
        }
        return deploymentHandles;
    }

    private void waitForAllSlotsAndDeploy(List<ExecutionDeploymentHandle> deploymentHandles) {
        FutureUtils.assertNoException(this.assignAllResourcesAndRegisterProducedPartitions(deploymentHandles).handle(this.deployAll(deploymentHandles)));
    }

    private CompletableFuture<Void> assignAllResourcesAndRegisterProducedPartitions(List<ExecutionDeploymentHandle> deploymentHandles) {
        ArrayList<CompletionStage> resultFutures = new ArrayList<CompletionStage>();
        for (ExecutionDeploymentHandle deploymentHandle : deploymentHandles) {
            CompletionStage resultFuture = ((CompletableFuture)((CompletableFuture)deploymentHandle.getLogicalSlotFuture().handle(this.assignResource(deploymentHandle))).thenCompose(this.registerProducedPartitions(deploymentHandle))).handle((ignore, throwable) -> {
                if (throwable != null) {
                    this.handleTaskDeploymentFailure(deploymentHandle.getExecution(), (Throwable)throwable);
                }
                return null;
            });
            resultFutures.add(resultFuture);
        }
        return FutureUtils.waitForAll(resultFutures);
    }

    private BiFunction<Void, Throwable, Void> deployAll(List<ExecutionDeploymentHandle> deploymentHandles) {
        return (ignored, throwable) -> {
            DefaultExecutionDeployer.propagateIfNonNull(throwable);
            for (ExecutionDeploymentHandle deploymentHandle : deploymentHandles) {
                CompletableFuture<LogicalSlot> slotAssigned = deploymentHandle.getLogicalSlotFuture();
                Preconditions.checkState(slotAssigned.isDone());
                FutureUtils.assertNoException(slotAssigned.handle(this.deployOrHandleError(deploymentHandle)));
            }
            return null;
        };
    }

    private static void propagateIfNonNull(Throwable throwable) {
        if (throwable != null) {
            throw new CompletionException(throwable);
        }
    }

    private BiFunction<LogicalSlot, Throwable, LogicalSlot> assignResource(ExecutionDeploymentHandle deploymentHandle) {
        return (logicalSlot, throwable) -> {
            ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
            Execution execution = deploymentHandle.getExecution();
            if (execution.getState() != ExecutionState.SCHEDULED || this.executionVertexVersioner.isModified(requiredVertexVersion)) {
                if (throwable == null) {
                    this.log.debug("Refusing to assign slot to execution {} because this deployment was superseded by another deployment", (Object)deploymentHandle.getExecutionAttemptId());
                    DefaultExecutionDeployer.releaseSlotIfPresent(logicalSlot);
                }
                return null;
            }
            if (throwable != null) {
                throw new CompletionException(DefaultExecutionDeployer.maybeWrapWithNoResourceAvailableException(throwable));
            }
            if (!execution.tryAssignResource((LogicalSlot)logicalSlot)) {
                throw new IllegalStateException("Could not assign resource " + String.valueOf(logicalSlot) + " to execution " + String.valueOf(execution) + ".");
            }
            this.allocationReservationFunc.accept(execution.getAttemptId().getExecutionVertexId(), logicalSlot.getAllocationId());
            return logicalSlot;
        };
    }

    private static void releaseSlotIfPresent(@Nullable LogicalSlot logicalSlot) {
        if (logicalSlot != null) {
            logicalSlot.releaseSlot(null);
        }
    }

    private static Throwable maybeWrapWithNoResourceAvailableException(Throwable failure) {
        Throwable strippedThrowable = ExceptionUtils.stripCompletionException(failure);
        if (strippedThrowable instanceof TimeoutException) {
            return new NoResourceAvailableException("Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources.", failure);
        }
        return failure;
    }

    private Function<LogicalSlot, CompletableFuture<Void>> registerProducedPartitions(ExecutionDeploymentHandle deploymentHandle) {
        return logicalSlot -> {
            if (logicalSlot != null) {
                Execution execution = deploymentHandle.getExecution();
                CompletableFuture<Void> partitionRegistrationFuture = execution.registerProducedPartitions(logicalSlot.getTaskManagerLocation());
                return FutureUtils.orTimeout(partitionRegistrationFuture, this.partitionRegistrationTimeout.toMillis(), TimeUnit.MILLISECONDS, (Executor)this.mainThreadExecutor, String.format("Registering produced partitions for execution %s timed out after %d ms.", execution.getAttemptId(), this.partitionRegistrationTimeout.toMillis()));
            }
            return FutureUtils.completedVoidFuture();
        };
    }

    private BiFunction<Object, Throwable, Void> deployOrHandleError(ExecutionDeploymentHandle deploymentHandle) {
        return (ignored, throwable) -> {
            ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
            Execution execution = deploymentHandle.getExecution();
            if (execution.getState() != ExecutionState.SCHEDULED || this.executionVertexVersioner.isModified(requiredVertexVersion)) {
                if (throwable == null) {
                    this.log.debug("Refusing to assign slot to execution {} because this deployment was superseded by another deployment", (Object)deploymentHandle.getExecutionAttemptId());
                }
                return null;
            }
            if (throwable == null) {
                this.deployTaskSafe(execution);
            } else {
                this.handleTaskDeploymentFailure(execution, (Throwable)throwable);
            }
            return null;
        };
    }

    private void deployTaskSafe(Execution execution) {
        try {
            this.executionOperations.deploy(execution);
        }
        catch (Throwable e) {
            this.handleTaskDeploymentFailure(execution, e);
        }
    }

    private void handleTaskDeploymentFailure(Execution execution, Throwable error) {
        this.executionOperations.markFailed(execution, error);
    }

    public static class Factory
    implements ExecutionDeployer.Factory {
        @Override
        public DefaultExecutionDeployer createInstance(Logger log, ExecutionSlotAllocator executionSlotAllocator, ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, Duration partitionRegistrationTimeout, BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc, ComponentMainThreadExecutor mainThreadExecutor) {
            return new DefaultExecutionDeployer(log, executionSlotAllocator, executionOperations, executionVertexVersioner, partitionRegistrationTimeout, allocationReservationFunc, mainThreadExecutor);
        }
    }

    private static class ExecutionDeploymentHandle {
        private final Execution execution;
        private final ExecutionSlotAssignment executionSlotAssignment;
        private final ExecutionVertexVersion requiredVertexVersion;

        ExecutionDeploymentHandle(Execution execution, ExecutionSlotAssignment executionSlotAssignment, ExecutionVertexVersion requiredVertexVersion) {
            this.execution = Preconditions.checkNotNull(execution);
            this.executionSlotAssignment = Preconditions.checkNotNull(executionSlotAssignment);
            this.requiredVertexVersion = Preconditions.checkNotNull(requiredVertexVersion);
        }

        Execution getExecution() {
            return this.execution;
        }

        ExecutionAttemptID getExecutionAttemptId() {
            return this.execution.getAttemptId();
        }

        CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
            return this.executionSlotAssignment.getLogicalSlotFuture();
        }

        ExecutionVertexVersion getRequiredVertexVersion() {
            return this.requiredVertexVersion;
        }
    }
}

