/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop.stream;

import cascading.flow.FlowProcess;
import cascading.flow.stream.MemoryHashJoinGate;
import cascading.pipe.HashJoin;
import cascading.provider.FactoryLoader;
import cascading.tuple.Tuple;
import cascading.tuple.collect.Spillable;
import cascading.tuple.collect.SpillableTupleList;
import cascading.tuple.collect.TupleMapFactory;
import cascading.tuple.hadoop.collect.HadoopTupleMapFactory;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopMemoryJoinGate
extends MemoryHashJoinGate {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopMemoryJoinGate.class);
    private final SpillListener spillListener;
    private TupleMapFactory<JobConf> tupleMapFactory;

    public HadoopMemoryJoinGate(FlowProcess<JobConf> flowProcess, HashJoin join) {
        super(flowProcess, join);
        this.spillListener = new SpillListener(flowProcess);
        FactoryLoader loader = FactoryLoader.getInstance();
        this.tupleMapFactory = (TupleMapFactory)loader.loadFactoryFrom(flowProcess, "cascading.factory.tuple.map.classname", HadoopTupleMapFactory.class);
    }

    protected Set<Tuple> createKeySet() {
        return new HashSet<Tuple>();
    }

    protected Map<Tuple, Collection<Tuple>> createTupleMap() {
        Map map = (Map)this.tupleMapFactory.create(this.flowProcess);
        if (map instanceof Spillable) {
            ((Spillable)map).setSpillListener((Spillable.SpillListener)this.spillListener);
        }
        return map;
    }

    protected void waitOnLatch() {
    }

    protected void countDownLatch() {
    }

    private class SpillListener
    implements Spillable.SpillListener {
        private final FlowProcess<JobConf> flowProcess;

        public SpillListener(FlowProcess<JobConf> flowProcess) {
            this.flowProcess = flowProcess;
        }

        public void notifyWriteSpillBegin(Spillable spillable, int spillSize, String spillReason) {
            int numFiles = spillable.spillCount();
            if (numFiles % 10 == 0) {
                LOG.info("spilling grouping: {}, num times: {}, with reason: {}", new Object[]{spillable.getGrouping().print(), numFiles + 1, spillReason});
                Runtime runtime = Runtime.getRuntime();
                long freeMem = runtime.freeMemory() / 1024L / 1024L;
                long maxMem = runtime.maxMemory() / 1024L / 1024L;
                long totalMem = runtime.totalMemory() / 1024L / 1024L;
                LOG.info("mem on spill (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem);
            }
            LOG.info("spilling {} tuples in list to file number {}", (Object)spillSize, (Object)(numFiles + 1));
            this.flowProcess.increment((Enum)Spill.Num_Spills_Written, 1L);
            this.flowProcess.increment((Enum)Spill.Num_Tuples_Spilled, (long)spillSize);
        }

        public void notifyWriteSpillEnd(SpillableTupleList spillableTupleList, long duration) {
            this.flowProcess.increment((Enum)Spill.Duration_Millis_Written, duration);
        }

        public void notifyReadSpillBegin(Spillable spillable) {
            this.flowProcess.increment((Enum)Spill.Num_Spills_Read, 1L);
        }
    }

    public static enum Spill {
        Num_Spills_Written,
        Num_Spills_Read,
        Num_Tuples_Spilled,
        Duration_Millis_Written;

    }
}

