/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop.stream;

import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.hadoop.stream.HadoopCoGroupGate;
import cascading.flow.hadoop.stream.HadoopGroupByGate;
import cascading.flow.hadoop.stream.HadoopMemoryJoinGate;
import cascading.flow.hadoop.stream.HadoopSinkStage;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.stream.Duct;
import cascading.flow.stream.Gate;
import cascading.flow.stream.MemoryHashJoinGate;
import cascading.flow.stream.SinkStage;
import cascading.flow.stream.SourceStage;
import cascading.flow.stream.SpliceGate;
import cascading.flow.stream.StepStreamGraph;
import cascading.pipe.CoGroup;
import cascading.pipe.Group;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.tap.Tap;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

public class HadoopMapStreamGraph
extends StepStreamGraph {
    private final Tap source;
    private SourceStage streamedHead;

    public HadoopMapStreamGraph(HadoopFlowProcess flowProcess, HadoopFlowStep step, Tap source) {
        super((FlowProcess)flowProcess, (BaseFlowStep)step);
        this.source = source;
        this.buildGraph();
        this.setTraps();
        this.setScopes();
        this.printGraph(step.getID(), "map", flowProcess.getCurrentSliceNum());
        this.bind();
    }

    public SourceStage getStreamedHead() {
        return this.streamedHead;
    }

    protected void buildGraph() {
        this.streamedHead = this.handleHead(this.source, this.flowProcess);
        Group tail = this.step.getGroup() != null ? this.step.getGroup() : this.step.getSink();
        Set tributaries = this.step.getJoinTributariesBetween((FlowElement)this.source, (FlowElement)tail);
        tributaries.remove(this.source);
        for (Tap source : tributaries) {
            HadoopFlowProcess hadoopProcess = (HadoopFlowProcess)this.flowProcess;
            JobConf conf = hadoopProcess.getJobConf();
            String property = conf.getRaw("cascading.step.accumulated.source.conf." + Tap.id((Tap)source));
            if (property == null) {
                throw new IllegalStateException("accumulated source conf property missing");
            }
            conf = this.getSourceConf(hadoopProcess, conf, property);
            this.flowProcess = new HadoopFlowProcess(hadoopProcess, conf);
            this.handleHead(source, this.flowProcess);
        }
    }

    private JobConf getSourceConf(HadoopFlowProcess flowProcess, JobConf conf, String property) {
        Map priorConf;
        try {
            priorConf = HadoopUtil.deserializeBase64(property, (Configuration)conf, HashMap.class, true);
        }
        catch (IOException exception) {
            throw new FlowException("unable to deserialize properties", (Throwable)exception);
        }
        return flowProcess.mergeMapIntoConfig(conf, (Map<String, String>)priorConf);
    }

    private SourceStage handleHead(Tap source, FlowProcess flowProcess) {
        SourceStage sourceDuct = new SourceStage(flowProcess, source);
        this.addHead((Duct)sourceDuct);
        this.handleDuct((FlowElement)source, (Duct)sourceDuct);
        return sourceDuct;
    }

    protected SinkStage createSinkStage(Tap element) {
        return new HadoopSinkStage(this.flowProcess, element);
    }

    protected Gate createCoGroupGate(CoGroup element) {
        return new HadoopCoGroupGate(this.flowProcess, element, SpliceGate.Role.sink);
    }

    protected Gate createGroupByGate(GroupBy element) {
        return new HadoopGroupByGate(this.flowProcess, element, SpliceGate.Role.sink);
    }

    protected MemoryHashJoinGate createNonBlockingJoinGate(HashJoin join) {
        return new HadoopMemoryJoinGate((FlowProcess<JobConf>)this.flowProcess, join);
    }

    protected boolean stopOnElement(FlowElement lhsElement, List<FlowElement> successors) {
        if (lhsElement instanceof Group) {
            return true;
        }
        if (successors.isEmpty()) {
            if (!(lhsElement instanceof Tap)) {
                throw new IllegalStateException("expected a Tap instance");
            }
            return true;
        }
        return false;
    }
}

