package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.KvStateHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.VertexEndOfDataListener;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitions;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.class */
/**
 * Base class for adaptive-scheduler {@link State}s that own an {@link ExecutionGraph}.
 *
 * <p>The class forwards task- and client-facing requests to the {@link ExecutionGraphHandler},
 * the {@link KvStateHandler} and the {@link OperatorCoordinatorHandler}, records failures in a
 * failure collection, and reacts to the execution graph's termination future: once the graph
 * reports a globally terminal {@link JobStatus}, the collected failures are archived through the
 * {@link Context} and {@link #onGloballyTerminalState(JobStatus)} is invoked.
 *
 * <p>NOTE(review): this file was produced by a decompiler. Member visibilities are kept exactly
 * as decompiled because other classes may rely on them.
 */
public abstract class StateWithExecutionGraph implements State {
    private final Context context;
    private final ExecutionGraph executionGraph;
    private final ExecutionGraphHandler executionGraphHandler;
    private final OperatorCoordinatorHandler operatorCoordinatorHandler;
    private final KvStateHandler kvStateHandler;
    private final Logger logger;
    private final ClassLoader userCodeClassLoader;
    private final List<ExceptionHistoryEntry> failureCollection;
    private final VertexEndOfDataListener vertexEndOfDataListener;

    /**
     * Callbacks this state needs from its owner.
     *
     * <p>The exact contract of each method is defined by the implementing class, which is not
     * part of this file; the descriptions below only state how this class uses them.
     */
    public interface Context extends StateTransitions.ToFinished {
        /**
         * Used to run {@code runnable} guarded by {@code state}; presumably the action is only
         * executed while {@code state} is still the current state (confirm against implementor).
         */
        void runIfState(State state, Runnable runnable);

        /** Used to ask whether {@code state} is (presumably) still the current state. */
        boolean isState(State state);

        /**
         * Returns the executor on which this class schedules its asynchronous continuations.
         *
         * <p>The unusual name is a decompiler artifact and must stay as-is so that implementors
         * in other files keep matching this declaration.
         */
        /* renamed from: getMainThreadExecutor */
        Executor mo658getMainThreadExecutor();

        /** Receives the failure history entry assembled when the job terminates globally. */
        void archiveFailure(RootExceptionHistoryEntry rootExceptionHistoryEntry);
    }

    /**
     * Creates the state and registers the termination callback on the execution graph.
     *
     * @param context owner callbacks; also supplies the executor for asynchronous continuations
     * @param executionGraph execution graph this state operates on
     * @param executionGraphHandler handler that most task-side requests are delegated to
     * @param operatorCoordinatorHandler handler for operator coordinator events and requests
     * @param logger logger used by this state
     * @param userCodeClassLoader class loader used to deserialize task errors
     * @param failureCollection failures carried over into this state; copied defensively
     */
    public StateWithExecutionGraph(
            Context context,
            ExecutionGraph executionGraph,
            ExecutionGraphHandler executionGraphHandler,
            OperatorCoordinatorHandler operatorCoordinatorHandler,
            Logger logger,
            ClassLoader userCodeClassLoader,
            List<ExceptionHistoryEntry> failureCollection) {
        this.context = context;
        this.executionGraph = executionGraph;
        this.executionGraphHandler = executionGraphHandler;
        this.operatorCoordinatorHandler = operatorCoordinatorHandler;
        this.kvStateHandler = new KvStateHandler(executionGraph);
        this.logger = logger;
        this.userCodeClassLoader = userCodeClassLoader;
        this.failureCollection = new ArrayList<>(failureCollection);
        this.vertexEndOfDataListener = new VertexEndOfDataListener(executionGraph);

        // Once the graph terminates globally, archive the collected failures and notify the
        // subclass. The continuation runs on the context's executor and is guarded by
        // runIfState so that it is tied to this state instance.
        FutureUtils.assertNoException(
                executionGraph
                        .getTerminationFuture()
                        .thenAcceptAsync(
                                jobStatus -> {
                                    if (jobStatus.isGloballyTerminalState()) {
                                        context.runIfState(
                                                this,
                                                () -> {
                                                    convertFailures(this.failureCollection)
                                                            .ifPresent(context::archiveFailure);
                                                    onGloballyTerminalState(jobStatus);
                                                });
                                    }
                                },
                                context.mo658getMainThreadExecutor()));
    }

    /** Returns the execution graph owned by this state. */
    public ExecutionGraph getExecutionGraph() {
        return executionGraph;
    }

    /** Returns the job id reported by the execution graph. */
    public JobID getJobId() {
        return executionGraph.getJobID();
    }

    /** Returns the operator coordinator handler passed to the constructor. */
    public OperatorCoordinatorHandler getOperatorCoordinatorHandler() {
        return operatorCoordinatorHandler;
    }

    /** Returns the execution graph handler passed to the constructor. */
    public ExecutionGraphHandler getExecutionGraphHandler() {
        return executionGraphHandler;
    }

    /**
     * Disposes all operator coordinators unless the given state class is itself a {@link
     * StateWithExecutionGraph}.
     *
     * @param newState state class handed to this callback (presumably the state being entered)
     */
    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public void onLeave(Class<? extends State> newState) {
        if (StateWithExecutionGraph.class.isAssignableFrom(newState)) {
            return;
        }
        operatorCoordinatorHandler.disposeAllOperatorCoordinators();
    }

    /** Returns an archived snapshot of the execution graph with this state's job status. */
    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public ArchivedExecutionGraph getJob() {
        return ArchivedExecutionGraph.createFrom(executionGraph, getJobStatus());
    }

    /**
     * Suspends the execution graph and transitions to finished without overriding the job status.
     *
     * @param cause reason for the suspension
     */
    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public void suspend(Throwable cause) {
        suspend(cause, null);
    }

    /**
     * Suspends the execution graph and hands an archived snapshot to {@link
     * Context#goToFinished}.
     *
     * @param cause reason for the suspension
     * @param statusOverride status passed to {@code ArchivedExecutionGraph.createFrom}; may be
     *     {@code null}
     * @throws IllegalStateException if the graph is not in a terminal state after suspending
     */
    public void suspend(Throwable cause, @Nullable JobStatus statusOverride) {
        executionGraph.suspend(cause);
        Preconditions.checkState(executionGraph.getState().isTerminalState());
        context.goToFinished(ArchivedExecutionGraph.createFrom(executionGraph, statusOverride));
    }

    /** Returns the logger passed to the constructor. */
    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public Logger getLogger() {
        return logger;
    }

    /** Delegates to {@link ExecutionGraphHandler#requestNextInputSplit}. */
    public SerializedInputSplit requestNextInputSplit(
            JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID) throws IOException {
        return executionGraphHandler.requestNextInputSplit(jobVertexID, executionAttemptID);
    }

    /** Delegates to {@link ExecutionGraphHandler#requestPartitionState}. */
    public ExecutionState requestPartitionState(
            IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID)
            throws PartitionProducerDisposedException {
        return executionGraphHandler.requestPartitionState(
                intermediateDataSetID, resultPartitionID);
    }

    /**
     * Delegates to {@link ExecutionGraphHandler#acknowledgeCheckpoint}.
     *
     * @param checkpointId numeric checkpoint argument forwarded unchanged to the handler
     */
    public void acknowledgeCheckpoint(
            JobID jobID,
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            CheckpointMetrics checkpointMetrics,
            TaskStateSnapshot taskStateSnapshot) {
        executionGraphHandler.acknowledgeCheckpoint(
                jobID, executionAttemptID, checkpointId, checkpointMetrics, taskStateSnapshot);
    }

    /** Delegates to {@link ExecutionGraphHandler#declineCheckpoint}. */
    public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
        executionGraphHandler.declineCheckpoint(declineCheckpoint);
    }

    /**
     * Records that the given execution attempt reached end of data.
     *
     * <p>Nothing happens unless a checkpoint configuration exists, checkpointing is enabled and
     * checkpoints after task finish are enabled. Otherwise the attempt is recorded; when all
     * tasks of its job vertex have reached end of data, the backlog flag of each of the vertex's
     * operators is cleared on the checkpoint coordinator (if one exists); when all tasks of the
     * job have reached end of data, a {@link CheckpointType#CONFIGURED} checkpoint is triggered.
     *
     * @param executionAttemptID attempt that reported end of data
     */
    public void notifyEndOfData(ExecutionAttemptID executionAttemptID) {
        CheckpointCoordinatorConfiguration checkpointConfig =
                executionGraph.getCheckpointCoordinatorConfiguration();
        if (checkpointConfig == null
                || !checkpointConfig.isCheckpointingEnabled()
                || !checkpointConfig.isEnableCheckpointsAfterTasksFinish()) {
            return;
        }

        vertexEndOfDataListener.recordTaskEndOfData(executionAttemptID);

        if (vertexEndOfDataListener.areAllTasksOfJobVertexEndOfData(
                executionAttemptID.getJobVertexId())) {
            List<OperatorIDPair> operatorIdPairs =
                    executionGraph
                            .getJobVertex(executionAttemptID.getJobVertexId())
                            .getOperatorIDs();
            CheckpointCoordinator checkpointCoordinator =
                    executionGraph.getCheckpointCoordinator();
            if (checkpointCoordinator != null) {
                for (OperatorIDPair operatorIdPair : operatorIdPairs) {
                    checkpointCoordinator.setIsProcessingBacklog(
                            operatorIdPair.getGeneratedOperatorID(), false);
                }
            }
        }

        if (vertexEndOfDataListener.areAllTasksEndOfData()) {
            // The returned future is intentionally not observed here.
            triggerCheckpoint(CheckpointType.CONFIGURED);
        }
    }

    /**
     * Delegates to {@link ExecutionGraphHandler#reportCheckpointMetrics}.
     *
     * @param checkpointId numeric checkpoint argument forwarded unchanged to the handler
     */
    public void reportCheckpointMetrics(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            CheckpointMetrics checkpointMetrics) {
        executionGraphHandler.reportCheckpointMetrics(
                executionAttemptID, checkpointId, checkpointMetrics);
    }

    /** Delegates to {@link ExecutionGraphHandler#reportInitializationMetrics}. */
    public void reportInitializationMetrics(
            SubTaskInitializationMetrics subTaskInitializationMetrics) {
        executionGraphHandler.reportInitializationMetrics(subTaskInitializationMetrics);
    }

    /** Forwards the accumulator snapshot to the execution graph. */
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        executionGraph.updateAccumulators(accumulatorSnapshot);
    }

    /**
     * Delegates to {@link KvStateHandler#requestKvStateLocation}.
     *
     * @param registrationName string argument forwarded unchanged to the handler
     */
    public KvStateLocation requestKvStateLocation(JobID jobID, String registrationName)
            throws FlinkJobNotFoundException, UnknownKvStateLocation {
        return kvStateHandler.requestKvStateLocation(jobID, registrationName);
    }

    /**
     * Delegates to {@link KvStateHandler#notifyKvStateRegistered}.
     *
     * @param registrationName string argument forwarded unchanged to the handler
     */
    public void notifyKvStateRegistered(
            JobID jobID,
            JobVertexID jobVertexID,
            KeyGroupRange keyGroupRange,
            String registrationName,
            KvStateID kvStateID,
            InetSocketAddress inetSocketAddress)
            throws FlinkJobNotFoundException {
        kvStateHandler.notifyKvStateRegistered(
                jobID, jobVertexID, keyGroupRange, registrationName, kvStateID, inetSocketAddress);
    }

    /**
     * Delegates to {@link KvStateHandler#notifyKvStateUnregistered}.
     *
     * @param registrationName string argument forwarded unchanged to the handler
     */
    public void notifyKvStateUnregistered(
            JobID jobID,
            JobVertexID jobVertexID,
            KeyGroupRange keyGroupRange,
            String registrationName)
            throws FlinkJobNotFoundException {
        kvStateHandler.notifyKvStateUnregistered(
                jobID, jobVertexID, keyGroupRange, registrationName);
    }

    /**
     * Triggers a savepoint and, if requested, cancels the job once the savepoint is stored.
     *
     * <p>When {@code cancelJob} is set, the checkpoint scheduler is stopped before the savepoint
     * is triggered. If the savepoint fails and this state is still active according to {@link
     * Context#isState}, the scheduler is restarted on a best-effort basis. If it succeeds and
     * this state is still active, {@code cancel()} is invoked. The continuation runs on the
     * context's executor.
     *
     * @param targetDirectory savepoint target passed to the precondition check and to the
     *     checkpoint coordinator
     * @param cancelJob whether to cancel the job after a successful savepoint
     * @param savepointFormatType format of the savepoint
     * @return future with the external pointer of the completed savepoint; completes
     *     exceptionally with a {@link CompletionException} wrapping the failure otherwise
     */
    public CompletableFuture<String> triggerSavepoint(
            String targetDirectory, boolean cancelJob, SavepointFormatType savepointFormatType) {
        CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        StopWithSavepointTerminationManager.checkSavepointActionPreconditions(
                checkpointCoordinator, targetDirectory, getJobId(), logger);

        logger.info(
                "Triggering {}savepoint for job {}.",
                cancelJob ? "cancel-with-" : "",
                executionGraph.getJobID());

        if (cancelJob) {
            checkpointCoordinator.stopCheckpointScheduler();
        }

        return checkpointCoordinator
                .triggerSavepoint(targetDirectory, savepointFormatType)
                .thenApply(CompletedCheckpoint::getExternalPointer)
                .handleAsync(
                        (savepointPath, failure) -> {
                            if (failure != null) {
                                if (cancelJob && context.isState(this)) {
                                    // The scheduler was stopped above; bring it back since
                                    // the job keeps running.
                                    startCheckpointScheduler(checkpointCoordinator);
                                }
                                throw new CompletionException(failure);
                            }
                            if (cancelJob && context.isState(this)) {
                                logger.info(
                                        "Savepoint stored in {}. Now cancelling {}.",
                                        savepointPath,
                                        executionGraph.getJobID());
                                cancel();
                            }
                            return savepointPath;
                        },
                        context.mo658getMainThreadExecutor());
    }

    /**
     * Triggers a checkpoint of the given type on the checkpoint coordinator.
     *
     * @param checkpointType type of checkpoint to trigger
     * @return future with the completed checkpoint; a failure is rethrown wrapped in a {@link
     *     CompletionException}. The continuation runs on the context's executor.
     * @throws IllegalStateException if the execution graph has no checkpoint coordinator
     */
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType) {
        CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        JobID jobId = executionGraph.getJobID();
        if (checkpointCoordinator == null) {
            throw new IllegalStateException(
                    String.format("Job %s is not a streaming job.", jobId));
        }

        logger.info("Triggering a checkpoint for job {}.", jobId);

        return checkpointCoordinator
                .triggerCheckpoint(checkpointType)
                .handleAsync(
                        (completedCheckpoint, failure) -> {
                            if (failure != null) {
                                throw new CompletionException(failure);
                            }
                            return completedCheckpoint;
                        },
                        context.mo658getMainThreadExecutor());
    }

    /**
     * Restarts the periodic checkpoint scheduler if periodic checkpointing is configured.
     *
     * <p>This is best effort: an {@link IllegalStateException} from the coordinator is not
     * propagated (presumably it signals that the coordinator was shut down in the meantime —
     * TODO confirm), but it is logged at debug level so the event is not lost entirely.
     */
    private void startCheckpointScheduler(CheckpointCoordinator checkpointCoordinator) {
        if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
            try {
                checkpointCoordinator.startCheckpointScheduler();
            } catch (IllegalStateException e) {
                logger.debug(
                        "Could not restart the checkpoint scheduler for job {}; ignoring.",
                        executionGraph.getJobID(),
                        e);
            }
        }
    }

    /** Delegates to {@link OperatorCoordinatorHandler#deliverOperatorEventToCoordinator}. */
    public void deliverOperatorEventToCoordinator(
            ExecutionAttemptID executionAttemptID, OperatorID operatorID, OperatorEvent operatorEvent)
            throws FlinkException {
        operatorCoordinatorHandler.deliverOperatorEventToCoordinator(
                executionAttemptID, operatorID, operatorEvent);
    }

    /** Delegates to {@link OperatorCoordinatorHandler#deliverCoordinationRequestToCoordinator}. */
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
            OperatorID operatorID, CoordinationRequest coordinationRequest) throws FlinkException {
        return operatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(
                operatorID, coordinationRequest);
    }

    /**
     * Invoked after a failure has been added to the failure collection.
     *
     * @param cause the failure
     * @param failureLabels future with labels associated with the failure
     */
    abstract void onFailure(Throwable cause, CompletableFuture<Map<String, String>> failureLabels);

    /**
     * Invoked from the termination callback once the execution graph reached a globally terminal
     * status and the collected failures were offered to {@link Context#archiveFailure}.
     *
     * @param jobStatus the globally terminal status
     */
    abstract void onGloballyTerminalState(JobStatus jobStatus);

    /**
     * Records a global failure and forwards it to {@link #onFailure}.
     *
     * @param cause the failure
     * @param failureLabels future with labels associated with the failure
     */
    @Override // org.apache.flink.runtime.scheduler.adaptive.LabeledGlobalFailureHandler
    public void handleGlobalFailure(
            Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
        failureCollection.add(ExceptionHistoryEntry.createGlobal(cause, failureLabels));
        onFailure(cause, failureLabels);
    }

    /**
     * Applies a task state transition to the execution graph and records task failures.
     *
     * <p>If the update was accepted, the reported state is {@link ExecutionState#FAILED} and the
     * execution is now in that state, a failure entry is recorded and {@link #onFailure} is
     * called with the task's error.
     *
     * @param taskExecutionStateTransition the reported transition
     * @param failureLabels future with labels associated with a potential failure
     * @return the result of {@code ExecutionGraph.updateState}
     * @throws NoSuchElementException if the update to {@code FAILED} was accepted but the
     *     execution or its vertex name could not be found beforehand
     */
    public boolean updateTaskExecutionState(
            TaskExecutionStateTransition taskExecutionStateTransition,
            CompletableFuture<Map<String, String>> failureLabels) {
        // Both lookups deliberately happen before the update is applied; keep this order.
        Optional<AccessExecution> execution =
                executionGraph.findExecution(taskExecutionStateTransition.getID());
        Optional<String> vertexWithAttempt =
                executionGraph.findVertexWithAttempt(taskExecutionStateTransition.getID());
        ExecutionState reportedState = taskExecutionStateTransition.getExecutionState();

        boolean updated = getExecutionGraph().updateState(taskExecutionStateTransition);
        if (!updated || reportedState != ExecutionState.FAILED) {
            return updated;
        }

        AccessExecution failedExecution = execution.orElseThrow(NoSuchElementException::new);
        String taskName = vertexWithAttempt.orElseThrow(NoSuchElementException::new);
        if (failedExecution.getState() == reportedState) {
            failureCollection.add(
                    ExceptionHistoryEntry.create(failedExecution, taskName, failureLabels));
            onFailure(
                    ErrorInfo.handleMissingThrowable(
                            taskExecutionStateTransition.getError(userCodeClassLoader)),
                    failureLabels);
        }
        return updated;
    }

    /**
     * Returns the live, mutable failure collection of this state (not a copy).
     *
     * <p>Note that the termination callback removes the first entry from this list when it
     * builds the root failure entry.
     */
    public List<ExceptionHistoryEntry> getFailures() {
        return failureCollection;
    }

    /**
     * Turns the collected failures into a root entry with the remaining failures attached.
     *
     * <p>The first element is removed from {@code failures} (the list is mutated) and becomes the
     * root; the list, now without that element, is passed along with it.
     *
     * @param failures mutable list of collected failures
     * @return the root entry, or empty if no failures were collected
     */
    private static Optional<RootExceptionHistoryEntry> convertFailures(
            List<ExceptionHistoryEntry> failures) {
        if (failures.isEmpty()) {
            return Optional.empty();
        }
        ExceptionHistoryEntry rootFailure = failures.remove(0);
        return Optional.of(
                RootExceptionHistoryEntry.fromExceptionHistoryEntry(rootFailure, failures));
    }
}
