/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateFactory;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitions;
import org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

class Restarting
extends StateWithExecutionGraph {
    private final Context context;
    private final Duration backoffTime;
    @Nullable
    private ScheduledFuture<?> goToSubsequentStateFuture;
    @Nullable
    private final VertexParallelism restartWithParallelism;

    Restarting(Context context, ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Logger logger, Duration backoffTime, @Nullable VertexParallelism restartWithParallelism, ClassLoader userCodeClassLoader, List<ExceptionHistoryEntry> failureCollection) {
        super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger, userCodeClassLoader, failureCollection);
        this.context = context;
        this.backoffTime = backoffTime;
        this.restartWithParallelism = restartWithParallelism;
        this.getExecutionGraph().cancel();
    }

    @Override
    public void onLeave(Class<? extends State> newState) {
        if (this.goToSubsequentStateFuture != null) {
            this.goToSubsequentStateFuture.cancel(false);
        }
        super.onLeave(newState);
    }

    @Override
    public JobStatus getJobStatus() {
        return JobStatus.RESTARTING;
    }

    @Override
    public void suspend(Throwable cause) {
        this.suspend(cause, JobStatus.SUSPENDED);
    }

    @Override
    public void cancel() {
        this.context.goToCanceling(this.getExecutionGraph(), this.getExecutionGraphHandler(), this.getOperatorCoordinatorHandler(), this.getFailures());
    }

    @Override
    void onFailure(Throwable failure, CompletableFuture<Map<String, String>> failureLabels) {
    }

    @Override
    void onGloballyTerminalState(JobStatus globallyTerminalState) {
        Preconditions.checkArgument(globallyTerminalState == JobStatus.CANCELED);
        this.goToSubsequentStateFuture = this.context.runIfState(this, this::goToSubsequentState, this.backoffTime);
    }

    private void goToSubsequentState() {
        if (this.availableParallelismNotChanged(this.restartWithParallelism)) {
            this.context.goToCreatingExecutionGraph(this.getExecutionGraph());
        } else {
            this.context.goToWaitingForResources(this.getExecutionGraph());
        }
    }

    private boolean availableParallelismNotChanged(VertexParallelism restartWithParallelism) {
        if (this.restartWithParallelism == null) {
            return false;
        }
        return this.context.getAvailableVertexParallelism().map(vertexParallelism -> vertexParallelism.getVertices().stream().allMatch(vertex -> restartWithParallelism.getParallelism((JobVertexID)vertex) == vertexParallelism.getParallelism((JobVertexID)vertex))).orElse(false);
    }

    static class Factory
    implements StateFactory<Restarting> {
        private final Context context;
        private final Logger log;
        private final ExecutionGraph executionGraph;
        private final ExecutionGraphHandler executionGraphHandler;
        private final OperatorCoordinatorHandler operatorCoordinatorHandler;
        private final Duration backoffTime;
        @Nullable
        private final VertexParallelism restartWithParallelism;
        private final ClassLoader userCodeClassLoader;
        private final List<ExceptionHistoryEntry> failureCollection;

        public Factory(Context context, ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Logger log, Duration backoffTime, @Nullable VertexParallelism restartWithParallelism, ClassLoader userCodeClassLoader, List<ExceptionHistoryEntry> failureCollection) {
            this.context = context;
            this.log = log;
            this.executionGraph = executionGraph;
            this.executionGraphHandler = executionGraphHandler;
            this.operatorCoordinatorHandler = operatorCoordinatorHandler;
            this.backoffTime = backoffTime;
            this.restartWithParallelism = restartWithParallelism;
            this.userCodeClassLoader = userCodeClassLoader;
            this.failureCollection = failureCollection;
        }

        @Override
        public Class<Restarting> getStateClass() {
            return Restarting.class;
        }

        @Override
        public Restarting getState() {
            return new Restarting(this.context, this.executionGraph, this.executionGraphHandler, this.operatorCoordinatorHandler, this.log, this.backoffTime, this.restartWithParallelism, this.userCodeClassLoader, this.failureCollection);
        }
    }

    static interface Context
    extends StateWithExecutionGraph.Context,
    StateTransitions.ToCancelling,
    StateTransitions.ToWaitingForResources,
    StateTransitions.ToCreatingExecutionGraph {
        public ScheduledFuture<?> runIfState(State var1, Runnable var2, Duration var3);

        public Optional<VertexParallelism> getAvailableVertexParallelism();
    }
}

