package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.types.Either;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionJobVertex.class */
public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable<ArchivedExecutionJobVertex> {
    // Logger instance reused from DefaultExecutionGraph rather than created for this class.
    private static final Logger LOG = DefaultExecutionGraph.LOG;
    // Accessor for the owning execution graph; assigned once in the constructor.
    private final InternalExecutionGraphAccessor graph;
    // The job-graph vertex this execution job vertex was built from.
    private final JobVertex jobVertex;

    // Per-subtask vertices. Null until initialize() allocates the array; isInitialized() tests exactly this.
    @Nullable
    private ExecutionVertex[] taskVertices;

    // Intermediate results produced by this vertex; allocated and filled in initialize().
    @Nullable
    private IntermediateResult[] producedDataSets;

    // Consumed intermediate results; the list is allocated in initialize() and filled by connectToPredecessors().
    @Nullable
    private List<IntermediateResult> inputs;
    // Source of the parallelism and max-parallelism values used throughout this class.
    private final VertexParallelismInformation parallelismInfo;
    // Taken from the job vertex; the constructor rejects a null group.
    private final SlotSharingGroup slotSharingGroup;

    // Taken from the job vertex as-is; may be null.
    @Nullable
    private final CoLocationGroup coLocationGroup;

    // Input splits created in initialize(); stays null when the job vertex has no input split source.
    @Nullable
    private InputSplit[] inputSplits;
    // Derived in the constructor from the job vertex's minimum resources.
    private final ResourceProfile resourceProfile;
    // Counter maintained by executionVertexFinished() / executionVertexUnFinished().
    private int numExecutionVertexFinished;
    // Coordinator holders created in the constructor; an empty list when the job vertex declares none.
    private final Collection<OperatorCoordinatorHolder> operatorCoordinators;

    // Set in initialize() only when the split source produced a non-null split array.
    @Nullable
    private InputSplitAssigner splitAssigner;
    // Lock object guarding the lazy creation of taskInformationOrBlobKey.
    private final Object stateMonitor = new Object();
    // Lazily computed by getTaskInformationOrBlobKey(); null until first requested.
    private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = null;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionJobVertex$Factory.class */
    /** Factory that builds {@link ExecutionJobVertex} instances by delegating to the public constructor. */
    public static class Factory {
        /**
         * Creates a new {@link ExecutionJobVertex} from the given arguments, passing them through
         * unchanged to the constructor.
         *
         * <p>Decompiler note: this method was originally package-private.
         *
         * @throws JobException if the constructor rejects the arguments
         */
        public ExecutionJobVertex createExecutionJobVertex(
                InternalExecutionGraphAccessor internalExecutionGraphAccessor,
                JobVertex jobVertex,
                VertexParallelismInformation vertexParallelismInformation,
                CoordinatorStore coordinatorStore,
                JobManagerJobMetricGroup jobManagerJobMetricGroup)
                throws JobException {
            final ExecutionJobVertex created =
                    new ExecutionJobVertex(
                            internalExecutionGraphAccessor,
                            jobVertex,
                            vertexParallelismInformation,
                            coordinatorStore,
                            jobManagerJobMetricGroup);
            return created;
        }
    }

    /**
     * Creates the execution-side representation of a job vertex.
     *
     * <p>Validates that the configured parallelism does not exceed the max parallelism, derives the
     * resource profile and sharing groups from the job vertex, and instantiates one
     * {@link OperatorCoordinatorHolder} per coordinator provider declared on the job vertex. If any
     * coordinator cannot be created, the holders created so far are closed quietly before the
     * failure is reported.
     *
     * @param internalExecutionGraphAccessor accessor of the owning execution graph; must not be null
     * @param jobVertex the job vertex to represent; must not be null
     * @param vertexParallelismInformation parallelism settings for this vertex; must not be null
     * @param coordinatorStore forwarded to {@link #createOperatorCoordinatorHolder}
     * @param jobManagerJobMetricGroup forwarded to {@link #createOperatorCoordinatorHolder}
     * @throws NullPointerException if a required argument is null
     * @throws JobException if parallelism exceeds max parallelism or a coordinator cannot be
     *     instantiated
     */
    @VisibleForTesting
    public ExecutionJobVertex(
            InternalExecutionGraphAccessor internalExecutionGraphAccessor,
            JobVertex jobVertex,
            VertexParallelismInformation vertexParallelismInformation,
            CoordinatorStore coordinatorStore,
            JobManagerJobMetricGroup jobManagerJobMetricGroup)
            throws JobException {
        this.graph =
                Preconditions.checkNotNull(
                        internalExecutionGraphAccessor,
                        "internalExecutionGraphAccessor must not be null");
        this.jobVertex = Preconditions.checkNotNull(jobVertex, "jobVertex must not be null");
        this.parallelismInfo =
                Preconditions.checkNotNull(
                        vertexParallelismInformation,
                        "vertexParallelismInformation must not be null");

        if (this.parallelismInfo.getParallelism() > this.parallelismInfo.getMaxParallelism()) {
            throw new JobException(
                    String.format(
                            "Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
                            jobVertex.getName(),
                            this.parallelismInfo.getParallelism(),
                            this.parallelismInfo.getMaxParallelism()));
        }

        this.resourceProfile =
                ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);
        this.slotSharingGroup = Preconditions.checkNotNull(jobVertex.getSlotSharingGroup());
        this.coLocationGroup = jobVertex.getCoLocationGroup();

        final List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =
                getJobVertex().getOperatorCoordinators();
        if (coordinatorProviders.isEmpty()) {
            this.operatorCoordinators = Collections.emptyList();
            return;
        }

        final List<OperatorCoordinatorHolder> holders =
                new ArrayList<>(coordinatorProviders.size());
        try {
            for (SerializedValue<OperatorCoordinator.Provider> provider : coordinatorProviders) {
                holders.add(
                        createOperatorCoordinatorHolder(
                                provider,
                                internalExecutionGraphAccessor.getUserClassLoader(),
                                coordinatorStore,
                                jobManagerJobMetricGroup));
            }
        } catch (Exception | LinkageError e) {
            // Do not leak the coordinators that were already created before the failure.
            IOUtils.closeAllQuietly(holders);
            throw new JobException(
                    "Cannot instantiate the coordinator for operator " + getName(), e);
        }
        this.operatorCoordinators = Collections.unmodifiableList(holders);
    }

    /**
     * Allocates the per-subtask execution vertices, the produced intermediate results and, if the
     * job vertex has an input split source, the input splits and their assigner.
     *
     * <p>Decompiler note: this method was originally {@code protected}.
     *
     * @param i forwarded to {@link #createExecutionVertex} as its sixth argument (presumably the
     *     execution history size limit; TODO confirm against the ExecutionVertex constructor)
     * @param time forwarded to {@link #createExecutionVertex} (presumably a timeout; TODO confirm)
     * @param j forwarded to {@link #createExecutionVertex} (presumably the creation timestamp;
     *     TODO confirm)
     * @param subtaskAttemptNumberStore supplies the attempt count passed for each subtask index
     * @throws JobException if creating the input splits or their assigner fails
     * @throws IllegalStateException if the parallelism is not positive or this vertex was already
     *     initialized
     */
    public void initialize(int i, Time time, long j, SubtaskAttemptNumberStore subtaskAttemptNumberStore) throws JobException {
        final int parallelism = this.parallelismInfo.getParallelism();
        Preconditions.checkState(parallelism > 0);
        Preconditions.checkState(!isInitialized());

        this.taskVertices = new ExecutionVertex[parallelism];
        this.inputs = new ArrayList<>(this.jobVertex.getInputs().size());

        // Create one intermediate result per data set produced by the job vertex.
        this.producedDataSets =
                new IntermediateResult[this.jobVertex.getNumberOfProducedIntermediateDataSets()];
        final List<IntermediateDataSet> dataSets = this.jobVertex.getProducedDataSets();
        for (int index = 0; index < dataSets.size(); index++) {
            final IntermediateDataSet dataSet = dataSets.get(index);
            this.producedDataSets[index] =
                    new IntermediateResult(dataSet, this, parallelism, dataSet.getResultType());
        }

        // Create one execution vertex per subtask.
        for (int subtask = 0; subtask < parallelism; subtask++) {
            this.taskVertices[subtask] =
                    createExecutionVertex(
                            this,
                            subtask,
                            this.producedDataSets,
                            time,
                            j,
                            i,
                            subtaskAttemptNumberStore.getAttemptCount(subtask));
        }

        // Sanity check: every intermediate result must have one partition per subtask.
        for (IntermediateResult result : this.producedDataSets) {
            if (result.getNumberOfAssignedPartitions() != parallelism) {
                throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
            }
        }

        try {
            // The split source is declared with a wildcard; the cast lets the splits it created
            // be handed back to its own assigner factory.
            @SuppressWarnings("unchecked")
            final InputSplitSource<InputSplit> splitSource =
                    (InputSplitSource<InputSplit>) this.jobVertex.getInputSplitSource();

            if (splitSource == null) {
                this.inputSplits = null;
            } else {
                final Thread currentThread = Thread.currentThread();
                final ClassLoader previousClassLoader = currentThread.getContextClassLoader();
                // Run the split creation with the user class loader as context class loader.
                currentThread.setContextClassLoader(this.graph.getUserClassLoader());
                try {
                    this.inputSplits = splitSource.createInputSplits(parallelism);
                    if (this.inputSplits != null) {
                        this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
                    }
                } finally {
                    // Always restore the caller's context class loader, on success and on failure.
                    currentThread.setContextClassLoader(previousClassLoader);
                }
            }
        } catch (Throwable t) {
            throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
        }
    }

    /**
     * Builds a single {@link ExecutionVertex} by passing all arguments, in the same order, to its
     * constructor. Called once per subtask from {@code initialize}.
     */
    protected ExecutionVertex createExecutionVertex(
            ExecutionJobVertex executionJobVertex,
            int i,
            IntermediateResult[] intermediateResultArr,
            Time time,
            long j,
            int i2,
            int i3) {
        final ExecutionVertex vertex =
                new ExecutionVertex(executionJobVertex, i, intermediateResultArr, time, j, i2, i3);
        return vertex;
    }

    /**
     * Creates the holder for one operator coordinator of this vertex via
     * {@link OperatorCoordinatorHolder#create}, supplying this vertex and a freshly built
     * {@link TaskInformation}.
     *
     * @throws Exception whatever {@code OperatorCoordinatorHolder.create} throws
     */
    protected OperatorCoordinatorHolder createOperatorCoordinatorHolder(
            SerializedValue<OperatorCoordinator.Provider> serializedValue,
            ClassLoader classLoader,
            CoordinatorStore coordinatorStore,
            JobManagerJobMetricGroup jobManagerJobMetricGroup)
            throws Exception {
        final TaskInformation taskInformation = getTaskInformation();
        return OperatorCoordinatorHolder.create(
                serializedValue,
                this,
                classLoader,
                coordinatorStore,
                false,
                taskInformation,
                jobManagerJobMetricGroup);
    }

    /** Returns {@code true} once the task-vertex array has been allocated, which {@code initialize} does. */
    public boolean isInitialized() {
        final boolean allocated = taskVertices != null;
        return allocated;
    }

    /** Returns {@code true} when the parallelism reported by the parallelism info is positive. */
    public boolean isParallelismDecided() {
        final int parallelism = parallelismInfo.getParallelism();
        return parallelism > 0;
    }

    /** Returns the operator ID pairs as reported by the underlying job vertex. */
    public List<OperatorIDPair> getOperatorIDs() {
        return jobVertex.getOperatorIDs();
    }

    /**
     * Forwards the new max parallelism to the parallelism info.
     *
     * @param i the max parallelism to set
     */
    public void setMaxParallelism(int i) {
        parallelismInfo.setMaxParallelism(i);
    }

    /** Returns the execution graph accessor this vertex was constructed with. */
    public InternalExecutionGraphAccessor getGraph() {
        return graph;
    }

    /**
     * Forwards the new parallelism to the parallelism info.
     *
     * @param i the parallelism to set
     */
    public void setParallelism(int i) {
        parallelismInfo.setParallelism(i);
    }

    /** Returns the job vertex this execution job vertex was constructed from. */
    public JobVertex getJobVertex() {
        return jobVertex;
    }

    /** Returns the name of the underlying job vertex. */
    @Override
    public String getName() {
        final JobVertex vertex = getJobVertex();
        return vertex.getName();
    }

    /** Returns the parallelism currently reported by the parallelism info. */
    @Override
    public int getParallelism() {
        return parallelismInfo.getParallelism();
    }

    /** Returns the max parallelism currently reported by the parallelism info. */
    @Override
    public int getMaxParallelism() {
        return parallelismInfo.getMaxParallelism();
    }

    /** Returns the resource profile derived in the constructor from the job vertex's minimum resources. */
    @Override
    public ResourceProfile getResourceProfile() {
        return resourceProfile;
    }

    /**
     * Asks the parallelism info whether the max parallelism may be changed to the given value.
     *
     * @param i the desired max parallelism
     * @return the answer of the parallelism info
     */
    public boolean canRescaleMaxParallelism(int i) {
        final boolean rescalable = parallelismInfo.canRescaleMaxParallelism(i);
        return rescalable;
    }

    /** Returns the job ID reported by the owning execution graph. */
    public JobID getJobId() {
        return graph.getJobID();
    }

    /** Returns the ID of the underlying job vertex. */
    @Override
    public JobVertexID getJobVertexId() {
        return jobVertex.getID();
    }

    /**
     * Returns the per-subtask execution vertices.
     *
     * <p>If this vertex has not been initialized yet, a debug message is logged and a new empty
     * array is returned instead of {@code null}.
     *
     * @return the internal task-vertex array, or an empty array before initialization
     */
    @Override
    public ExecutionVertex[] getTaskVertices() {
        if (this.taskVertices != null) {
            return this.taskVertices;
        }
        // Parameterized message: the text is only assembled when debug logging is enabled.
        LOG.debug("Trying to get execution vertices of an uninitialized job vertex {}", getJobVertexId());
        return new ExecutionVertex[0];
    }

    /**
     * Returns the intermediate results produced by this vertex.
     *
     * <p>Fails the state check if this vertex has not been initialized.
     */
    public IntermediateResult[] getProducedDataSets() {
        final boolean initialized = isInitialized();
        Preconditions.checkState(initialized);
        return producedDataSets;
    }

    /**
     * Returns the input split assigner set up during initialization; this is {@code null} when no
     * assigner was created.
     *
     * <p>Fails the state check if this vertex has not been initialized.
     */
    public InputSplitAssigner getSplitAssigner() {
        final boolean initialized = isInitialized();
        Preconditions.checkState(initialized);
        return splitAssigner;
    }

    /** Returns the slot sharing group taken from the job vertex at construction time. */
    @Override
    public SlotSharingGroup getSlotSharingGroup() {
        return slotSharingGroup;
    }

    /** Returns the co-location group taken from the job vertex at construction time, or {@code null} if it had none. */
    @Nullable
    public CoLocationGroup getCoLocationGroup() {
        return coLocationGroup;
    }

    /**
     * Returns the list of intermediate results consumed by this vertex.
     *
     * <p>Fails the state check if this vertex has not been initialized.
     */
    public List<IntermediateResult> getInputs() {
        final boolean initialized = isInitialized();
        Preconditions.checkState(initialized);
        return inputs;
    }

    /**
     * Returns the operator coordinator holders created in the constructor.
     *
     * <p>Fails the state check if this vertex has not been initialized.
     */
    public Collection<OperatorCoordinatorHolder> getOperatorCoordinators() {
        final boolean initialized = isInitialized();
        Preconditions.checkState(initialized);
        return operatorCoordinators;
    }

    /**
     * Collects the {@link SourceCoordinator}s wrapped by this vertex's operator coordinators.
     *
     * <p>Only holders whose coordinator is a {@link RecreateOnResetOperatorCoordinator} are
     * inspected; of those, the internal coordinator is added when it is a {@link SourceCoordinator}.
     * Each coordinator is looked up once, so the instance that is type-checked is the instance
     * that is added.
     *
     * @return a new, mutable list of the source coordinators found (possibly empty)
     * @throws RuntimeException wrapping any failure raised while obtaining an internal coordinator
     */
    public List<SourceCoordinator<?, ?>> getSourceCoordinators() {
        final List<SourceCoordinator<?, ?>> sourceCoordinators = new ArrayList<>();
        for (OperatorCoordinatorHolder holder : this.operatorCoordinators) {
            final OperatorCoordinator coordinator = holder.coordinator();
            if (!(coordinator instanceof RecreateOnResetOperatorCoordinator)) {
                continue;
            }
            try {
                final OperatorCoordinator internalCoordinator =
                        ((RecreateOnResetOperatorCoordinator) coordinator).getInternalCoordinator();
                if (internalCoordinator instanceof SourceCoordinator) {
                    sourceCoordinators.add((SourceCoordinator<?, ?>) internalCoordinator);
                }
            } catch (Throwable t) {
                throw new RuntimeException("Unexpected error occurred when get sourceCoordinators.", t);
            }
        }
        return sourceCoordinators;
    }

    /**
     * Returns the current value of the finished-subtask counter.
     *
     * <p>Decompiler note: this method was originally package-private.
     */
    public int getNumExecutionVertexFinished() {
        return numExecutionVertexFinished;
    }

    /**
     * Returns the serialized task information or the blob key it was offloaded under, computing it
     * on first use while holding {@code stateMonitor} and caching the result for later calls.
     *
     * @throws IOException if serializing or offloading the task information fails
     */
    public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
        synchronized (stateMonitor) {
            if (taskInformationOrBlobKey != null) {
                return taskInformationOrBlobKey;
            }
            taskInformationOrBlobKey =
                    BlobWriter.serializeAndTryOffload(
                            getTaskInformation(), getJobId(), graph.getBlobWriter());
            return taskInformationOrBlobKey;
        }
    }

    /**
     * Builds a new {@link TaskInformation} from the job vertex's ID, name, invokable class name and
     * configuration together with the current parallelism and max parallelism.
     */
    public TaskInformation getTaskInformation() {
        final JobVertexID vertexId = jobVertex.getID();
        final String vertexName = jobVertex.getName();
        final int parallelism = parallelismInfo.getParallelism();
        final int maxParallelism = parallelismInfo.getMaxParallelism();
        return new TaskInformation(
                vertexId,
                vertexName,
                parallelism,
                maxParallelism,
                jobVertex.getInvokableClassName(),
                jobVertex.getConfiguration());
    }

    /**
     * Counts the task vertices per execution state and reduces the counts to one aggregate state
     * via {@link #getAggregateJobVertexState(int[], int)}.
     */
    @Override
    public ExecutionState getAggregateState() {
        final int[] verticesPerState = new int[ExecutionState.values().length];
        for (ExecutionVertex vertex : getTaskVertices()) {
            verticesPerState[vertex.getExecutionState().ordinal()]++;
        }
        final int parallelism = parallelismInfo.getParallelism();
        return getAggregateJobVertexState(verticesPerState, parallelism);
    }

    /**
     * Connects this vertex to the intermediate results it consumes.
     *
     * <p>For every input edge of the job vertex, the matching intermediate result is looked up by
     * source ID, appended to the input list and wired up through
     * {@code EdgeManagerBuildUtil.connectVertexToResult}.
     *
     * <p>Fails the state check if this vertex has not been initialized.
     *
     * @param map intermediate results keyed by data set ID
     * @throws JobException if no intermediate result is registered for an input edge's source ID
     */
    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> map) throws JobException {
        Preconditions.checkState(isInitialized());
        final List<JobEdge> edges = jobVertex.getInputs();
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), edges.size()));
        }

        int inputIndex = 0;
        for (JobEdge edge : edges) {
            if (LOG.isDebugEnabled()) {
                logInputConnection(inputIndex, edge);
            }
            final IntermediateResult source = map.get(edge.getSourceId());
            if (source == null) {
                throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID " + edge.getSourceId());
            }
            inputs.add(source);
            EdgeManagerBuildUtil.connectVertexToResult(this, source);
            inputIndex++;
        }
    }

    /** Writes the debug line describing which intermediate result the given input is connected to. */
    private void logInputConnection(int inputIndex, JobEdge edge) {
        final String message;
        if (edge.getSource() == null) {
            message = String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.", inputIndex, jobVertex.getID(), jobVertex.getName(), edge.getSourceId());
        } else {
            message = String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).", inputIndex, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName());
        }
        LOG.debug(message);
    }

    /** Calls {@code cancel()} on every task vertex, in array order, ignoring the returned values. */
    public void cancel() {
        final ExecutionVertex[] vertices = getTaskVertices();
        for (int index = 0; index < vertices.length; index++) {
            vertices[index].cancel();
        }
    }

    /**
     * Calls {@code cancel()} on every task vertex and combines the returned futures with
     * {@code FutureUtils.waitForAll}.
     *
     * @return the combined future
     */
    public CompletableFuture<Void> cancelWithFuture() {
        final Collection<CompletableFuture<?>> cancellations =
                mapExecutionVertices(ExecutionVertex::cancel);
        return FutureUtils.waitForAll(cancellations);
    }

    /**
     * Calls {@code suspend()} on every task vertex and combines the returned futures with
     * {@code FutureUtils.waitForAll}.
     *
     * @return the combined future
     */
    public CompletableFuture<Void> suspend() {
        final Collection<CompletableFuture<?>> suspensions =
                mapExecutionVertices(ExecutionVertex::suspend);
        return FutureUtils.waitForAll(suspensions);
    }

    /**
     * Applies the given function to every task vertex and collects the resulting futures.
     *
     * @param function produces one future per execution vertex
     * @return a list with one future per task vertex, in array order
     */
    @Nonnull
    private Collection<CompletableFuture<?>> mapExecutionVertices(Function<ExecutionVertex, CompletableFuture<?>> function) {
        // No cast needed: the stream is already typed as CompletableFuture<?> elements.
        return Arrays.stream(getTaskVertices()).map(function).collect(Collectors.toList());
    }

    /**
     * Calls {@code fail(th)} on every task vertex, in array order.
     *
     * @param th the cause passed to each vertex
     */
    public void fail(Throwable th) {
        final ExecutionVertex[] vertices = getTaskVertices();
        for (int index = 0; index < vertices.length; index++) {
            vertices[index].fail(th);
        }
    }

    /**
     * Records that one more subtask has finished and notifies the graph via
     * {@code jobVertexFinished()} when the counter reaches the parallelism.
     *
     * <p>Fails the state check if this vertex has not been initialized.
     *
     * <p>Decompiler note: this method was originally package-private.
     */
    public void executionVertexFinished() {
        Preconditions.checkState(isInitialized());
        numExecutionVertexFinished += 1;
        final boolean allFinished = numExecutionVertexFinished == parallelismInfo.getParallelism();
        if (allFinished) {
            getGraph().jobVertexFinished();
        }
    }

    /**
     * Records that one previously finished subtask is no longer finished. If the counter equalled
     * the parallelism before this call, the graph is notified via {@code jobVertexUnFinished()}
     * first; the counter is decremented afterwards.
     *
     * <p>Fails the state check if this vertex has not been initialized.
     *
     * <p>Decompiler note: this method was originally package-private.
     */
    public void executionVertexUnFinished() {
        Preconditions.checkState(isInitialized());
        final boolean wasFullyFinished = numExecutionVertexFinished == parallelismInfo.getParallelism();
        if (wasFullyFinished) {
            getGraph().jobVertexUnFinished();
        }
        numExecutionVertexFinished -= 1;
    }

    /**
     * Returns {@code true} when the parallelism is decided and the finished-subtask counter equals
     * the parallelism.
     */
    public boolean isFinished() {
        if (!isParallelismDecided()) {
            return false;
        }
        return numExecutionVertexFinished == parallelismInfo.getParallelism();
    }

    /**
     * Merges the user accumulators of every task vertex's current execution attempt and returns
     * them in stringified form. Vertices whose attempt reports no accumulators are skipped.
     */
    @Override
    public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
        // NOTE(review): the map is left raw, as in the original; its value type is not in scope here.
        final HashMap merged = new HashMap();
        for (ExecutionVertex vertex : getTaskVertices()) {
            final Map<String, Accumulator<?, ?>> attemptAccumulators =
                    vertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (attemptAccumulators == null) {
                continue;
            }
            AccumulatorHelper.mergeInto(merged, attemptAccumulators);
        }
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(merged);
    }

    /**
     * Creates an archived snapshot of this vertex.
     *
     * <p>This is the method required by {@link Archiveable}; the decompiler had renamed it, which
     * left the interface method unimplemented.
     *
     * @return a new {@link ArchivedExecutionJobVertex} built from this instance
     */
    @Override
    public ArchivedExecutionJobVertex archive() {
        return new ArchivedExecutionJobVertex(this);
    }

    /**
     * Decompiler-generated alias of {@link #archive()} (renamed from {@code archive} when the
     * bridge method was merged). Kept so that call sites using the generated name keep working.
     *
     * @return the result of {@link #archive()}
     */
    public ArchivedExecutionJobVertex m159archive() {
        return archive();
    }

    /**
     * Reduces per-state vertex counts to a single aggregate state.
     *
     * <p>The first state with a positive count in the order FAILED, CANCELING, CANCELED,
     * INITIALIZING, RUNNING wins. Otherwise, if any vertex is FINISHED, the result is FINISHED
     * when the finished count equals {@code i} and RUNNING when it does not. If none of these
     * apply, the result is CREATED.
     *
     * @param iArr vertex counts indexed by {@code ExecutionState.ordinal()}
     * @param i the parallelism the FINISHED count is compared against
     * @return the aggregate state
     * @throws IllegalArgumentException if {@code iArr} is null or its length differs from the
     *     number of execution states
     */
    public static ExecutionState getAggregateJobVertexState(int[] iArr, int i) {
        if (iArr == null || iArr.length != ExecutionState.values().length) {
            throw new IllegalArgumentException("Must provide an array as large as there are execution states.");
        }

        // States that decide the aggregate as soon as a single vertex is in them, by priority.
        final ExecutionState[] dominantStates = {
            ExecutionState.FAILED,
            ExecutionState.CANCELING,
            ExecutionState.CANCELED,
            ExecutionState.INITIALIZING,
            ExecutionState.RUNNING
        };
        for (ExecutionState state : dominantStates) {
            if (iArr[state.ordinal()] > 0) {
                return state;
            }
        }

        final int finishedCount = iArr[ExecutionState.FINISHED.ordinal()];
        if (finishedCount <= 0) {
            // Everything else collapses to CREATED.
            return ExecutionState.CREATED;
        }
        if (finishedCount == i) {
            return ExecutionState.FINISHED;
        }
        return ExecutionState.RUNNING;
    }
}
