/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.Map;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
import org.apache.flink.runtime.jobgraph.forwardgroup.JobVertexForwardGroup;
import org.apache.flink.runtime.jobmaster.event.JobEvent;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveExecutionHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.ExecutionPlanSchedulingContext;
import org.apache.flink.runtime.scheduler.adaptivebatch.JobGraphUpdateListener;
import org.apache.flink.runtime.scheduler.adaptivebatch.NonAdaptiveExecutionPlanSchedulingContext;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link AdaptiveExecutionHandler} backed by a fixed {@link JobGraph}.
 *
 * <p>This handler ignores job events and job-graph update listeners (both callbacks are no-ops).
 * The only state it maintains is the forward-group mapping computed once in the constructor,
 * which is used to look up and record parallelism decisions for vertices that belong to a
 * forward group.
 */
public class NonAdaptiveExecutionHandler implements AdaptiveExecutionHandler {
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    /** The job graph this handler was created for; returned as-is by {@link #getJobGraph()}. */
    private final JobGraph jobGraph;

    /** Forward group of each job vertex; a vertex may have no entry. */
    private final Map<JobVertexID, JobVertexForwardGroup> forwardGroupsByJobVertexId;

    /**
     * Creates a handler for the given job graph and computes its forward groups.
     *
     * @param jobGraph the job graph to serve; must not be {@code null}
     */
    public NonAdaptiveExecutionHandler(JobGraph jobGraph) {
        this.jobGraph = Preconditions.checkNotNull(jobGraph);
        // Read the field directly instead of calling the overridable getJobGraph(): invoking an
        // overridable method from a constructor would run subclass code before the subclass is
        // initialized.
        this.forwardGroupsByJobVertexId =
                ForwardGroupComputeUtil.computeForwardGroupsAndCheckParallelism(
                        this.jobGraph.getVerticesSortedTopologicallyFromSources());
    }

    /** Returns the job graph passed to the constructor. */
    @Override
    public JobGraph getJobGraph() {
        return this.jobGraph;
    }

    /** Intentionally a no-op: this handler does not react to job events. */
    @Override
    public void handleJobEvent(JobEvent jobEvent) {
    }

    /** Intentionally a no-op: the listener is neither stored nor invoked by this handler. */
    @Override
    public void registerJobGraphUpdateListener(JobGraphUpdateListener listener) {
    }

    /**
     * Returns the parallelism a job vertex should start with.
     *
     * <p>The vertex's own parallelism is returned unless it equals {@code -1} and the vertex
     * belongs to a forward group whose parallelism has already been decided; in that case the
     * forward group's parallelism is returned and the decision is logged.
     *
     * @param jobVertexId ID of the vertex to look up in the job graph
     * @return the vertex's own parallelism, or its forward group's decided parallelism
     * @throws NullPointerException if the job graph contains no vertex with the given ID
     */
    @Override
    public int getInitialParallelism(JobVertexID jobVertexId) {
        // Fail with a descriptive message rather than an anonymous NPE on the next dereference.
        JobVertex jobVertex =
                Preconditions.checkNotNull(
                        this.jobGraph.findVertexByID(jobVertexId),
                        "JobVertex %s does not exist in the job graph.",
                        jobVertexId);
        int configuredParallelism = jobVertex.getParallelism();
        JobVertexForwardGroup forwardGroup = this.forwardGroupsByJobVertexId.get(jobVertexId);

        // NOTE(review): -1 is presumably the "parallelism not set" sentinel (the decompiler
        // inlined the original constant) - confirm against the upstream source.
        boolean inheritFromForwardGroup =
                configuredParallelism == -1
                        && forwardGroup != null
                        && forwardGroup.isParallelismDecided();
        if (!inheritFromForwardGroup) {
            return configuredParallelism;
        }

        int forwardGroupParallelism = forwardGroup.getParallelism();
        this.log.info(
                "Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.",
                jobVertex.getName(),
                jobVertex,
                forwardGroupParallelism);
        return forwardGroupParallelism;
    }

    /**
     * Records a decided parallelism on the vertex's forward group, if it has one.
     *
     * <p>If the forward group has no decided parallelism yet, it is set to {@code parallelism}.
     * If it already has one, the two values must match. Vertices without a forward group are
     * ignored.
     *
     * @param jobVertexId ID of the vertex whose parallelism was decided
     * @param parallelism the decided parallelism
     * @throws IllegalArgumentException if the forward group's parallelism was already decided
     *     and differs from {@code parallelism}
     */
    @Override
    public void notifyJobVertexParallelismDecided(JobVertexID jobVertexId, int parallelism) {
        JobVertexForwardGroup forwardGroup = this.forwardGroupsByJobVertexId.get(jobVertexId);
        if (forwardGroup == null) {
            // Vertex is not part of any forward group: nothing to record.
            return;
        }

        if (forwardGroup.isParallelismDecided()) {
            Preconditions.checkArgument(
                    forwardGroup.getParallelism() == parallelism,
                    "Incompatible parallelism for forward group.");
        } else {
            forwardGroup.setParallelism(parallelism);
        }
    }

    /**
     * Returns the shared {@link NonAdaptiveExecutionPlanSchedulingContext} instance.
     *
     * @param defaultMaxParallelism not used by this implementation
     * @return {@link NonAdaptiveExecutionPlanSchedulingContext#INSTANCE}
     */
    @Override
    public ExecutionPlanSchedulingContext createExecutionPlanSchedulingContext(int defaultMaxParallelism) {
        return NonAdaptiveExecutionPlanSchedulingContext.INSTANCE;
    }
}

