/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.AbstractList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.VertexManagerEvent;

@InterfaceAudience.Private
public class InputReadyVertexManager
extends VertexManagerPlugin {
    private static final Log LOG = LogFactory.getLog(InputReadyVertexManager.class);
    Map<String, SourceVertexInfo> srcVertexInfo = Maps.newHashMap();
    boolean[] taskIsStarted;
    int[] oneToOneSrcTasksDoneCount;
    TaskLocationHint[] oneToOneLocationHints;
    int numOneToOneEdges;

    public InputReadyVertexManager(VertexManagerPluginContext context) {
        super(context);
    }

    public void initialize() {
    }

    public void onVertexStarted(Map<String, List<Integer>> completions) {
        int numManagedTasks = this.getContext().getVertexNumTasks(this.getContext().getVertexName());
        LOG.info((Object)("Managing " + numManagedTasks + " tasks for vertex: " + this.getContext().getVertexName()));
        this.taskIsStarted = new boolean[numManagedTasks];
        Map edges = this.getContext().getInputVertexEdgeProperties();
        int oneToOneSrcTaskCount = 0;
        this.numOneToOneEdges = 0;
        for (Map.Entry entry : edges.entrySet()) {
            EdgeProperty edgeProp = (EdgeProperty)entry.getValue();
            String srcVertex = (String)entry.getKey();
            int numSrcTasks = this.getContext().getVertexNumTasks(srcVertex);
            switch (edgeProp.getDataMovementType()) {
                case CUSTOM: {
                    throw new TezUncheckedException("Cannot handle custom edge");
                }
                case ONE_TO_ONE: {
                    ++this.numOneToOneEdges;
                    if (oneToOneSrcTaskCount == 0) {
                        oneToOneSrcTaskCount = numSrcTasks;
                        break;
                    }
                    if (oneToOneSrcTaskCount == numSrcTasks) break;
                    throw new TezUncheckedException("All 1-1 source vertices must have identical concurrency");
                }
                case SCATTER_GATHER: 
                case BROADCAST: {
                    break;
                }
                default: {
                    throw new TezUncheckedException("Unknown edge type: " + edgeProp.getDataMovementType());
                }
            }
            this.srcVertexInfo.put(srcVertex, new SourceVertexInfo(numSrcTasks, edgeProp));
        }
        if (this.numOneToOneEdges > 0) {
            if (oneToOneSrcTaskCount != numManagedTasks) {
                throw new TezUncheckedException("Managed task number must equal 1-1 source task number");
            }
            this.oneToOneSrcTasksDoneCount = new int[oneToOneSrcTaskCount];
            this.oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount];
        }
        for (Map.Entry<Object, Object> entry : completions.entrySet()) {
            for (Integer task : (List)entry.getValue()) {
                this.handleSourceTaskFinished((String)entry.getKey(), task);
            }
        }
    }

    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
        this.handleSourceTaskFinished(srcVertexName, taskId);
    }

    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
    }

    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
    }

    void handleSourceTaskFinished(String vertex, Integer taskId) {
        SourceVertexInfo srcInfo = this.srcVertexInfo.get(vertex);
        if (srcInfo.taskIsFinished[taskId] == null) {
            srcInfo.taskIsFinished[taskId.intValue()] = new Boolean(true);
            ++srcInfo.numFinishedTasks;
            if (srcInfo.edgeProperty.getDataMovementType() == EdgeProperty.DataMovementType.ONE_TO_ONE) {
                int n = taskId;
                this.oneToOneSrcTasksDoneCount[n] = this.oneToOneSrcTasksDoneCount[n] + 1;
                this.oneToOneLocationHints[taskId.intValue()] = TaskLocationHint.createTaskLocationHint((String)vertex, (int)taskId);
            }
        }
        if (srcInfo.edgeProperty.getDataMovementType() != EdgeProperty.DataMovementType.ONE_TO_ONE && srcInfo.numTasks != srcInfo.numFinishedTasks) {
            return;
        }
        for (SourceVertexInfo vInfo : this.srcVertexInfo.values()) {
            if (vInfo.edgeProperty.getDataMovementType() == EdgeProperty.DataMovementType.ONE_TO_ONE || vInfo.numTasks == vInfo.numFinishedTasks) continue;
            return;
        }
        AbstractList tasksToStart = null;
        if (this.numOneToOneEdges == 0) {
            int numTasks = this.taskIsStarted.length;
            LOG.info((Object)("Starting all " + numTasks + "tasks for vertex: " + this.getContext().getVertexName()));
            tasksToStart = Lists.newArrayListWithCapacity((int)numTasks);
            for (int i = 0; i < numTasks; ++i) {
                this.taskIsStarted[i] = true;
                tasksToStart.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
            }
        } else {
            tasksToStart = Lists.newLinkedList();
            for (int i = 0; i < this.taskIsStarted.length; ++i) {
                if (this.taskIsStarted[i] || this.oneToOneSrcTasksDoneCount[i] != this.numOneToOneEdges) continue;
                this.taskIsStarted[i] = true;
                TaskLocationHint locationHint = null;
                if (this.oneToOneLocationHints[i] != null) {
                    locationHint = this.oneToOneLocationHints[i];
                }
                LOG.info((Object)("Starting task " + i + " for vertex: " + this.getContext().getVertexName() + " with location: " + (locationHint != null ? locationHint.getAffinitizedTask() : "null")));
                tasksToStart.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), locationHint));
            }
        }
        if (tasksToStart != null && !tasksToStart.isEmpty()) {
            this.getContext().scheduleVertexTasks((List)tasksToStart);
        }
    }

    class SourceVertexInfo {
        EdgeProperty edgeProperty;
        int numTasks;
        int numFinishedTasks;
        Boolean[] taskIsFinished;

        SourceVertexInfo(int numTasks, EdgeProperty edgeProperty) {
            this.numTasks = numTasks;
            this.numFinishedTasks = 0;
            this.edgeProperty = edgeProperty;
            this.taskIsFinished = new Boolean[numTasks];
        }
    }
}

