package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator;
import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.scheduler.InternalFailuresListener;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.class */
public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionGraphAccessor {
    /** Logger, registered under the {@link ExecutionGraph} interface rather than this class. */
    static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);

    /** Job-level information; checked for non-null in the constructor. */
    private final JobInformation jobInformation;

    /** Result of {@code BlobWriter.serializeAndTryOffload(...)} for {@link #jobInformation}. */
    private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;

    /** Executor handed out by {@link #getFutureExecutor()}. */
    private final ScheduledExecutorService futureExecutor;

    /** Executor passed on to the checkpoint coordinator when checkpointing is enabled. */
    private final Executor ioExecutor;

    /** Starts out as a dummy executor (see constructor) and is replaced by {@link #start}. */
    @Nonnull
    private ComponentMainThreadExecutor jobMasterMainThreadExecutor;

    /** Timeout forwarded to every {@link ExecutionJobVertex} created in {@link #attachJobGraph}. */
    private final Time rpcTimeout;

    private final ClassLoader userClassLoader;
    private final KvStateLocationRegistry kvStateLocationRegistry;
    private final BlobWriter blobWriter;

    /** Sum of the parallelism of all job vertices attached so far. */
    private int numVerticesTotal;

    private final PartitionGroupReleaseStrategy.Factory partitionGroupReleaseStrategyFactory;

    /** Created from the factory at the end of {@link #attachJobGraph}. */
    private PartitionGroupReleaseStrategy partitionGroupReleaseStrategy;

    /** Built from this graph at the end of {@link #attachJobGraph}. */
    private DefaultExecutionTopology executionTopology;

    /** May be set exactly once via {@link #setInternalTaskFailuresListener}. */
    @Nullable
    private InternalFailuresListener internalTaskFailuresListener;

    private final TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint;

    /** Forwarded to every {@link ExecutionJobVertex} created in {@link #attachJobGraph}. */
    private final int maxPriorAttemptsHistoryLength;

    private int numFinishedVertices;
    private Throwable failureCause;
    private ErrorInfo failureInfo;
    private final JobMasterPartitionTracker partitionTracker;
    private final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;

    /** Cancelled (without interruption) by {@link #cancel()} and {@link #suspend} when non-null. */
    @Nullable
    private CompletableFuture<Void> schedulingFuture;

    private final VertexAttemptNumberStore initialAttemptCounts;
    private final VertexParallelismStore parallelismStore;

    /** {@code null} until {@link #enableCheckpointing} has been called. */
    @Nullable
    private CheckpointCoordinator checkpointCoordinator;

    /** Single-threaded timer created in {@link #enableCheckpointing}. */
    @Nullable
    private ScheduledExecutorService checkpointCoordinatorTimer;

    private CheckpointStatsTracker checkpointStatsTracker;

    /** Simple class name of the state backend passed to {@link #enableCheckpointing}. */
    @Nullable
    private String stateBackendName;

    /** Simple class name of the checkpoint storage passed to {@link #enableCheckpointing}. */
    @Nullable
    private String checkpointStorageName;

    private String jsonPlan;
    private final ShuffleMaster<?> shuffleMaster;
    private final ExecutionDeploymentListener executionDeploymentListener;
    private final ExecutionStateUpdateListener executionStateUpdateListener;
    private final EdgeManager edgeManager;
    private final Map<ExecutionVertexID, ExecutionVertex> executionVerticesById;
    private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitionsById;

    /** Flipped to {@code false} once a non-stoppable input vertex is attached. */
    private boolean isStoppable = true;

    private final Counter numberOfRestartsCounter = new SimpleCounter();
    private volatile JobStatus state = JobStatus.CREATED;
    private final CompletableFuture<JobStatus> terminationFuture = new CompletableFuture<>();

    /** All attached job vertices, keyed by their ID. */
    private final Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>(16);

    /** All intermediate results produced by attached job vertices, keyed by data set ID. */
    private final Map<IntermediateDataSetID, IntermediateResult> intermediateResults = new HashMap<>(16);

    /** Attached job vertices in the order in which {@link #attachJobGraph} added them. */
    private final List<ExecutionJobVertex> verticesInCreationOrder = new ArrayList<>(16);

    private final Map<ExecutionAttemptID, Execution> currentExecutions = new HashMap<>(16);
    private final List<JobStatusListener> jobStatusListeners = new ArrayList<>();

    /** One timestamp slot per {@link JobStatus}, indexed by the status' ordinal. */
    private final long[] stateTimestamps = new long[JobStatus.values().length];

    /**
     * Creates an execution graph in state {@code CREATED} with no job vertices attached.
     *
     * <p>The main-thread executor is initialised with a dummy; {@link #start} must be called to
     * install the real one.
     *
     * @param jobInformation job-level information; must not be null
     * @param scheduledExecutorService stored as the future executor; must not be null
     * @param executor stored as the I/O executor; must not be null
     * @param time stored as the RPC timeout; must not be null
     * @param i stored as the maximum prior-attempts history length
     * @param classLoader user class loader; must not be null
     * @param blobWriter used to serialize/offload the job information; must not be null
     * @param factory partition-group release strategy factory; must not be null
     * @param shuffleMaster must not be null
     * @param jobMasterPartitionTracker must not be null
     * @param partitionLocationConstraint must not be null
     * @param executionDeploymentListener stored as is (no null check)
     * @param executionStateUpdateListener stored as is (no null check)
     * @param j timestamp recorded for {@code JobStatus.INITIALIZING}
     * @param vertexAttemptNumberStore stored as the initial attempt counts (no null check)
     * @param vertexParallelismStore stored as the parallelism store (no null check)
     * @throws IOException declared by this constructor; presumably raised while serializing or
     *     offloading the job information
     */
    public DefaultExecutionGraph(
            JobInformation jobInformation,
            ScheduledExecutorService scheduledExecutorService,
            Executor executor,
            Time time,
            int i,
            ClassLoader classLoader,
            BlobWriter blobWriter,
            PartitionGroupReleaseStrategy.Factory factory,
            ShuffleMaster<?> shuffleMaster,
            JobMasterPartitionTracker jobMasterPartitionTracker,
            TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
            ExecutionDeploymentListener executionDeploymentListener,
            ExecutionStateUpdateListener executionStateUpdateListener,
            long j,
            VertexAttemptNumberStore vertexAttemptNumberStore,
            VertexParallelismStore vertexParallelismStore)
            throws IOException {
        // The order of the checks is kept so that the first failing precondition stays the same.
        this.jobInformation = Preconditions.checkNotNull(jobInformation);
        this.blobWriter = Preconditions.checkNotNull(blobWriter);
        this.partitionLocationConstraint = Preconditions.checkNotNull(partitionLocationConstraint);
        this.jobInformationOrBlobKey =
                BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);
        this.futureExecutor = Preconditions.checkNotNull(scheduledExecutorService);
        this.ioExecutor = Preconditions.checkNotNull(executor);
        this.userClassLoader = Preconditions.checkNotNull(classLoader, "userClassLoader");

        // INITIALIZING gets the caller-supplied timestamp, CREATED is stamped with "now".
        stateTimestamps[JobStatus.INITIALIZING.ordinal()] = j;
        stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();

        this.rpcTimeout = Preconditions.checkNotNull(time);
        this.partitionGroupReleaseStrategyFactory = Preconditions.checkNotNull(factory);
        this.kvStateLocationRegistry =
                new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());
        this.maxPriorAttemptsHistoryLength = i;
        this.schedulingFuture = null;
        this.jobMasterMainThreadExecutor =
                new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
                        "ExecutionGraph is not initialized with proper main thread executor. Call to ExecutionGraph.start(...) required.");
        this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
        this.partitionTracker = Preconditions.checkNotNull(jobMasterPartitionTracker);
        this.resultPartitionAvailabilityChecker =
                new ExecutionGraphResultPartitionAvailabilityChecker(
                        this::createResultPartitionId, jobMasterPartitionTracker);
        this.executionDeploymentListener = executionDeploymentListener;
        this.executionStateUpdateListener = executionStateUpdateListener;
        this.initialAttemptCounts = vertexAttemptNumberStore;
        this.parallelismStore = vertexParallelismStore;
        this.edgeManager = new EdgeManager();
        this.executionVerticesById = new HashMap();
        this.resultPartitionsById = new HashMap();
    }

    /**
     * Installs the main-thread executor, replacing the dummy executor set by the constructor.
     *
     * @param componentMainThreadExecutor the executor to use from now on
     */
    @Override
    public void start(@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
        jobMasterMainThreadExecutor = componentMainThreadExecutor;
    }

    /**
     * Returns the execution topology built by {@link #attachJobGraph}.
     *
     * @return the current topology; {@code null} if no job graph has been attached yet
     */
    @Override
    public SchedulingTopology getSchedulingTopology() {
        return executionTopology;
    }

    /**
     * Returns the partition location constraint given at construction time.
     *
     * @return the non-null partition location constraint
     */
    @Override
    public TaskDeploymentDescriptorFactory.PartitionLocationConstraint getPartitionLocationConstraint() {
        return partitionLocationConstraint;
    }

    /**
     * Returns the main-thread executor; a dummy executor until {@link #start} has been called.
     *
     * @return the currently installed main-thread executor
     */
    @Override
    @Nonnull
    public ComponentMainThreadExecutor getJobMasterMainThreadExecutor() {
        return jobMasterMainThreadExecutor;
    }

    /**
     * Returns the simple class name of the state backend recorded by {@link #enableCheckpointing}.
     *
     * @return the name, or an empty {@link Optional} if none has been recorded
     */
    @Override
    public Optional<String> getStateBackendName() {
        return Optional.ofNullable(stateBackendName);
    }

    /**
     * Returns the simple class name of the checkpoint storage recorded by
     * {@link #enableCheckpointing}.
     *
     * @return the name, or an empty {@link Optional} if none has been recorded
     */
    @Override
    public Optional<String> getCheckpointStorageName() {
        return Optional.ofNullable(checkpointStorageName);
    }

    /**
     * Creates the checkpoint coordinator (and its timer thread) for this graph.
     *
     * <p>May only be called once and only while the job is in state {@code CREATED}. Registers all
     * given master hooks, wires the stats tracker into the coordinator and, if periodic
     * checkpointing is configured, registers the coordinator's activator/deactivator as a job
     * status listener. Finally records the simple class names of the state backend and the
     * checkpoint storage.
     *
     * @param checkpointCoordinatorConfiguration configuration handed to the coordinator
     * @param list master hooks to register; a hook rejected by the coordinator is only logged
     * @param checkpointIDCounter passed through to the coordinator
     * @param completedCheckpointStore passed through to the coordinator
     * @param stateBackend only its simple class name is recorded here
     * @param checkpointStorage passed to the coordinator; its simple class name is recorded
     * @param checkpointStatsTracker stats tracker; must not be null
     * @param checkpointsCleaner passed through to the coordinator
     * @throws IllegalStateException if the job is not in state {@code CREATED}, checkpointing was
     *     already enabled, or a checkpoint timer already exists
     */
    @Override
    public void enableCheckpointing(
            CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration,
            List<MasterTriggerRestoreHook<?>> list,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore completedCheckpointStore,
            StateBackend stateBackend,
            CheckpointStorage checkpointStorage,
            CheckpointStatsTracker checkpointStatsTracker,
            CheckpointsCleaner checkpointsCleaner) {
        Preconditions.checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
        Preconditions.checkState(checkpointCoordinator == null, "checkpointing already enabled");

        final Collection<OperatorCoordinatorCheckpointContext> operatorCoordinators =
                buildOpCoordinatorCheckpointContexts();

        this.checkpointStatsTracker =
                Preconditions.checkNotNull(checkpointStatsTracker, "CheckpointStatsTracker");

        // Both callbacks hop onto the job master main thread before failing the job.
        final CheckpointFailureManager.FailJobCallback failJobCallback =
                new CheckpointFailureManager.FailJobCallback() {
                    @Override
                    public void failJob(Throwable th) {
                        DefaultExecutionGraph.this
                                .getJobMasterMainThreadExecutor()
                                .execute(() -> DefaultExecutionGraph.this.failGlobal(th));
                    }

                    @Override
                    public void failJobDueToTaskFailure(
                            Throwable th, ExecutionAttemptID executionAttemptID) {
                        DefaultExecutionGraph.this
                                .getJobMasterMainThreadExecutor()
                                .execute(
                                        () ->
                                                DefaultExecutionGraph.this
                                                        .failGlobalIfExecutionIsStillRunning(
                                                                th, executionAttemptID));
                    }
                };
        final CheckpointFailureManager failureManager =
                new CheckpointFailureManager(
                        checkpointCoordinatorConfiguration.getTolerableCheckpointFailureNumber(),
                        failJobCallback);

        Preconditions.checkState(checkpointCoordinatorTimer == null);
        checkpointCoordinatorTimer =
                Executors.newSingleThreadScheduledExecutor(
                        new DispatcherThreadFactory(
                                Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));

        checkpointCoordinator =
                new CheckpointCoordinator(
                        jobInformation.getJobId(),
                        checkpointCoordinatorConfiguration,
                        operatorCoordinators,
                        checkpointIDCounter,
                        completedCheckpointStore,
                        checkpointStorage,
                        ioExecutor,
                        checkpointsCleaner,
                        new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                        SharedStateRegistry.DEFAULT_FACTORY,
                        failureManager,
                        createCheckpointPlanCalculator(
                                checkpointCoordinatorConfiguration.isEnableCheckpointsAfterTasksFinish()),
                        new ExecutionAttemptMappingProvider(getAllExecutionVertices()));

        for (MasterTriggerRestoreHook<?> hook : list) {
            final boolean registered = checkpointCoordinator.addMasterHook(hook);
            if (!registered) {
                LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
            }
        }

        checkpointCoordinator.setCheckpointStatsTracker(this.checkpointStatsTracker);

        if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
            registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
        }

        stateBackendName = stateBackend.getClass().getSimpleName();
        checkpointStorageName = checkpointStorage.getClass().getSimpleName();
    }

    /**
     * Builds the checkpoint plan calculator for this graph.
     *
     * @param z flag passed as the last constructor argument of the calculator;
     *     {@link #enableCheckpointing} supplies {@code isEnableCheckpointsAfterTasksFinish()}
     * @return a new {@link DefaultCheckpointPlanCalculator} over the vertices of this graph
     */
    private CheckpointPlanCalculator createCheckpointPlanCalculator(boolean z) {
        final ExecutionGraphCheckpointPlanCalculatorContext context =
                new ExecutionGraphCheckpointPlanCalculatorContext(this);
        return new DefaultCheckpointPlanCalculator(getJobID(), context, getVerticesTopologically(), z);
    }

    /**
     * Returns the checkpoint coordinator created by {@link #enableCheckpointing}.
     *
     * @return the coordinator, or {@code null} if checkpointing has not been enabled
     */
    @Override
    @Nullable
    public CheckpointCoordinator getCheckpointCoordinator() {
        return checkpointCoordinator;
    }

    /**
     * Returns the KvState location registry created in the constructor.
     *
     * @return the registry of this graph
     */
    @Override
    public KvStateLocationRegistry getKvStateLocationRegistry() {
        return kvStateLocationRegistry;
    }

    /**
     * Returns the checkpointing configuration as reported by the checkpoint stats tracker.
     *
     * @return the configuration, or {@code null} if no stats tracker has been set
     */
    @Override
    public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() {
        if (checkpointStatsTracker == null) {
            return null;
        }
        return checkpointStatsTracker.getJobCheckpointingConfiguration();
    }

    /**
     * Creates a snapshot of the checkpoint statistics.
     *
     * @return a fresh snapshot, or {@code null} if no stats tracker has been set
     */
    @Override
    public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
        if (checkpointStatsTracker == null) {
            return null;
        }
        return checkpointStatsTracker.createSnapshot();
    }

    /**
     * Collects the operator coordinators of all attached job vertices, in vertex creation order.
     *
     * @return a new, size-trimmed list of the coordinators' checkpoint contexts
     */
    private Collection<OperatorCoordinatorCheckpointContext> buildOpCoordinatorCheckpointContexts() {
        // Typed list instead of a raw ArrayList, so the element type is checked at compile time.
        final ArrayList<OperatorCoordinatorCheckpointContext> contexts = new ArrayList<>();
        for (ExecutionJobVertex vertex : verticesInCreationOrder) {
            contexts.addAll(vertex.getOperatorCoordinators());
        }
        contexts.trimToSize();
        return contexts;
    }

    /**
     * Stores the JSON plan string returned by {@link #getJsonPlan()}.
     *
     * @param str the JSON plan; stored as is, without validation
     */
    @Override
    public void setJsonPlan(String str) {
        jsonPlan = str;
    }

    /**
     * Returns the JSON plan previously stored via {@link #setJsonPlan}.
     *
     * @return the stored plan; {@code null} if none has been set
     */
    @Override
    public String getJsonPlan() {
        return jsonPlan;
    }

    /**
     * Returns the serialized job information, or the blob key it was offloaded under, as computed
     * once in the constructor.
     *
     * @return either the serialized value or the permanent blob key
     */
    @Override
    public Either<SerializedValue<JobInformation>, PermanentBlobKey> getJobInformationOrBlobKey() {
        return jobInformationOrBlobKey;
    }

    /**
     * Returns the job ID taken from the job information.
     *
     * @return the ID of the job this graph belongs to
     */
    @Override
    public JobID getJobID() {
        return jobInformation.getJobId();
    }

    /**
     * Returns the job name taken from the job information.
     *
     * @return the name of the job this graph belongs to
     */
    @Override
    public String getJobName() {
        return jobInformation.getJobName();
    }

    /**
     * Tells whether the job is stoppable.
     *
     * @return {@code true} unless a non-stoppable input vertex has been attached
     */
    @Override
    public boolean isStoppable() {
        return isStoppable;
    }

    /**
     * Returns the job configuration taken from the job information.
     *
     * @return the configuration of the job
     */
    @Override
    public Configuration getJobConfiguration() {
        return jobInformation.getJobConfiguration();
    }

    /**
     * Returns the user class loader given at construction time.
     *
     * @return the non-null user class loader
     */
    @Override
    public ClassLoader getUserClassLoader() {
        return userClassLoader;
    }

    /**
     * Returns the current job status (read from a volatile field).
     *
     * @return the current status of the job
     */
    @Override
    public JobStatus getState() {
        return state;
    }

    /**
     * Returns the recorded failure cause.
     *
     * @return the failure cause; {@code null} if none has been recorded
     */
    @Override
    public Throwable getFailureCause() {
        return failureCause;
    }

    /**
     * Returns the recorded failure information.
     *
     * @return the failure info; {@code null} if none has been recorded
     */
    @Override
    public ErrorInfo getFailureInfo() {
        return failureInfo;
    }

    /**
     * Returns the current value of the restart counter.
     *
     * @return the counted number of restarts
     */
    @Override
    public long getNumberOfRestarts() {
        return numberOfRestartsCounter.getCount();
    }

    /**
     * Returns the counter of finished vertices.
     *
     * @return the current value of the finished-vertices counter
     */
    @Override
    public int getNumFinishedVertices() {
        return numFinishedVertices;
    }

    /**
     * Looks up an attached job vertex by its ID.
     *
     * @param jobVertexID the ID to look up
     * @return the matching vertex, or {@code null} if no such vertex is attached
     */
    @Override
    public ExecutionJobVertex getJobVertex(JobVertexID jobVertexID) {
        return tasks.get(jobVertexID);
    }

    /**
     * Returns a read-only view of all attached job vertices, keyed by their ID.
     *
     * @return an unmodifiable view backed by the internal map
     */
    @Override
    public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
        return Collections.unmodifiableMap(tasks);
    }

    /**
     * Returns an iterable over the job vertices in the order in which they were attached.
     *
     * <p>The number of vertices is captured when this method is called; iterators obtained from
     * the result walk the backing list by index up to that count, so vertices appended afterwards
     * are not visited. Removal through the iterator is not supported.
     *
     * @return an iterable over the vertices present at call time
     */
    @Override
    public Iterable<ExecutionJobVertex> getVerticesTopologically() {
        final int snapshotSize = verticesInCreationOrder.size();

        return () ->
                new Iterator<ExecutionJobVertex>() {
                    /** Index of the next vertex to hand out. */
                    private int cursor = 0;

                    @Override
                    public boolean hasNext() {
                        return cursor < snapshotSize;
                    }

                    @Override
                    public ExecutionJobVertex next() {
                        if (hasNext()) {
                            return verticesInCreationOrder.get(cursor++);
                        }
                        throw new NoSuchElementException();
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
    }

    /**
     * Returns the summed parallelism of all attached job vertices.
     *
     * @return the total number of execution vertices accumulated by {@link #attachJobGraph}
     */
    @Override
    public int getTotalNumberOfVertices() {
        return numVerticesTotal;
    }

    /**
     * Returns a read-only view of all registered intermediate results, keyed by data set ID.
     *
     * @return an unmodifiable view backed by the internal map
     */
    @Override
    public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
        return Collections.unmodifiableMap(intermediateResults);
    }

    /**
     * Returns an iterable over all execution vertices of all attached job vertices.
     *
     * <p>Each call to {@code iterator()} on the result takes a fresh iterator from
     * {@link #getVerticesTopologically()} and wraps it in an {@code AllVerticesIterator}.
     *
     * @return a lazily evaluated iterable over the execution vertices
     */
    @Override
    public Iterable<ExecutionVertex> getAllExecutionVertices() {
        return () -> new AllVerticesIterator(getVerticesTopologically().iterator());
    }

    /**
     * Returns the edge manager created in the constructor.
     *
     * @return the edge manager of this graph
     */
    @Override
    public EdgeManager getEdgeManager() {
        return edgeManager;
    }

    /**
     * Looks up a registered execution vertex by its ID.
     *
     * @param executionVertexID the ID to look up
     * @return the matching execution vertex, never {@code null}
     * @throws NullPointerException if no vertex is registered under the given ID
     */
    @Override
    public ExecutionVertex getExecutionVertexOrThrow(ExecutionVertexID executionVertexID) {
        final ExecutionVertex vertex = executionVerticesById.get(executionVertexID);
        return Preconditions.checkNotNull(vertex);
    }

    /**
     * Looks up a registered intermediate result partition by its ID.
     *
     * @param intermediateResultPartitionID the ID to look up
     * @return the matching partition, never {@code null}
     * @throws NullPointerException if no partition is registered under the given ID
     */
    @Override
    public IntermediateResultPartition getResultPartitionOrThrow(IntermediateResultPartitionID intermediateResultPartitionID) {
        final IntermediateResultPartition partition =
                resultPartitionsById.get(intermediateResultPartitionID);
        return Preconditions.checkNotNull(partition);
    }

    /**
     * Returns the timestamp stored for the given job status.
     *
     * @param jobStatus the status whose timestamp slot is read
     * @return the stored timestamp; {@code 0} if the slot has never been written
     */
    @Override
    public long getStatusTimestamp(JobStatus jobStatus) {
        // The timestamp array has one slot per status, addressed by ordinal.
        final int slot = jobStatus.ordinal();
        return stateTimestamps[slot];
    }

    /**
     * Returns the blob writer given at construction time.
     *
     * @return the non-null blob writer
     */
    @Override
    public final BlobWriter getBlobWriter() {
        return blobWriter;
    }

    /**
     * Returns the scheduled executor given at construction time, typed as a plain
     * {@link Executor}.
     *
     * @return the non-null future executor
     */
    @Override
    public Executor getFutureExecutor() {
        return futureExecutor;
    }

    /**
     * Merges the user accumulators of the current execution attempt of every execution vertex.
     *
     * <p>Vertices whose current attempt reports {@code null} accumulators are skipped.
     *
     * @return a new map from accumulator name to the merged accumulator (or merge failure)
     */
    @Override
    public Map<String, OptionalFailure<Accumulator<?, ?>>> aggregateUserAccumulators() {
        // Typed map instead of a raw HashMap, so the merge target is checked at compile time.
        final Map<String, OptionalFailure<Accumulator<?, ?>>> merged = new HashMap<>();

        for (ExecutionVertex vertex : getAllExecutionVertices()) {
            final Map<String, Accumulator<?, ?>> vertexAccumulators =
                    vertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (vertexAccumulators != null) {
                AccumulatorHelper.mergeInto(merged, vertexAccumulators);
            }
        }
        return merged;
    }

    /**
     * Aggregates all user accumulators and serializes each one.
     *
     * @return a new map from accumulator name to its serialized value (or serialized failure)
     */
    @Override
    public Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorsSerialized() {
        // No raw casts needed: the stream element type already carries the key and value types.
        return aggregateUserAccumulators().entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> serializeAccumulator(entry.getKey(), entry.getValue())));
    }

    /**
     * Serializes one accumulator result.
     *
     * <p>A failed accumulator is serialized as a failure carrying its cause; otherwise the
     * accumulator's local value is serialized. If serialization itself fails with an
     * {@link IOException}, the error is logged and that exception is serialized as a failure
     * instead.
     *
     * @param str name of the accumulator, used only in the log message
     * @param optionalFailure the accumulator or the failure that occurred while merging it
     * @return the serialized value or serialized failure
     * @throws RuntimeException if even the serialization exception cannot be serialized
     */
    public static SerializedValue<OptionalFailure<Object>> serializeAccumulator(String str, OptionalFailure<Accumulator<?, ?>> optionalFailure) {
        try {
            if (optionalFailure.isFailure()) {
                return new SerializedValue<>(
                        OptionalFailure.ofFailure(optionalFailure.getFailureCause()));
            }
            return new SerializedValue<>(
                    OptionalFailure.of(optionalFailure.getUnchecked().getLocalValue()));
        } catch (IOException serializationError) {
            LOG.error("Could not serialize accumulator " + str + '.', serializationError);
            try {
                return new SerializedValue<>(OptionalFailure.ofFailure(serializationError));
            } catch (IOException nested) {
                throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", nested);
            }
        }
    }

    /**
     * Aggregates all user accumulators and converts them to their stringified form.
     *
     * @return the stringified accumulator results
     */
    @Override
    public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
        final Map<String, OptionalFailure<Accumulator<?, ?>>> aggregated = aggregateUserAccumulators();
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(aggregated);
    }

    /**
     * Installs the listener that {@link #failGlobal} notifies about global failures.
     *
     * @param internalFailuresListener the listener; must not be null
     * @throws NullPointerException if the listener is null
     * @throws IllegalStateException if a listener has already been set
     */
    @Override
    public void setInternalTaskFailuresListener(InternalFailuresListener internalFailuresListener) {
        // Null check first, then the set-once check, then the assignment.
        final InternalFailuresListener listener = Preconditions.checkNotNull(internalFailuresListener);
        final boolean notYetSet = internalTaskFailuresListener == null;
        Preconditions.checkState(notYetSet, "internalTaskFailuresListener can be only set once");
        internalTaskFailuresListener = listener;
    }

    /**
     * Attaches the given job vertices to this graph.
     *
     * <p>For every job vertex an {@link ExecutionJobVertex} is created, connected to its
     * predecessors and registered together with the intermediate results it produces. Afterwards
     * the execution vertices and result partitions are registered, the execution topology is
     * rebuilt and the partition-group release strategy is recreated from it.
     *
     * <p>Must be called from the job master main thread.
     *
     * @param list the job vertices to attach, in the order they should be processed
     * @throws JobException if a job vertex ID or an intermediate data set ID is already
     *     registered; the graph is left partially updated in that case
     */
    @Override
    public void attachJobGraph(List<JobVertex> list) throws JobException {
        assertRunningInJobMasterMainThread();

        LOG.debug(
                "Attaching {} topologically sorted vertices to existing job graph with {} vertices and {} intermediate results.",
                list.size(),
                tasks.size(),
                intermediateResults.size());

        // One creation timestamp shared by all vertices attached in this call.
        final long createTimestamp = System.currentTimeMillis();

        for (JobVertex jobVertex : list) {
            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                this.isStoppable = false;
            }

            final ExecutionJobVertex newVertex =
                    new ExecutionJobVertex(
                            this,
                            jobVertex,
                            maxPriorAttemptsHistoryLength,
                            rpcTimeout,
                            createTimestamp,
                            parallelismStore.getParallelismInfo(jobVertex.getID()),
                            initialAttemptCounts.getAttemptCounts(jobVertex.getID()));
            newVertex.connectToPredecessors(intermediateResults);

            final ExecutionJobVertex previousVertex = tasks.putIfAbsent(jobVertex.getID(), newVertex);
            if (previousVertex != null) {
                // Argument order matches the labels: the already registered vertex is "previous".
                throw new JobException(
                        String.format(
                                "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                                jobVertex.getID(), previousVertex, newVertex));
            }

            for (IntermediateResult newResult : newVertex.getProducedDataSets()) {
                final IntermediateResult previousResult =
                        intermediateResults.putIfAbsent(newResult.getId(), newResult);
                if (previousResult != null) {
                    // Same ordering rule as above: registered result first, new result second.
                    throw new JobException(
                            String.format(
                                    "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                                    newResult.getId(), previousResult, newResult));
                }
            }

            verticesInCreationOrder.add(newVertex);
            numVerticesTotal += newVertex.getParallelism();
        }

        registerExecutionVerticesAndResultPartitions(verticesInCreationOrder);

        // The release strategy is created from the topology, so the topology is assigned first.
        executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);
        partitionGroupReleaseStrategy =
                partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
    }

    /**
     * Moves the job from {@code CREATED} to {@code RUNNING}.
     *
     * @throws IllegalStateException if the state transition is rejected
     */
    @Override
    public void transitionToRunning() {
        final boolean switched = transitionState(JobStatus.CREATED, JobStatus.RUNNING);
        if (switched) {
            return;
        }
        throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
    }

    /**
     * Cancels the job.
     *
     * <p>From {@code RUNNING}, {@code CREATED} or {@code RESTARTING} the job is moved to
     * {@code CANCELLING}, the restart counter is incremented, a pending scheduling future is
     * cancelled and all job vertices are cancelled asynchronously. From {@code FAILING} the job is
     * only moved to {@code CANCELLING}. In every other state the call is a no-op. Whenever a
     * state transition is rejected, the current state is re-read and the decision is retried.
     *
     * <p>Must be called from the job master main thread.
     */
    @Override
    public void cancel() {
        assertRunningInJobMasterMainThread();

        while (true) {
            final JobStatus current = state;

            if (current == JobStatus.FAILING) {
                if (transitionState(current, JobStatus.CANCELLING)) {
                    return;
                }
                continue; // transition rejected: re-read the state and try again
            }

            if (current != JobStatus.RUNNING
                    && current != JobStatus.CREATED
                    && current != JobStatus.RESTARTING) {
                return; // nothing to do for any other state
            }

            if (!transitionState(current, JobStatus.CANCELLING)) {
                continue; // transition rejected: re-read the state and try again
            }

            incrementRestarts();

            // Read the field once so the null check and the cancel call see the same future.
            final CompletableFuture<Void> ongoingScheduling = schedulingFuture;
            if (ongoingScheduling != null) {
                ongoingScheduling.cancel(false);
            }

            cancelVerticesAsync()
                    .whenComplete(
                            (ignored, failure) -> {
                                if (failure == null) {
                                    allVerticesInTerminalState();
                                    return;
                                }
                                transitionState(
                                        JobStatus.CANCELLING,
                                        JobStatus.FAILED,
                                        new FlinkException("Could not cancel job " + getJobName() + " because not all execution job vertices could be cancelled.", failure));
                            });
            return;
        }
    }

    /**
     * Requests cancellation of every attached job vertex.
     *
     * @return a future conjoining the cancellation futures of all job vertices
     */
    @VisibleForTesting
    protected FutureUtils.ConjunctFuture<Void> cancelVerticesAsync() {
        // Typed list instead of a raw ArrayList, so only futures can be added.
        final List<CompletableFuture<Void>> cancellations =
                new ArrayList<>(verticesInCreationOrder.size());
        for (ExecutionJobVertex vertex : verticesInCreationOrder) {
            cancellations.add(vertex.cancelWithFuture());
        }
        return FutureUtils.waitForAll(cancellations);
    }

    /**
     * Suspends the job.
     *
     * <p>Does nothing if the job is already in a terminal state. Otherwise the job is moved to
     * {@code SUSPENDED}, the failure cause is initialised with the given throwable, the restart
     * counter is incremented, a pending scheduling future is cancelled and every job vertex is
     * suspended. The vertex suspension futures are expected to be complete already.
     *
     * <p>Must be called from the job master main thread.
     *
     * @param th the reason for the suspension
     * @throws IllegalStateException if the transition to {@code SUSPENDED} is rejected, or if the
     *     vertices did not finish suspending synchronously
     */
    @Override
    public void suspend(Throwable th) {
        assertRunningInJobMasterMainThread();

        if (state.isTerminalState()) {
            return;
        }
        if (!transitionState(state, JobStatus.SUSPENDED, th)) {
            throw new IllegalStateException(
                    String.format(
                            "Could not suspend because transition from %s to %s failed.",
                            state, JobStatus.SUSPENDED));
        }

        initFailureCause(th, System.currentTimeMillis());
        incrementRestarts();

        if (schedulingFuture != null) {
            schedulingFuture.cancel(false);
        }

        // Typed list instead of a raw ArrayList, so only futures can be added.
        final List<CompletableFuture<Void>> vertexSuspensions =
                new ArrayList<>(verticesInCreationOrder.size());
        for (ExecutionJobVertex vertex : verticesInCreationOrder) {
            vertexSuspensions.add(vertex.suspend());
        }

        final FutureUtils.ConjunctFuture<Void> allSuspended = FutureUtils.waitForAll(vertexSuspensions);
        Preconditions.checkState(allSuspended.isDone(), "Suspend needs to happen atomically");

        allSuspended.whenComplete(
                (ignored, failure) -> {
                    if (failure != null) {
                        LOG.debug("Could not properly suspend the execution graph.", failure);
                    }
                    onTerminalState(state);
                    LOG.info("Job {} has been suspended.", getJobID());
                });
    }

    /**
     * Triggers a global failure only when the given attempt is still registered and is currently
     * in {@code RUNNING} or {@code INITIALIZING}; otherwise the request is logged at debug level
     * and ignored.
     *
     * @param th the failure to report globally
     * @param executionAttemptID the attempt the failure originates from
     */
    void failGlobalIfExecutionIsStillRunning(Throwable th, ExecutionAttemptID executionAttemptID) {
        final Execution attempt = this.currentExecutions.get(executionAttemptID);
        if (attempt != null
                && (attempt.getState() == ExecutionState.RUNNING
                        || attempt.getState() == ExecutionState.INITIALIZING)) {
            failGlobal(th);
            return;
        }
        LOG.debug("The failing attempt {} belongs to an already not running task thus won't fail the job", executionAttemptID);
    }

    /**
     * Reports a global failure to the registered internal task-failures listener.
     *
     * @param th the failure to report
     * @throws IllegalStateException if no internal task-failures listener has been set
     */
    @Override // org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor
    public void failGlobal(Throwable th) {
        // The listener must have been installed before any failure can be reported.
        Preconditions.checkState(this.internalTaskFailuresListener != null);
        this.internalTaskFailuresListener.notifyGlobalFailure(th);
    }

    /**
     * Deserializes the job's execution config with the user class loader and returns its archived
     * form.
     *
     * @return the archived execution config, or {@code null} when the deserialized config is
     *     {@code null} or deserialization fails (the failure is logged)
     */
    @Override // org.apache.flink.runtime.executiongraph.AccessExecutionGraph
    public ArchivedExecutionConfig getArchivedExecutionConfig() {
        try {
            final ExecutionConfig config =
                    (ExecutionConfig) this.jobInformation.getSerializedExecutionConfig().deserializeValue(this.userClassLoader);
            return config == null ? null : config.archive();
        } catch (IOException | ClassNotFoundException e) {
            LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", getJobID(), e);
            return null;
        }
    }

    /**
     * Returns the termination future of this graph.
     *
     * @return the future stored in {@code terminationFuture}
     */
    @Override // org.apache.flink.runtime.executiongraph.ExecutionGraph
    public CompletableFuture<JobStatus> getTerminationFuture() {
        return this.terminationFuture;
    }

    /**
     * Blocks until the termination future completes and returns its value.
     *
     * @return the job status the termination future was completed with
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws RuntimeException wrapping the {@link ExecutionException} if the termination future
     *     completed exceptionally
     */
    @Override // org.apache.flink.runtime.executiongraph.ExecutionGraph
    @VisibleForTesting
    public JobStatus waitUntilTerminal() throws InterruptedException {
        try {
            return this.terminationFuture.get();
        } catch (ExecutionException e) {
            // Surface an exceptional completion as an unchecked exception, keeping the cause.
            throw new RuntimeException(e);
        }
    }

    /**
     * Attempts a state transition without an associated error.
     *
     * @param jobStatus the state the job is expected to be in
     * @param jobStatus2 the state to switch to
     * @return the result of the three-argument {@code transitionState} called with a {@code null}
     *     error
     */
    @Override // org.apache.flink.runtime.executiongraph.ExecutionGraph
    public boolean transitionState(JobStatus jobStatus, JobStatus jobStatus2) {
        return transitionState(jobStatus, jobStatus2, null);
    }

    /**
     * Attempts a transition from whatever the current state is to the given state; the boolean
     * result of the underlying transition is discarded.
     *
     * @param jobStatus the state to switch to
     * @param th the error associated with the transition, passed through unchanged
     */
    private void transitionState(JobStatus jobStatus, Throwable th) {
        transitionState(this.state, jobStatus, th);
    }

    /**
     * Switches the job from {@code jobStatus} to {@code jobStatus2} if, and only if, the job is
     * currently in {@code jobStatus}.
     *
     * <p>On success the new state is stored, the change is logged, the timestamp slot for the new
     * state is set to the current time and the job status listeners are notified.
     *
     * @param jobStatus the state the job is expected to be in
     * @param jobStatus2 the state to switch to
     * @param th optional error associated with the transition; may be {@code null}
     * @return {@code true} if the transition was applied, {@code false} if the current state did
     *     not match {@code jobStatus}
     * @throws IllegalStateException if {@code jobStatus} is a terminal state
     */
    private boolean transitionState(JobStatus jobStatus, JobStatus jobStatus2, Throwable th) {
        assertRunningInJobMasterMainThread();

        if (jobStatus.isTerminalState()) {
            final String message = "Job is trying to leave terminal state " + jobStatus;
            LOG.error(message);
            throw new IllegalStateException(message);
        }

        final boolean inExpectedState = this.state == jobStatus;
        if (inExpectedState) {
            this.state = jobStatus2;
            // The throwable is passed last so the logger can treat it as the exception to print.
            LOG.info("Job {} ({}) switched from state {} to {}.", getJobName(), getJobID(), jobStatus, jobStatus2, th);
            this.stateTimestamps[jobStatus2.ordinal()] = System.currentTimeMillis();
            notifyJobStatusChange(jobStatus2, th);
        }
        return inExpectedState;
    }

    /** Increments the {@code numberOfRestartsCounter} by one. */
    @Override // org.apache.flink.runtime.executiongraph.ExecutionGraph
    public void incrementRestarts() {
        this.numberOfRestartsCounter.inc();
    }

    /**
     * Records the failure cause and builds the matching {@code ErrorInfo}.
     *
     * @param th the failure cause to store
     * @param j the value handed to {@code ErrorInfo} alongside the cause (callers in this class
     *     pass a {@code System.currentTimeMillis()} timestamp)
     */
    @Override // org.apache.flink.runtime.executiongraph.ExecutionGraph
    public void initFailureCause(Throwable th, long j) {
        this.failureCause = th;
        this.failureInfo = new ErrorInfo(th, j);
    }

    /**
     * Counts one more finished vertex. When the count reaches the total number of vertices and the
     * job is still {@code RUNNING}, every job vertex is finalized on the master and the job is
     * switched to {@code FINISHED}.
     *
     * <p>Any non-fatal throwable raised while doing so is turned into a global failure.
     */
    @Override // org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor
    public void vertexFinished() {
        assertRunningInJobMasterMainThread();

        this.numFinishedVertices++;
        final boolean allVerticesFinished = this.numFinishedVertices == this.numVerticesTotal;
        if (!allVerticesFinished || this.state != JobStatus.RUNNING) {
            return;
        }

        try {
            for (ExecutionJobVertex jobVertex : this.verticesInCreationOrder) {
                jobVertex.getJobVertex().finalizeOnMaster(getUserClassLoader());
            }
            // If the transition is rejected, the state changed in the meantime and nothing is done.
            if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) {
                onTerminalState(JobStatus.FINISHED);
            }
        } catch (Throwable failure) {
            ExceptionUtils.rethrowIfFatalError(failure);
            ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(failure);
            failGlobal(new Exception("Failed to finalize execution on master", failure));
        }
    }

    /** Decrements the finished-vertex counter by one; must be called from the main thread. */
    @Override // org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor
    public void vertexUnFinished() {
        assertRunningInJobMasterMainThread();
        this.numFinishedVertices--;
    }

    /**
     * Moves the job to its final state after vertex cancellation has completed.
     *
     * <ul>
     *   <li>{@code CANCELLING}: switch to {@code CANCELED} and run the terminal-state handling;
     *       retry if the transition is rejected.
     *   <li>{@code RUNNING}: report a global failure and re-evaluate the state.
     *   <li>{@code FAILING}: nothing to do here.
     *   <li>globally terminal: log a warning.
     *   <li>anything else: report a global failure.
     * </ul>
     */
    private void allVerticesInTerminalState() {
        assertRunningInJobMasterMainThread();

        while (true) {
            final JobStatus current = this.state;

            if (current == JobStatus.RUNNING) {
                // NOTE(review): the loop only ends if failGlobal moves the job out of RUNNING.
                failGlobal(new Exception("ExecutionGraph went into allVerticesInTerminalState() from RUNNING"));
                continue;
            }

            if (current == JobStatus.CANCELLING) {
                if (transitionState(current, JobStatus.CANCELED)) {
                    onTerminalState(JobStatus.CANCELED);
                    return;
                }
                // Transition rejected: re-read the state and try again.
                continue;
            }

            if (current == JobStatus.FAILING) {
                return;
            }

            if (current.isGloballyTerminalState()) {
                LOG.warn("Job has entered globally terminal state without waiting for all job vertices to reach final state.");
            } else {
                failGlobal(new Exception("ExecutionGraph went into final state from state " + current));
            }
            return;
        }
    }

    /**
     * Fails the job: switches to {@code FAILING}, records the failure cause, cancels all vertices
     * and, once cancellation has completed, moves the job to its final state.
     *
     * <p>Does nothing if the job is already {@code FAILING} or in a terminal state.
     *
     * @param th the failure that causes the job to fail
     * @param j the value recorded together with the failure cause via {@code initFailureCause}
     */
    @Override // org.apache.flink.runtime.executiongraph.ExecutionGraph
    public void failJob(Throwable th, long j) {
        final boolean nothingToDo = this.state == JobStatus.FAILING || this.state.isTerminalState();
        if (nothingToDo) {
            return;
        }

        transitionState(JobStatus.FAILING, th);
        initFailureCause(th, j);

        FutureUtils.assertNoException(cancelVerticesAsync().whenComplete((ignored, cancellationFailure) -> {
            if (transitionState(JobStatus.FAILING, JobStatus.FAILED, th)) {
                onTerminalState(JobStatus.FAILED);
                return;
            }
            if (this.state == JobStatus.CANCELLING) {
                // The job moved to CANCELLING while failing, so it ends as CANCELED instead.
                transitionState(JobStatus.CANCELLING, JobStatus.CANCELED);
                onTerminalState(JobStatus.CANCELED);
                return;
            }
            if (!this.state.isTerminalState()) {
                throw new IllegalStateException("Cannot complete job failing from an unexpected state: " + this.state);
            }
        }));
    }

    /**
     * Cleans up checkpointing resources and completes the termination future with the given
     * status.
     *
     * @param jobStatus the status the termination future is completed with
     */
    private void onTerminalState(JobStatus jobStatus) {
        LOG.debug("ExecutionGraph {} reached terminal state {}.", getJobID(), jobStatus);
        try {
            // Clear the field before shutting down, so it is already null even if shutdown() throws.
            CheckpointCoordinator checkpointCoordinator = this.checkpointCoordinator;
            this.checkpointCoordinator = null;
            if (checkpointCoordinator != null) {
                checkpointCoordinator.shutdown();
            }
            if (this.checkpointCoordinatorTimer != null) {
                this.checkpointCoordinatorTimer.shutdownNow();
                this.checkpointCoordinatorTimer = null;
            }
        } catch (Exception e) {
            // Cleanup problems are logged only; they must not prevent termination from being signalled.
            LOG.error("Error while cleaning up after execution", e);
        } finally {
            // Always runs, whether or not the cleanup above succeeded.
            this.terminationFuture.complete(jobStatus);
        }
    }

    /**
     * Applies a reported task state transition to the matching registered execution.
     *
     * @param taskExecutionStateTransition the reported transition
     * @return {@code false} if no execution is registered under the transition's ID or if
     *     processing raised a non-fatal error (which is escalated as a global failure); otherwise
     *     the result of {@code updateStateInternal}
     */
    @Override // org.apache.flink.runtime.executiongraph.ExecutionGraph
    public boolean updateState(TaskExecutionStateTransition taskExecutionStateTransition) {
        assertRunningInJobMasterMainThread();

        final Execution attempt = this.currentExecutions.get(taskExecutionStateTransition.getID());
        if (attempt == null) {
            return false;
        }

        try {
            final boolean accepted = updateStateInternal(taskExecutionStateTransition, attempt);
            maybeReleasePartitionGroupsFor(attempt);
            return accepted;
        } catch (Throwable failure) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(failure);
            failGlobal(failure);
            return false;
        }
    }

    /**
     * Dispatches a reported task state to the corresponding operation on the execution.
     *
     * @param taskExecutionStateTransition the reported transition
     * @param execution the execution the transition refers to
     * @return the result of the switch call for {@code INITIALIZING} and {@code RUNNING};
     *     {@code true} for {@code FINISHED}, {@code CANCELED} and {@code FAILED}; {@code false}
     *     for any other reported state
     */
    private boolean updateStateInternal(TaskExecutionStateTransition taskExecutionStateTransition, Execution execution) {
        switch (taskExecutionStateTransition.getExecutionState()) {
            case INITIALIZING:
                return execution.switchToRecovering();
            case RUNNING:
                return execution.switchToRunning();
            case FINISHED:
                // Accumulators are only deserialized for states that carry final results.
                execution.markFinished(deserializeAccumulators(taskExecutionStateTransition), taskExecutionStateTransition.getIOMetrics());
                return true;
            case CANCELED:
                execution.completeCancelling(deserializeAccumulators(taskExecutionStateTransition), taskExecutionStateTransition.getIOMetrics(), false);
                return true;
            case FAILED:
                execution.markFailed(taskExecutionStateTransition.getError(this.userClassLoader), taskExecutionStateTransition.getCancelTask(), deserializeAccumulators(taskExecutionStateTransition), taskExecutionStateTransition.getIOMetrics(), taskExecutionStateTransition.getReleasePartitions(), true);
                return true;
            default:
                // Any other reported state is treated as illegal and fails the execution.
                execution.fail(new Exception("TaskManager sent illegal state update: " + taskExecutionStateTransition.getExecutionState()));
                return false;
        }
    }

    /**
     * Informs the partition group release strategy about the execution's current state: a
     * {@code FINISHED} execution releases whatever groups the strategy returns, any other state
     * marks the vertex as unfinished.
     *
     * @param execution the execution whose state was just updated
     */
    private void maybeReleasePartitionGroupsFor(Execution execution) {
        final ExecutionVertexID vertexId = execution.getVertex().getID();
        if (execution.getState() != ExecutionState.FINISHED) {
            this.partitionGroupReleaseStrategy.vertexUnfinished(vertexId);
            return;
        }
        releasePartitionGroups(this.partitionGroupReleaseStrategy.vertexFinished(vertexId));
    }

    /**
     * Clears the cached information of each given partition group on its intermediate result and
     * then asks the partition tracker to stop tracking and release all contained partitions.
     *
     * @param list the partition groups to release; an empty list is a no-op
     */
    private void releasePartitionGroups(List<ConsumedPartitionGroup> list) {
        if (list.isEmpty()) {
            return;
        }

        for (ConsumedPartitionGroup group : list) {
            final IntermediateResult result =
                    (IntermediateResult) Preconditions.checkNotNull(this.intermediateResults.get(group.getIntermediateDataSetID()));
            result.clearCachedInformationForPartitionGroup(group);
        }

        final List<ResultPartitionID> partitionIds =
                list.stream()
                        .flatMap(IterableUtils::toStream)
                        .map(this::createResultPartitionId)
                        .collect(Collectors.toList());
        this.partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
    }

    /**
     * Builds the {@code ResultPartitionID} for an intermediate result partition by pairing it with
     * the current execution attempt ID of the partition's producer vertex.
     *
     * @param intermediateResultPartitionID the partition to resolve
     * @return the result partition ID made of the partition and its producer's current attempt ID
     * @throws NullPointerException if the producer's job vertex is unknown to this graph
     * @throws IllegalStateException if the producer's subtask index is not smaller than the number
     *     of task vertices of its job vertex
     */
    ResultPartitionID createResultPartitionId(IntermediateResultPartitionID intermediateResultPartitionID) {
        final ExecutionVertexID producerId =
                getSchedulingTopology().getResultPartition(intermediateResultPartitionID).getProducer2().getId();
        final JobVertexID jobVertexId = producerId.getJobVertexId();

        final ExecutionJobVertex jobVertex = getJobVertex(jobVertexId);
        Preconditions.checkNotNull(jobVertex, "Unknown job vertex %s", jobVertexId);

        final ExecutionVertex[] taskVertices = jobVertex.getTaskVertices();
        final int subtaskIndex = producerId.getSubtaskIndex();
        // Preconditions templates only substitute "%s"; "%d" would be left verbatim in the message.
        Preconditions.checkState(subtaskIndex < taskVertices.length, "Invalid subtask index %s for job vertex %s", subtaskIndex, jobVertexId);

        return new ResultPartitionID(intermediateResultPartitionID, taskVertices[subtaskIndex].getCurrentExecutionAttempt().getAttemptId());
    }

    /**
     * Deserializes the user accumulators carried by a state transition, using the user class
     * loader.
     *
     * @param taskExecutionStateTransition the transition that may carry an accumulator snapshot
     * @return the deserialized accumulators, or {@code null} if the transition has no snapshot or
     *     deserialization fails (the failure is logged)
     */
    private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionStateTransition taskExecutionStateTransition) {
        final AccumulatorSnapshot snapshot = taskExecutionStateTransition.getAccumulators();
        if (snapshot != null) {
            try {
                return snapshot.deserializeUserAccumulators(this.userClassLoader);
            } catch (Throwable failure) {
                // Accumulator results are best effort; a broken snapshot must not fail the update.
                LOG.error("Failed to deserialize final accumulator results.", failure);
            }
        }
        return null;
    }

    /**
     * Forwards a "partition data available" notification to the vertex of the execution that
     * produces the partition.
     *
     * @param resultPartitionID the partition whose data became available; its producer ID is used
     *     to look up the execution
     * @throws IllegalStateException if no execution is registered under the partition's producer
     *     ID
     */
    @Override // org.apache.flink.runtime.executiongraph.ExecutionGraph
    public void notifyPartitionDataAvailable(ResultPartitionID resultPartitionID) {
        assertRunningInJobMasterMainThread();

        final ExecutionAttemptID producerId = resultPartitionID.getProducerId();
        final Execution execution = this.currentExecutions.get(producerId);
        // Report the ID that was actually used for the lookup (the producer's attempt ID); the
        // partition ID is added as context instead of being mislabelled as the execution ID.
        Preconditions.checkState(execution != null, "Cannot find execution for execution Id " + producerId + " (producer of partition " + resultPartitionID.getPartitionId() + ")" + ScopeFormat.SCOPE_SEPARATOR);
        execution.getVertex().notifyPartitionDataAvailable(resultPartitionID);
    }

    /**
     * Returns the currently registered executions, keyed by attempt ID.
     *
     * @return an unmodifiable view of {@code currentExecutions}
     */
    @Override // org.apache.flink.runtime.executiongraph.ExecutionGraph
    public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
        return Collections.unmodifiableMap(this.currentExecutions);
    }

    /**
     * Registers an execution under its attempt ID. If the ID is already taken, the existing entry
     * is kept and a global failure is reported.
     *
     * @param execution the execution to register
     */
    @Override // org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor
    public void registerExecution(Execution execution) {
        assertRunningInJobMasterMainThread();

        final Execution alreadyRegistered = this.currentExecutions.putIfAbsent(execution.getAttemptId(), execution);
        if (alreadyRegistered == null) {
            return;
        }
        failGlobal(new Exception("Trying to register execution " + execution + " for already used ID " + execution.getAttemptId()));
    }

    /**
     * Removes the entry registered under the execution's attempt ID. If a different execution
     * instance was stored under that ID, a global failure is reported.
     *
     * @param execution the execution to deregister
     */
    @Override // org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor
    public void deregisterExecution(Execution execution) {
        assertRunningInJobMasterMainThread();

        final Execution removed = this.currentExecutions.remove(execution.getAttemptId());
        final boolean removedDifferentInstance = removed != null && removed != execution;
        if (removedDifferentInstance) {
            failGlobal(new Exception("De-registering execution " + execution + " failed. Found for same ID execution " + removed));
        }
    }

    /**
     * Indexes every task vertex of the given job vertices by its ID and records all partitions
     * those task vertices produce.
     *
     * @param list the job vertices whose task vertices and produced partitions are registered
     */
    private void registerExecutionVerticesAndResultPartitions(List<ExecutionJobVertex> list) {
        for (ExecutionJobVertex jobVertex : list) {
            for (ExecutionVertex taskVertex : jobVertex.getTaskVertices()) {
                this.executionVerticesById.put(taskVertex.getID(), taskVertex);
                this.resultPartitionsById.putAll(taskVertex.getProducedPartitions());
            }
        }
    }

    /**
     * Deserializes the accumulators of a snapshot and stores them on the execution the snapshot
     * belongs to. Snapshots for unknown executions are ignored, and any exception is logged
     * rather than propagated.
     *
     * @param accumulatorSnapshot the snapshot carrying the serialized user accumulators
     */
    @Override // org.apache.flink.runtime.executiongraph.ExecutionGraph
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        try {
            final Map<String, Accumulator<?, ?>> userAccumulators =
                    accumulatorSnapshot.deserializeUserAccumulators(this.userClassLoader);
            final ExecutionAttemptID attemptId = accumulatorSnapshot.getExecutionAttemptID();
            final Execution attempt = this.currentExecutions.get(attemptId);
            if (attempt == null) {
                LOG.debug("Received accumulator result for unknown execution {}.", attemptId);
                return;
            }
            attempt.setAccumulators(userAccumulators);
        } catch (Exception e) {
            LOG.error("Cannot update accumulators for job {}.", getJobID(), e);
        }
    }

    /**
     * Adds a job status listener; a {@code null} listener is silently ignored.
     *
     * @param jobStatusListener the listener to add, may be {@code null}
     */
    @Override // org.apache.flink.runtime.executiongraph.ExecutionGraph
    public void registerJobStatusListener(JobStatusListener jobStatusListener) {
        if (jobStatusListener == null) {
            return;
        }
        this.jobStatusListeners.add(jobStatusListener);
    }

    /**
     * Notifies all registered job status listeners about a state change. A listener that throws
     * is logged and does not prevent the remaining listeners from being notified.
     *
     * @param jobStatus the state the job switched to
     * @param th optional error associated with the change; wrapped in a {@code SerializedThrowable}
     *     when non-null
     */
    private void notifyJobStatusChange(JobStatus jobStatus, Throwable th) {
        if (this.jobStatusListeners.isEmpty()) {
            return;
        }

        // All listeners see the same timestamp and the same serialized error instance.
        final long timestamp = System.currentTimeMillis();
        final SerializedThrowable serializedError = (th != null) ? new SerializedThrowable(th) : null;

        final Iterator<JobStatusListener> listeners = this.jobStatusListeners.iterator();
        while (listeners.hasNext()) {
            try {
                listeners.next().jobStatusChanges(getJobID(), jobStatus, timestamp, serializedError);
            } catch (Throwable listenerFailure) {
                LOG.warn("Error while notifying JobStatusListener", listenerFailure);
            }
        }
    }

    /**
     * Forwards an execution state change to the execution state update listener.
     *
     * @param execution the execution whose attempt ID is reported
     * @param executionState the state passed on to the listener
     */
    @Override // org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor
    public void notifyExecutionChange(Execution execution, ExecutionState executionState) {
        this.executionStateUpdateListener.onStateUpdate(execution.getAttemptId(), executionState);
    }

    /**
     * Asserts that the caller runs in the job master main thread. The check is skipped when the
     * configured executor is a {@code DummyComponentMainThreadExecutor}.
     */
    private void assertRunningInJobMasterMainThread() {
        final boolean usesDummyExecutor =
                this.jobMasterMainThreadExecutor instanceof ComponentMainThreadExecutor.DummyComponentMainThreadExecutor;
        if (!usesDummyExecutor) {
            this.jobMasterMainThreadExecutor.assertRunningInMainThread();
        }
    }

    /**
     * Reports a task failure to the registered internal task-failures listener.
     *
     * @param executionAttemptID the failed attempt
     * @param th the failure
     * @param z first flag, forwarded unchanged to {@code notifyTaskFailure}
     * @param z2 second flag, forwarded unchanged to {@code notifyTaskFailure}
     * @throws IllegalStateException if no internal task-failures listener has been set
     */
    @Override // org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor
    public void notifySchedulerNgAboutInternalTaskFailure(ExecutionAttemptID executionAttemptID, Throwable th, boolean z, boolean z2) {
        // The listener must have been installed before any failure can be reported.
        Preconditions.checkState(this.internalTaskFailuresListener != null);
        this.internalTaskFailuresListener.notifyTaskFailure(executionAttemptID, th, z, z2);
    }

    /**
     * Deletes the given permanent blobs of this job asynchronously on the I/O executor.
     *
     * <p>The call returns immediately. A failure of the asynchronous deletion is logged as a
     * warning instead of being dropped together with the discarded future.
     *
     * @param list the keys of the permanent blobs to delete
     */
    @Override // org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor
    public void deleteBlobs(List<PermanentBlobKey> list) {
        CompletableFuture.runAsync(() -> {
            for (PermanentBlobKey blobKey : list) {
                this.blobWriter.deletePermanent(getJobID(), blobKey);
            }
        }, this.ioExecutor).whenComplete((ignored, failure) -> {
            // Nobody holds on to this future, so without this hook an exception would vanish.
            if (failure != null) {
                LOG.warn("Could not delete all permanent blobs of job {}.", getJobID(), failure);
            }
        });
    }

    /**
     * Returns the shuffle master of this graph.
     *
     * @return the value of the {@code shuffleMaster} field
     */
    @Override // org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor
    public ShuffleMaster<?> getShuffleMaster() {
        return this.shuffleMaster;
    }

    /**
     * Returns the partition tracker of this graph.
     *
     * @return the value of the {@code partitionTracker} field
     */
    @Override // org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor
    public JobMasterPartitionTracker getPartitionTracker() {
        return this.partitionTracker;
    }

    /**
     * Returns the result partition availability checker of this graph.
     *
     * @return the value of the {@code resultPartitionAvailabilityChecker} field
     */
    @Override // org.apache.flink.runtime.executiongraph.ExecutionGraph
    public ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker() {
        return this.resultPartitionAvailabilityChecker;
    }

    /**
     * Returns the partition group release strategy of this graph.
     *
     * @return the value of the {@code partitionGroupReleaseStrategy} field
     */
    @Override // org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor
    public PartitionGroupReleaseStrategy getPartitionGroupReleaseStrategy() {
        return this.partitionGroupReleaseStrategy;
    }

    /**
     * Returns the execution deployment listener of this graph.
     *
     * @return the value of the {@code executionDeploymentListener} field
     */
    @Override // org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor
    public ExecutionDeploymentListener getExecutionDeploymentListener() {
        return this.executionDeploymentListener;
    }
}
