package org.apache.flink.runtime.scheduler.stopwithsavepoint;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.class */
/**
 * {@link StopWithSavepointTerminationHandler} implementation built as a small state machine.
 *
 * <p>The handler starts in {@link WaitingForSavepoint} and reacts to two kinds of events:
 *
 * <ul>
 *   <li>the outcome of the savepoint creation ({@link #handleSavepointCreation}), and
 *   <li>the termination states of the executions ({@link #handleExecutionsTermination}).
 * </ul>
 *
 * <p>Transitions implemented in this file:
 *
 * <ul>
 *   <li>{@link WaitingForSavepoint} &rarr; {@link SavepointCreated} when a savepoint was created.
 *   <li>{@link WaitingForSavepoint} &rarr; {@link FinalState} when savepoint creation failed; the
 *       checkpoint scheduler is started again and the result future fails with the given error.
 *   <li>{@link SavepointCreated} &rarr; {@link FinalState} when all executions finished; the result
 *       future completes with the savepoint's external pointer.
 *   <li>{@link SavepointCreated} &rarr; {@link FinalState} when any execution did not finish; a
 *       global failure is reported to the scheduler and the result future fails.
 *   <li>{@link FinalState} stays in {@link FinalState} for execution-termination events.
 * </ul>
 *
 * <p>Any event a state does not override ends in an {@link UnsupportedOperationException} (see the
 * default methods of {@link State}).
 *
 * <p>This class performs no synchronization of its own. NOTE(review): presumably all events are
 * delivered from a single thread - confirm against the callers.
 */
public class StopWithSavepointTerminationHandlerImpl implements StopWithSavepointTerminationHandler {
    private final Logger log;
    private final SchedulerNG scheduler;
    private final CheckpointScheduling checkpointScheduling;
    private final JobID jobId;

    /** Future handed out by {@link #getSavepointPath()}; completed by the {@code terminate*} methods. */
    private final CompletableFuture<String> result = new CompletableFuture<>();

    /** Current state of the state machine; reassigned after every handled event. */
    private State state = new WaitingForSavepoint();

    /**
     * Creates a handler for a scheduler that also provides checkpoint scheduling.
     *
     * @param jobId ID of the job, used in log messages and in the stopping exception
     * @param schedulerWithCheckpointing instance used both as the {@link SchedulerNG} and as the
     *     {@link CheckpointScheduling}
     * @param log logger used for reporting state transitions
     * @param <S> type implementing both {@link SchedulerNG} and {@link CheckpointScheduling}
     */
    public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl(
            JobID jobId, S schedulerWithCheckpointing, Logger log) {
        this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log);
    }

    /**
     * Creates a handler with separately supplied scheduler and checkpoint-scheduling collaborators.
     * Every argument is validated with {@code Preconditions.checkNotNull}.
     *
     * @param jobId ID of the job, used in log messages and in the stopping exception
     * @param scheduler scheduler that receives the global failure if executions did not finish
     * @param checkpointScheduling used to start the checkpoint scheduler again if the savepoint
     *     creation failed
     * @param log logger used for reporting state transitions
     */
    @VisibleForTesting
    StopWithSavepointTerminationHandlerImpl(
            JobID jobId, SchedulerNG scheduler, CheckpointScheduling checkpointScheduling, Logger log) {
        // checkNotNull is generic and returns its argument, so no casts are required.
        this.jobId = Preconditions.checkNotNull(jobId);
        this.scheduler = Preconditions.checkNotNull(scheduler);
        this.checkpointScheduling = Preconditions.checkNotNull(checkpointScheduling);
        this.log = Preconditions.checkNotNull(log);
    }

    /**
     * Returns the future that is completed with the savepoint's external pointer on success, or
     * completed exceptionally on failure.
     *
     * @return the result future owned by this handler
     */
    @Override
    public CompletableFuture<String> getSavepointPath() {
        return this.result;
    }

    /**
     * Handles the outcome of the savepoint creation. Exactly one of the two arguments is expected
     * to be set.
     *
     * @param completedSavepoint the created savepoint; must be non-null when {@code throwable} is
     *     {@code null} and must be {@code null} otherwise
     * @param throwable the failure cause, or {@code null} if the savepoint was created
     * @throws UnsupportedOperationException if the current state does not support the event
     */
    @Override
    public void handleSavepointCreation(CompletedCheckpoint completedSavepoint, Throwable throwable) {
        if (throwable == null) {
            handleSavepointCreationSuccess(Preconditions.checkNotNull(completedSavepoint));
        } else {
            Preconditions.checkArgument(completedSavepoint == null, "No savepoint should be provided if a throwable is passed.");
            handleSavepointCreationFailure(throwable);
        }
    }

    /**
     * Handles the termination of the executions. States other than {@link ExecutionState#FINISHED}
     * are collected into a set; if that set is empty the "all finished" event is dispatched,
     * otherwise the "not finished" event is dispatched with the collected states.
     *
     * @param terminatedExecutionStates termination states of the executions; must not be null
     * @throws UnsupportedOperationException if the current state does not support the event
     */
    @Override
    public void handleExecutionsTermination(Collection<ExecutionState> terminatedExecutionStates) {
        final Set<ExecutionState> notFinishedExecutionStates =
                Preconditions.checkNotNull(terminatedExecutionStates).stream()
                        .filter(executionState -> executionState != ExecutionState.FINISHED)
                        .collect(Collectors.toSet());

        if (notFinishedExecutionStates.isEmpty()) {
            handleExecutionsFinished();
        } else {
            handleAnyExecutionNotFinished(notFinishedExecutionStates);
        }
    }

    /** Dispatches a successful savepoint creation to the current state and logs the transition. */
    private void handleSavepointCreationSuccess(CompletedCheckpoint completedSavepoint) {
        final State previousState = this.state;
        this.state = previousState.onSavepointCreation(completedSavepoint);
        this.log.debug("Stop-with-savepoint transitioned from {} to {} on savepoint creation handling for job {}.", previousState.getName(), this.state.getName(), this.jobId);
    }

    /** Dispatches a failed savepoint creation to the current state and logs the transition. */
    private void handleSavepointCreationFailure(Throwable throwable) {
        final State previousState = this.state;
        this.state = previousState.onSavepointCreationFailure(throwable);
        this.log.debug("Stop-with-savepoint transitioned from {} to {} on savepoint creation failure handling for job {}.", previousState.getName(), this.state.getName(), this.jobId);
    }

    /** Dispatches the "all executions finished" event to the current state and logs the transition. */
    private void handleExecutionsFinished() {
        final State previousState = this.state;
        this.state = previousState.onExecutionsFinished();
        this.log.debug("Stop-with-savepoint transitioned from {} to {} on execution termination handling with all executions being finished for job {}.", previousState.getName(), this.state.getName(), this.jobId);
    }

    /**
     * Dispatches the "some executions did not finish" event to the current state and logs the
     * transition at WARN level together with the offending states.
     */
    private void handleAnyExecutionNotFinished(Set<ExecutionState> notFinishedExecutionStates) {
        final State previousState = this.state;
        this.state = previousState.onAnyExecutionNotFinished(notFinishedExecutionStates);
        this.log.warn("Stop-with-savepoint transitioned from {} to {} on execution termination handling for job {} with some executions being in an not-finished state: {}", previousState.getName(), this.state.getName(), this.jobId, notFinishedExecutionStates);
    }

    /**
     * Terminates the operation after a savepoint was created but not all executions finished: logs
     * the inconsistency, reports a global failure to the scheduler and fails the result future with
     * a {@code StopWithSavepointStoppingException} carrying the savepoint path and job ID.
     *
     * <p>Called by {@link SavepointCreated}. NOTE(review): a decompiler marker in the original file
     * indicates this method was private before decompilation - consider restoring that.
     *
     * @param unfinishedExecutionStates execution states that are not FINISHED; joined with
     *     {@code ", "} for the log message
     * @param savepointPath external pointer of the already created savepoint
     */
    public void terminateExceptionallyWithGlobalFailover(
            Iterable<ExecutionState> unfinishedExecutionStates, String savepointPath) {
        final FlinkException inconsistentFinalStateException =
                new StopWithSavepointStoppingException(savepointPath, this.jobId);

        // The exception is passed as the trailing argument after the single placeholder value.
        this.log.warn("Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: {}.", StringUtils.join(unfinishedExecutionStates, ", "), inconsistentFinalStateException);

        this.scheduler.handleGlobalFailure(inconsistentFinalStateException);
        this.result.completeExceptionally(inconsistentFinalStateException);
    }

    /**
     * Terminates the operation after the savepoint creation failed: starts the checkpoint
     * scheduler again and then fails the result future with the given cause.
     *
     * <p>Called by {@link WaitingForSavepoint}. NOTE(review): a decompiler marker in the original
     * file indicates this method was private before decompilation - consider restoring that.
     *
     * @param throwable cause of the savepoint creation failure
     */
    public void terminateExceptionally(Throwable throwable) {
        this.checkpointScheduling.startCheckpointScheduler();
        this.result.completeExceptionally(throwable);
    }

    /**
     * Terminates the operation successfully by completing the result future with the savepoint's
     * external pointer.
     *
     * <p>Called by {@link SavepointCreated}. NOTE(review): a decompiler marker in the original file
     * indicates this method was private before decompilation - consider restoring that.
     *
     * @param completedSavepoint the savepoint whose external pointer becomes the result
     */
    public void terminateSuccessfully(CompletedCheckpoint completedSavepoint) {
        this.result.complete(completedSavepoint.getExternalPointer());
    }

    /**
     * A state of the stop-with-savepoint state machine. Each event method returns the state to
     * switch to. Events that a concrete state does not override are rejected with an
     * {@link UnsupportedOperationException} naming the rejecting state.
     *
     * <p>NOTE(review): a decompiler marker in the original file indicates this interface was
     * private before decompilation - consider restoring that.
     */
    public interface State {
        /**
         * Called when the savepoint was created.
         *
         * @param completedSavepoint the created savepoint
         * @return the follow-up state
         */
        default State onSavepointCreation(CompletedCheckpoint completedSavepoint) {
            throw new UnsupportedOperationException(getClass().getSimpleName() + " state does not support onSavepointCreation.");
        }

        /**
         * Called when the savepoint creation failed.
         *
         * @param throwable the failure cause
         * @return the follow-up state
         */
        default State onSavepointCreationFailure(Throwable throwable) {
            throw new UnsupportedOperationException(getClass().getSimpleName() + " state does not support onSavepointCreationFailure.");
        }

        /**
         * Called when all executions terminated in state FINISHED.
         *
         * @return the follow-up state
         */
        default State onExecutionsFinished() {
            throw new UnsupportedOperationException(getClass().getSimpleName() + " state does not support onExecutionsFinished.");
        }

        /**
         * Called when at least one execution terminated in a state other than FINISHED.
         *
         * @param notFinishedExecutionStates the termination states that are not FINISHED
         * @return the follow-up state
         */
        default State onAnyExecutionNotFinished(Iterable<ExecutionState> notFinishedExecutionStates) {
            throw new UnsupportedOperationException(getClass().getSimpleName() + " state does not support onAnyExecutionNotFinished.");
        }

        /**
         * Returns the name used for this state in log messages.
         *
         * @return the simple class name of the state
         */
        default String getName() {
            return getClass().getSimpleName();
        }
    }

    /**
     * Initial state: the savepoint outcome is still pending. Only the two savepoint events are
     * supported; execution-termination events fall through to the rejecting defaults of
     * {@link State}.
     */
    private final class WaitingForSavepoint implements State {
        private WaitingForSavepoint() {
        }

        /** Remembers the created savepoint by moving to {@link SavepointCreated}. */
        @Override
        public State onSavepointCreation(CompletedCheckpoint completedSavepoint) {
            return new SavepointCreated(completedSavepoint);
        }

        /** Fails the operation with the given cause and moves to {@link FinalState}. */
        @Override
        public State onSavepointCreationFailure(Throwable throwable) {
            StopWithSavepointTerminationHandlerImpl.this.terminateExceptionally(throwable);
            return new FinalState();
        }
    }

    /**
     * State after the savepoint was created; waits for the executions to terminate. Savepoint
     * events fall through to the rejecting defaults of {@link State}.
     */
    private final class SavepointCreated implements State {
        private final CompletedCheckpoint completedSavepoint;

        private SavepointCreated(CompletedCheckpoint completedSavepoint) {
            this.completedSavepoint = completedSavepoint;
        }

        /** Completes the operation successfully and moves to {@link FinalState}. */
        @Override
        public State onExecutionsFinished() {
            StopWithSavepointTerminationHandlerImpl.this.terminateSuccessfully(this.completedSavepoint);
            return new FinalState();
        }

        /** Triggers the global failover path and moves to {@link FinalState}. */
        @Override
        public State onAnyExecutionNotFinished(Iterable<ExecutionState> notFinishedExecutionStates) {
            StopWithSavepointTerminationHandlerImpl.this.terminateExceptionallyWithGlobalFailover(
                    notFinishedExecutionStates, this.completedSavepoint.getExternalPointer());
            return new FinalState();
        }
    }

    /**
     * Terminal state. Execution-termination events are accepted and ignored; savepoint events fall
     * through to the rejecting defaults of {@link State}.
     */
    private static final class FinalState implements State {
        private FinalState() {
        }

        /** No-op: stays in the final state. */
        @Override
        public State onExecutionsFinished() {
            return this;
        }

        /** No-op: stays in the final state. */
        @Override
        public State onAnyExecutionNotFinished(Iterable<ExecutionState> notFinishedExecutionStates) {
            return this;
        }
    }
}
