/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop;

import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlow;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import riffle.process.scheduler.ProcessException;
import riffle.process.scheduler.ProcessWrapper;

/**
 * A {@link HadoopFlow} that delegates its lifecycle calls to an arbitrary
 * process object wrapped in a riffle {@link ProcessWrapper}.
 *
 * <p>The sources and sinks of this flow are derived from the incoming and
 * outgoing dependencies reported by the wrapper. A dependency that is already a
 * {@link Tap} is used directly; any other object is represented by a
 * placeholder {@link ProcessTap} identified by the object's {@code toString()}
 * value.
 *
 * @param <P> the type of the wrapped process object
 */
public class ProcessFlow<P>
extends HadoopFlow {
    private final P process;
    private final ProcessWrapper processWrapper;

    /**
     * Creates a flow for the given process using an empty set of properties.
     *
     * @param name    the flow name
     * @param process the process object to wrap
     */
    @ConstructorProperties(value={"name", "process"})
    public ProcessFlow(String name, P process) {
        this(new Properties(), name, process);
    }

    /**
     * Creates a flow for the given process.
     *
     * @param properties properties handed to the parent flow
     * @param name       the flow name
     * @param process    the process object to wrap
     */
    @ConstructorProperties(value={"properties", "name", "process"})
    public ProcessFlow(Map<Object, Object> properties, String name, P process) {
        super(HadoopUtil.getPlatformInfo(), properties, null, name);
        this.process = process;
        this.processWrapper = new ProcessWrapper(this.process);
        this.setName(name);
        // NOTE(review): public, overridable method invoked from the constructor;
        // kept as-is because it is part of the published interface.
        this.setTapFromProcess();
    }

    /**
     * Re-reads the dependencies from the wrapped process and installs them as
     * this flow's sources and sinks. Traps are always set to an empty map.
     */
    public void setTapFromProcess() {
        this.setSources(this.createSources(this.processWrapper));
        this.setSinks(this.createSinks(this.processWrapper));
        this.setTraps(this.createTraps(this.processWrapper));
    }

    /**
     * @return the process object this flow was constructed with
     */
    public P getProcess() {
        return this.process;
    }

    /**
     * Calls {@code prepare} on the wrapped process.
     *
     * @throws FlowException if the wrapper fails with a non-runtime cause
     */
    public void prepare() {
        try {
            this.processWrapper.prepare();
        }
        catch (ProcessException exception) {
            throw ProcessFlow.translateProcessException("could not call prepare on process", exception);
        }
    }

    /**
     * Calls {@code start} on the wrapped process.
     *
     * @throws FlowException if the wrapper fails with a non-runtime cause
     */
    public void start() {
        try {
            this.processWrapper.start();
        }
        catch (ProcessException exception) {
            throw ProcessFlow.translateProcessException("could not call start on process", exception);
        }
    }

    /**
     * Calls {@code stop} on the wrapped process.
     *
     * @throws FlowException if the wrapper fails with a non-runtime cause
     */
    public void stop() {
        try {
            this.processWrapper.stop();
        }
        catch (ProcessException exception) {
            throw ProcessFlow.translateProcessException("could not call stop on process", exception);
        }
    }

    /**
     * Calls {@code complete} on the wrapped process.
     *
     * @throws FlowException if the wrapper fails with a non-runtime cause
     */
    public void complete() {
        try {
            this.processWrapper.complete();
        }
        catch (ProcessException exception) {
            throw ProcessFlow.translateProcessException("could not call complete on process", exception);
        }
    }

    /**
     * Calls {@code cleanup} on the wrapped process.
     *
     * @throws FlowException if the wrapper fails with a non-runtime cause
     */
    public void cleanup() {
        try {
            this.processWrapper.cleanup();
        }
        catch (ProcessException exception) {
            throw ProcessFlow.translateProcessException("could not call cleanup on process", exception);
        }
    }

    /**
     * Converts a {@link ProcessException} into the exception this flow should
     * throw: the underlying {@link RuntimeException} when there is one,
     * otherwise a {@link FlowException} carrying the given message.
     *
     * <p>When the process exception has no cause, the process exception itself
     * is used as the cause so that the original failure and its stack trace are
     * not silently dropped.
     *
     * @param message   message for the {@link FlowException}
     * @param exception the exception raised by the process wrapper
     * @return the exception to throw; never {@code null}
     */
    private static RuntimeException translateProcessException(String message, ProcessException exception) {
        Throwable cause = exception.getCause();
        if (cause instanceof RuntimeException) {
            return (RuntimeException)cause;
        }
        return new FlowException(message, cause != null ? cause : exception);
    }

    /**
     * Builds the source tap map from the wrapper's incoming dependency.
     */
    private Map<String, Tap> createSources(ProcessWrapper processParent) {
        try {
            return this.makeTapMap(processParent.getDependencyIncoming());
        }
        catch (ProcessException exception) {
            throw ProcessFlow.translateProcessException("could not get process incoming dependency", exception);
        }
    }

    /**
     * Builds the sink tap map from the wrapper's outgoing dependency.
     */
    private Map<String, Tap> createSinks(ProcessWrapper processParent) {
        try {
            return this.makeTapMap(processParent.getDependencyOutgoing());
        }
        catch (ProcessException exception) {
            throw ProcessFlow.translateProcessException("could not get process outgoing dependency", exception);
        }
    }

    /**
     * Converts a dependency resource into a map of taps keyed by identifier.
     *
     * <p>The resource may be a single object, an object array or a
     * {@link Collection}. {@link Tap} elements are keyed by
     * {@link Tap#getIdentifier()}; every other element is wrapped in a
     * {@link ProcessTap} keyed by its {@code toString()} value. A {@code null}
     * resource yields an empty map, and {@code null} elements are skipped.
     *
     * @param resource the dependency object(s), possibly {@code null}
     * @return a new mutable map, never {@code null}
     */
    private Map<String, Tap> makeTapMap(Object resource) {
        HashMap<String, Tap> taps = new HashMap<String, Tap>();
        if (resource == null) {
            // No dependency declared: report no taps instead of failing on null.
            return taps;
        }
        for (Object path : this.makeCollection(resource)) {
            if (path == null) {
                // A null entry has neither an identifier nor a string form to key on.
                continue;
            }
            if (path instanceof Tap) {
                taps.put(((Tap)path).getIdentifier(), (Tap)path);
                continue;
            }
            String token = path.toString();
            taps.put(token, new ProcessTap(new NullScheme(), token));
        }
        return taps;
    }

    /**
     * Views the given resource as a collection: collections are returned
     * unchanged, object arrays are wrapped, and any other object becomes a
     * single-element list.
     */
    private Collection<?> makeCollection(Object resource) {
        if (resource instanceof Collection) {
            return (Collection<?>)resource;
        }
        if (resource instanceof Object[]) {
            return Arrays.asList((Object[])resource);
        }
        return Arrays.asList(resource);
    }

    /**
     * Returns an empty trap map; the wrapper argument is not consulted.
     */
    private Map<String, Tap> createTraps(ProcessWrapper processParent) {
        return new HashMap<String, Tap>();
    }

    /**
     * @return the flow name and the wrapped process, separated by a colon
     */
    public String toString() {
        return this.getName() + ":" + this.process;
    }

    /**
     * Placeholder tap standing in for a non-{@link Tap} dependency. It only
     * carries an identifier token; the open methods return {@code null}, the
     * resource methods return {@code false} and the modified time is
     * {@code 0}.
     */
    static class ProcessTap
    extends Tap {
        private final String token;

        /**
         * @param scheme the scheme passed to the parent tap
         * @param token  the identifier reported by this tap
         */
        ProcessTap(NullScheme scheme, String token) {
            super((Scheme)scheme);
            this.token = token;
        }

        /** @return the token supplied at construction */
        public String getIdentifier() {
            return this.token;
        }

        /** Always returns {@code null}. */
        public TupleEntryIterator openForRead(FlowProcess flowProcess, Object input) throws IOException {
            return null;
        }

        /** Always returns {@code null}. */
        public TupleEntryCollector openForWrite(FlowProcess flowProcess, Object output) throws IOException {
            return null;
        }

        /** Always returns {@code false}. */
        public boolean createResource(Object conf) throws IOException {
            return false;
        }

        /** Always returns {@code false}. */
        public boolean deleteResource(Object conf) throws IOException {
            return false;
        }

        /** Always returns {@code false}. */
        public boolean resourceExists(Object conf) throws IOException {
            return false;
        }

        /** Always returns {@code 0}. */
        public long getModifiedTime(Object conf) throws IOException {
            return 0L;
        }

        /** @return the token supplied at construction */
        public String toString() {
            return this.token;
        }
    }

    /**
     * Scheme used by {@link ProcessTap}: configuration hooks do nothing and
     * both {@code source} and {@code sink} reject every call.
     */
    static class NullScheme
    extends Scheme {
        NullScheme() {
        }

        /** Does nothing. */
        public void sourceConfInit(FlowProcess flowProcess, Tap tap, Object conf) {
        }

        /** Does nothing. */
        public void sinkConfInit(FlowProcess flowProcess, Tap tap, Object conf) {
        }

        /**
         * @throws UnsupportedOperationException always
         */
        public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException {
            throw new UnsupportedOperationException("sourcing is not supported in the scheme");
        }

        /** @return the simple name of this object's runtime class */
        public String toString() {
            return this.getClass().getSimpleName();
        }

        /**
         * @throws UnsupportedOperationException always
         */
        public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException {
            throw new UnsupportedOperationException("sinking is not supported in the scheme");
        }
    }
}

