/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop.planner;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.FlowElement;
import cascading.flow.hadoop.HadoopFlow;
import cascading.flow.hadoop.planner.HadoopStepGraph;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.ElementGraph;
import cascading.flow.planner.ElementGraphs;
import cascading.flow.planner.FlowPlanner;
import cascading.flow.planner.PlatformInfo;
import cascading.pipe.CoGroup;
import cascading.pipe.Every;
import cascading.pipe.Group;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.property.PropertyUtil;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.util.TempHfs;
import cascading.util.Util;
import java.net.URI;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.mapred.JobConf;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FlowPlanner} that plans a {@link FlowDef} into a {@link HadoopFlow}.
 *
 * <p>{@link #buildFlow(FlowDef)} builds the element graph for the flow definition, runs the
 * inherited validation and partitioning passes over it, applies the Hadoop specific passes
 * defined in this class (split handling, optional source normalization, adjacent tap
 * collapsing) and finally wraps the result in a {@link HadoopStepGraph}.
 *
 * <p>The planner keeps the {@link JobConf} and the intermediate scheme class that were resolved
 * in {@link #initialize(FlowConnector, Map)}; both are used when temporary taps are created.
 */
public class HadoopPlanner
extends FlowPlanner {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopPlanner.class);

    /** Property key read and written by the {@code *NormalizeHeterogeneousSources} accessors. */
    private static final String NORMALIZE_SOURCES_KEY = "cascading.multimapreduceplanner.normalizesources";

    /** Job configuration built from the planner properties in {@link #initialize}. */
    private JobConf jobConf;

    /** Scheme class handed to every {@link TempHfs} created by {@link #makeTempTap}. */
    private Class intermediateSchemeClass;

    /**
     * Copies every entry of the given job configuration into the given properties map.
     * Existing entries with the same key are overwritten.
     *
     * @param properties target map
     * @param jobConf    source configuration
     */
    public static void copyJobConf(Map<Object, Object> properties, JobConf jobConf) {
        for (Map.Entry<String, String> entry : jobConf) {
            properties.put(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Creates a new {@link JobConf} populated from the given properties.
     *
     * <p>When {@code properties} is a {@link Properties} instance, the names returned by
     * {@link Properties#stringPropertyNames()} are copied. Otherwise every entry with a non-null
     * value is copied using the {@code toString()} form of its key and value.
     *
     * @param properties source properties
     * @return a new configuration holding the copied values
     */
    public static JobConf createJobConf(Map<Object, Object> properties) {
        JobConf conf = new JobConf();

        if (!(properties instanceof Properties)) {
            for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                Object value = entry.getValue();
                if (value != null) {
                    conf.set(entry.getKey().toString(), value.toString());
                }
            }
            return conf;
        }

        Properties props = (Properties) properties;
        for (String key : props.stringPropertyNames()) {
            conf.set(key, props.getProperty(key));
        }
        return conf;
    }

    /**
     * Stores the "normalize heterogeneous sources" flag in the given properties as a string.
     *
     * @param properties  map to update
     * @param doNormalize flag value
     */
    public static void setNormalizeHeterogeneousSources(Map<Object, Object> properties, boolean doNormalize) {
        properties.put(NORMALIZE_SOURCES_KEY, Boolean.toString(doNormalize));
    }

    /**
     * Reads the "normalize heterogeneous sources" flag from the given properties.
     *
     * <p>The stored value is converted with {@link String#valueOf(Object)} before parsing, so a
     * value that is not a {@code String} (for example a {@code Boolean}) no longer causes a
     * {@link ClassCastException}.
     *
     * @param properties map to read
     * @return the parsed flag; {@code false} is the default handed to the property lookup
     */
    public static boolean getNormalizeHeterogeneousSources(Map<Object, Object> properties) {
        Object value = PropertyUtil.getProperty(properties, NORMALIZE_SOURCES_KEY, (Object) "false");
        return Boolean.parseBoolean(String.valueOf(value));
    }

    /**
     * @return the platform information reported by {@link HadoopUtil#getPlatformInfo()}
     */
    public PlatformInfo getPlatformInfo() {
        return HadoopUtil.getPlatformInfo();
    }

    /**
     * Initializes the planner: builds the job configuration, resolves the intermediate scheme
     * class and determines the application jar.
     *
     * <p>The jar is resolved in order, each step only running while the configuration still has
     * no jar: the application jar class from {@link AppProps}, the application jar path from
     * {@link AppProps}, and finally the class returned by
     * {@code HadoopUtil.findMainClass(HadoopPlanner.class)}. The resolved jar is written back to
     * the properties and logged.
     *
     * @param flowConnector connector supplying the intermediate scheme class
     * @param properties    planner properties; updated with the resolved application jar path
     */
    public void initialize(FlowConnector flowConnector, Map<Object, Object> properties) {
        super.initialize(flowConnector, properties);

        jobConf = HadoopUtil.createJobConf(properties, createJobConf(properties));
        intermediateSchemeClass = flowConnector.getIntermediateSchemeClass(properties);

        // The checks are deliberately sequential rather than else-if: a setter may leave the jar
        // unset, in which case the next source is tried.
        Class<?> jarClass = AppProps.getApplicationJarClass(properties);
        if (jobConf.getJar() == null && jarClass != null) {
            jobConf.setJarByClass(jarClass);
        }

        String jarPath = AppProps.getApplicationJarPath(properties);
        if (jobConf.getJar() == null && jarPath != null) {
            jobConf.setJar(jarPath);
        }

        if (jobConf.getJar() == null) {
            jobConf.setJarByClass(HadoopUtil.findMainClass(HadoopPlanner.class));
        }

        AppProps.setApplicationJarPath(properties, jobConf.getJar());
        LOG.info("using application jar: {}", jobConf.getJar());
    }

    /**
     * Plans the given flow definition into a {@link HadoopFlow}.
     *
     * <p>The order of the passes below is significant; each one operates on the graph produced
     * by the previous one.
     *
     * @param flowDef flow definition to plan
     * @return the initialized flow
     * @throws RuntimeException whatever {@code handleExceptionDuringPlanning} produces for an
     *         exception raised while planning (exact type is defined by the superclass)
     */
    public Flow buildFlow(FlowDef flowDef) {
        // Kept outside the try block so the partially built graph can be handed to the
        // exception handler.
        ElementGraph elementGraph = null;
        try {
            this.verifyAssembly(flowDef);
            HadoopFlow flow = new HadoopFlow(this.getPlatformInfo(), (Map<Object, Object>)this.properties, this.jobConf, flowDef);
            elementGraph = this.createElementGraph(flowDef);

            // Inherited validation passes.
            this.failOnLoneGroupAssertion(elementGraph);
            this.failOnMissingGroup(elementGraph);
            this.failOnMisusedBuffer(elementGraph);
            this.failOnGroupEverySplit(elementGraph);

            // Graph rewriting passes.
            this.handleWarnEquivalentPaths(elementGraph);
            this.handleSplit(elementGraph);
            this.handleJobPartitioning(elementGraph);
            this.handleJoins(elementGraph);
            this.handleNonSafeOperations(elementGraph);
            if (HadoopPlanner.getNormalizeHeterogeneousSources(this.properties)) {
                this.handleHeterogeneousSources(elementGraph);
            }

            elementGraph.removeUnnecessaryPipes();
            elementGraph.resolveFields();
            elementGraph = flow.updateSchemes(elementGraph);
            this.handleAdjacentTaps(elementGraph);

            HadoopStepGraph flowStepGraph = new HadoopStepGraph(flowDef.getName(), elementGraph);
            flow.initialize(elementGraph, flowStepGraph);
            return flow;
        }
        catch (Exception exception) {
            throw this.handleExceptionDuringPlanning(exception, elementGraph);
        }
    }

    /**
     * Logs a warning for every {@link CoGroup} whose shortest incoming paths are all equivalent
     * according to {@link #areEquivalentPaths}.
     */
    private void handleWarnEquivalentPaths(ElementGraph elementGraph) {
        for (CoGroup coGroup : elementGraph.findAllCoGroups()) {
            List<List<FlowElement>> paths = ElementGraphs.asPathList(elementGraph.getAllShortestPathsTo(coGroup));
            if (areEquivalentPaths(elementGraph, paths)) {
                // Index 1 is the element right after the first vertex of the first path.
                LOG.warn("found equivalent paths from: {} to: {}", paths.get(0).get(1), coGroup);
            }
        }
    }

    /**
     * Tests whether all given paths have the same length and, position by position, hold
     * elements that {@link EquivalenceComparator} treats as equal.
     *
     * @return {@code false} when the paths differ in length (or there are none) or any position
     *         holds more than one distinct element
     */
    private boolean areEquivalentPaths(ElementGraph elementGraph, List<List<FlowElement>> paths) {
        int length = sameLength(paths);
        if (length == -1) {
            return false;
        }

        // The sorted set is used purely for de-duplication via the comparator.
        Set<FlowElement> distinct = new TreeSet<FlowElement>(new EquivalenceComparator(elementGraph));
        for (int position = 0; position < length; ++position) {
            distinct.clear();
            for (List<FlowElement> path : paths) {
                distinct.add(path.get(position));
            }
            if (distinct.size() != 1) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return the common size of all paths, or {@code -1} when the sizes differ or when
     *         {@code paths} is empty
     */
    private int sameLength(List<List<FlowElement>> paths) {
        if (paths.isEmpty()) {
            // Nothing to compare; report "not the same length" instead of failing on get(0).
            return -1;
        }

        int expected = paths.get(0).size();
        for (List<FlowElement> path : paths) {
            if (path.size() != expected) {
                return -1;
            }
        }
        return expected;
    }

    /** Re-runs {@link #internalSplit} until it reports that no further change was made. */
    private void handleSplit(ElementGraph elementGraph) {
        boolean stable;
        do {
            stable = internalSplit(elementGraph);
        } while (!stable);
    }

    /**
     * Walks the shortest paths between the graph extents and inserts a temporary tap after
     * every element on a path that has more than one outgoing edge and is not exempt.
     *
     * @return {@code false} as soon as one path caused insertions (the graph changed and the
     *         paths must be recomputed), {@code true} when no path required any
     */
    private boolean internalSplit(ElementGraph elementGraph) {
        for (GraphPath<FlowElement, ?> path : elementGraph.getAllShortestPathsBetweenExtents()) {
            List<FlowElement> flowElements = Graphs.getPathVertexList(path);
            Set<Pipe> tapInsertions = new HashSet<Pipe>();
            FlowElement lastInsertable = null;

            for (int i = 0; i < flowElements.size(); ++i) {
                FlowElement flowElement = flowElements.get(i);

                if (flowElement instanceof ElementGraph.Extent) {
                    continue;
                }

                // Remember the most recent Tap, Group or Every seen on this path.
                if (flowElement instanceof Tap || flowElement instanceof Group || flowElement instanceof Every) {
                    lastInsertable = flowElement;
                }

                // A plain Pipe directly following a Tap is never a split point.
                if (flowElement.getClass() == Pipe.class && flowElements.get(i - 1) instanceof Tap) {
                    continue;
                }
                if (flowElement instanceof Tap) {
                    continue;
                }
                // Only elements with more than one outgoing edge are of interest.
                if (elementGraph.outDegreeOf(flowElement) <= 1) {
                    continue;
                }

                int maxPaths = elementGraph.getMaxNumPathsBetweenElementAndGroupingMergeJoin(flowElement);
                if (maxPaths <= 1 && lastInsertable instanceof Tap) {
                    continue;
                }

                tapInsertions.add((Pipe) flowElement);
            }

            for (Pipe pipe : tapInsertions) {
                insertTempTapAfter(elementGraph, pipe);
            }
            if (!tapInsertions.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /** Re-runs {@link #internalAdjacentTaps} until it reports that no further change was made. */
    private void handleAdjacentTaps(ElementGraph elementGraph) {
        boolean stable;
        do {
            stable = internalAdjacentTaps(elementGraph);
        } while (!stable);
    }

    /**
     * Replaces the first {@link TempHfs} that is directly followed by a compatible {@link Hfs}
     * with that successor.
     *
     * <p>A successor is compatible when its scheme is symmetrical, its URI scheme equals the
     * default file system URI scheme of the temporary tap, and both taps expose equal source
     * fields.
     *
     * @return {@code false} when a replacement was made, {@code true} when none was possible
     */
    private boolean internalAdjacentTaps(ElementGraph elementGraph) {
        for (Tap tap : elementGraph.findAllTaps()) {
            if (!(tap instanceof TempHfs)) {
                continue;
            }

            for (FlowElement successor : elementGraph.getAllSuccessors(tap)) {
                if (!(successor instanceof Hfs)) {
                    continue;
                }
                Hfs successorTap = (Hfs) successor;

                if (!successorTap.getScheme().isSymmetrical()) {
                    continue;
                }

                URI tempURIScheme = getDefaultURIScheme(tap);
                URI successorURIScheme = getURIScheme(successorTap);
                if (!tempURIScheme.equals(successorURIScheme)) {
                    continue;
                }

                if (!tap.getSourceFields().equals(successorTap.getSourceFields())) {
                    continue;
                }

                elementGraph.replaceElementWith(tap, successor);
                return false;
            }
        }
        return true;
    }

    /** @return the default file system URI scheme of the given tap, which must be an {@link Hfs} */
    private URI getDefaultURIScheme(Tap tap) {
        return ((Hfs) tap).getDefaultFileSystemURIScheme(jobConf);
    }

    /** @return the URI scheme of the given tap, which must be an {@link Hfs} */
    private URI getURIScheme(Tap tap) {
        return ((Hfs) tap).getURIScheme(jobConf);
    }

    /**
     * Re-runs {@link #internalHeterogeneousSources} until it reports that no further change was
     * made.
     */
    private void handleHeterogeneousSources(ElementGraph elementGraph) {
        boolean stable;
        do {
            stable = internalHeterogeneousSources(elementGraph);
        } while (!stable);
    }

    /**
     * Finds merge/join groups whose nearest upstream taps use different scheme classes and
     * inserts one temporary tap in front of such a group.
     *
     * <p>Taps that are already {@link TempHfs} instances, or whose scheme class equals the
     * intermediate scheme class, are left alone.
     *
     * @return {@code false} when a temporary tap was inserted (the graph changed and must be
     *         re-examined), {@code true} when nothing was changed
     */
    private boolean internalHeterogeneousSources(ElementGraph elementGraph) {
        Map<Group, Set<Tap>> normalizeGroups = new HashMap<Group, Set<Tap>>();

        for (Group group : elementGraph.findAllMergeJoinGroups()) {
            Set<Tap> taps = findNearestSourceTaps(elementGraph, group);
            if (taps.size() != 1 && hasMixedSchemeClasses(taps)) {
                normalizeGroups.put(group, taps);
            }
        }

        for (Map.Entry<Group, Set<Tap>> entry : normalizeGroups.entrySet()) {
            Group group = entry.getKey();

            for (Tap tap : entry.getValue()) {
                if (tap instanceof TempHfs || getSchemeClass(tap).equals(intermediateSchemeClass)) {
                    continue;
                }

                for (GraphPath<FlowElement, ?> path : ElementGraphs.getAllShortestPathsBetween(elementGraph, (FlowElement) tap, (FlowElement) group)) {
                    List<FlowElement> flowElements = Graphs.getPathVertexList(path);
                    Collections.reverse(flowElements);

                    // After reversing, index 0 is the group and index 1 the element feeding it.
                    FlowElement beforeGroup = flowElements.get(1);
                    if (beforeGroup instanceof TempHfs) {
                        continue;
                    }

                    LOG.warn("inserting step to normalize incompatible sources: {}", tap);
                    insertTempTapAfter(elementGraph, (Pipe) beforeGroup);
                    return false;
                }
            }
        }

        // Reaching this point means the graph was not modified. Reporting "not done" here (as
        // returning normalizeGroups.isEmpty() would when mismatched groups remain) makes the
        // caller repeat the identical pass forever, so the pass always ends here.
        if (!normalizeGroups.isEmpty()) {
            LOG.warn("unable to normalize incompatible sources for groups: {}", normalizeGroups.keySet());
        }
        return true;
    }

    /**
     * Collects, for every shortest path leading to the given group, the tap closest to the
     * group on that path.
     */
    private Set<Tap> findNearestSourceTaps(ElementGraph elementGraph, Group group) {
        Set<Tap> taps = new HashSet<Tap>();
        for (GraphPath<FlowElement, ?> path : elementGraph.getAllShortestPathsTo(group)) {
            List<FlowElement> flowElements = Graphs.getPathVertexList(path);
            // Walk from the group backwards so the first tap found is the nearest one.
            Collections.reverse(flowElements);
            for (FlowElement element : flowElements) {
                if (element instanceof Tap) {
                    taps.add((Tap) element);
                    break;
                }
            }
        }
        return taps;
    }

    /**
     * Tests whether any tap in the set has a scheme class different from the first tap returned
     * by the set's iterator. The set must not be empty.
     */
    private boolean hasMixedSchemeClasses(Set<Tap> taps) {
        Iterator<Tap> iterator = taps.iterator();
        Tap commonTap = iterator.next();
        while (iterator.hasNext()) {
            if (getSchemeClass(iterator.next()) != getSchemeClass(commonTap)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Creates a temporary tap backed by the planner's job configuration and intermediate scheme
     * class.
     *
     * @param prefix path prefix; may be {@code null}, which is passed on as the last constructor
     *               flag of {@link TempHfs}
     * @param name   tap name, combined with {@code prefix} via {@code Util.makePath}
     * @return a new {@link TempHfs}
     */
    protected Tap makeTempTap(String prefix, String name) {
        boolean hasNoPrefix = prefix == null;
        return new TempHfs(jobConf, Util.makePath(prefix, name), intermediateSchemeClass, hasNoPrefix);
    }

    /**
     * @return the scheme class of the given tap: the class reported by
     *         {@link TempHfs#getSchemeClass()} for temporary taps, otherwise the runtime class of
     *         the tap's scheme
     */
    private Class getSchemeClass(Tap tap) {
        if (tap instanceof TempHfs) {
            return ((TempHfs) tap).getSchemeClass();
        }
        return tap.getScheme().getClass();
    }

    /**
     * Comparator that reports two flow elements as equal when they are equivalent to each other
     * and have the same in-degree and out-degree in the given graph. All other pairs are ordered
     * by identity hash code.
     *
     * <p>NOTE(review): this ordering is only meant for de-duplication inside a sorted set; two
     * distinct, non-equivalent elements with colliding identity hash codes also compare as equal.
     */
    private static class EquivalenceComparator
    implements Comparator<FlowElement> {
        private final ElementGraph elementGraph;

        public EquivalenceComparator(ElementGraph elementGraph) {
            this.elementGraph = elementGraph;
        }

        @Override
        public int compare(FlowElement lhs, FlowElement rhs) {
            boolean areEquivalent = lhs.isEquivalentTo(rhs);
            boolean sameIncoming = elementGraph.inDegreeOf(lhs) == elementGraph.inDegreeOf(rhs);
            boolean sameOutgoing = elementGraph.outDegreeOf(lhs) == elementGraph.outDegreeOf(rhs);
            if (areEquivalent && sameIncoming && sameOutgoing) {
                return 0;
            }

            // Explicit three-way comparison: subtracting the hash codes could overflow and
            // yield a result with the wrong sign.
            int lhsHash = System.identityHashCode(lhs);
            int rhsHash = System.identityHashCode(rhs);
            if (lhsHash < rhsHash) {
                return -1;
            }
            return lhsHash == rhsHash ? 0 : 1;
        }
    }
}

