/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop.planner;

import cascading.flow.FlowElement;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.planner.ElementGraph;
import cascading.flow.planner.ElementGraphs;
import cascading.flow.planner.FlowStepGraph;
import cascading.flow.planner.PlannerException;
import cascading.flow.planner.Scope;
import cascading.pipe.Group;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.tap.Tap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.mapred.JobConf;
import org.jgrapht.DirectedGraph;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopStepGraph
extends FlowStepGraph<JobConf> {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopStepGraph.class);

    public HadoopStepGraph() {
    }

    public HadoopStepGraph(String flowName, ElementGraph elementGraph) {
        super(flowName, elementGraph);
    }

    protected FlowStep<JobConf> createFlowStep(String stepName, int stepNum) {
        return new HadoopFlowStep(stepName, stepNum);
    }

    protected void makeStepGraph(String flowName, ElementGraph elementGraph) {
        SimpleDirectedGraph tapGraph = elementGraph.makeTapGraph();
        int numJobs = this.countNumJobs((SimpleDirectedGraph<Tap, Integer>)tapGraph);
        LinkedHashMap steps = new LinkedHashMap();
        TopologicalOrderIterator iterator = new TopologicalOrderIterator((DirectedGraph)tapGraph);
        int count = 0;
        while (iterator.hasNext()) {
            Tap source = (Tap)iterator.next();
            LOG.debug("handling source: {}", (Object)source);
            List sinks = Graphs.successorListOf((DirectedGraph)tapGraph, (Object)source);
            for (Tap sink : sinks) {
                LOG.debug("handling path: {} -> {}", (Object)source, (Object)sink);
                HadoopFlowStep step = (HadoopFlowStep)this.getCreateFlowStep(steps, sink, numJobs);
                this.addVertex((Object)step);
                if (steps.containsKey(source)) {
                    this.addEdge(steps.get(source), (Object)step, count++);
                }
                this.populateStep(elementGraph, source, sink, step);
            }
        }
    }

    private void populateStep(ElementGraph elementGraph, Tap source, Tap sink, HadoopFlowStep step) {
        Map traps = elementGraph.getTrapMap();
        List paths = ElementGraphs.getAllShortestPathsBetween((SimpleDirectedGraph)elementGraph, (FlowElement)source, (FlowElement)sink);
        for (GraphPath path : paths) {
            if (this.pathContainsTap(path)) continue;
            List scopes = path.getEdgeList();
            String sourceName = ((Scope)scopes.get(0)).getName();
            String sinkName = ((Scope)scopes.get(scopes.size() - 1)).getName();
            step.addSource(sourceName, source);
            step.addSink(sinkName, sink);
            Tap lhs = source;
            step.getGraph().addVertex((Object)lhs);
            boolean onMapSide = true;
            for (Scope scope : scopes) {
                String name;
                FlowElement rhs = (FlowElement)elementGraph.getEdgeTarget((Object)scope);
                step.getGraph().addVertex((Object)rhs);
                step.getGraph().addEdge((Object)lhs, (Object)rhs, (Object)scope);
                if (rhs instanceof Group) {
                    step.addGroup((Group)rhs);
                    onMapSide = false;
                } else if (rhs instanceof HashJoin) {
                    if (!onMapSide) {
                        throw new PlannerException("joins must not present Reduce side");
                    }
                    Map sourcePaths = ElementGraphs.countOrderedDirectPathsBetween((SimpleDirectedGraph)elementGraph, (FlowElement)source, (Splice)((Splice)rhs));
                    if (sourcePaths.containsKey(0)) {
                        step.addStreamedSourceFor((HashJoin)rhs, source);
                    } else {
                        step.addAccumulatedSourceFor((HashJoin)rhs, source);
                    }
                } else if (rhs instanceof Pipe && traps.containsKey(name = ((Pipe)rhs).getName())) {
                    if (onMapSide) {
                        step.getMapperTraps().put(name, (Tap)traps.get(name));
                    } else {
                        step.getReducerTraps().put(name, (Tap)traps.get(name));
                    }
                }
                lhs = rhs;
            }
        }
    }

    private int countNumJobs(SimpleDirectedGraph<Tap, Integer> tapGraph) {
        Set vertices = tapGraph.vertexSet();
        int count = 0;
        for (Tap vertex : vertices) {
            if (tapGraph.inDegreeOf((Object)vertex) == 0) continue;
            ++count;
        }
        return count;
    }
}

