/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.runtime.manager.chain.internal;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.InvocationTargetException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;
import javax.json.bind.JsonbConfig;
import javax.json.bind.adapter.JsonbAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sdk.component.api.processor.OutputEmitter;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.record.RecordBuilderFactory;
import org.talend.sdk.component.runtime.base.Lifecycle;
import org.talend.sdk.component.runtime.input.Input;
import org.talend.sdk.component.runtime.input.Mapper;
import org.talend.sdk.component.runtime.jsonb.MultipleFormatDateAdapter;
import org.talend.sdk.component.runtime.manager.ComponentManager;
import org.talend.sdk.component.runtime.manager.chain.AutoChunkProcessor;
import org.talend.sdk.component.runtime.manager.chain.ChainedMapper;
import org.talend.sdk.component.runtime.manager.chain.GroupKeyProvider;
import org.talend.sdk.component.runtime.manager.chain.Job;
import org.talend.sdk.component.runtime.manager.chain.internal.DSLParser;
import org.talend.sdk.component.runtime.output.InputFactory;
import org.talend.sdk.component.runtime.output.OutputFactory;
import org.talend.sdk.component.runtime.output.Processor;
import org.talend.sdk.component.runtime.output.ProcessorImpl;
import org.talend.sdk.component.runtime.record.RecordBuilderFactoryImpl;
import org.talend.sdk.component.runtime.record.RecordConverters;

public class JobImpl
implements Job {

    protected static class GroupKeyProviderImpl
    implements GroupKeyProvider {
        private final GroupKeyProvider delegate;

        @Override
        public String apply(GroupKeyProvider.GroupContext context) {
            return (String)this.delegate.apply(context);
        }

        public GroupKeyProviderImpl(GroupKeyProvider delegate) {
            this.delegate = delegate;
        }
    }

    private static class DataInputFactory
    implements InputFactory {
        private final Map<String, Iterator<Object>> inputs = new HashMap<String, Iterator<Object>>();
        private volatile Jsonb jsonb;
        private volatile RecordBuilderFactory factory;

        private DataInputFactory() {
        }

        private DataInputFactory withInput(String branch, Collection<Object> branchData) {
            this.inputs.put(branch, branchData.iterator());
            return this;
        }

        public Object read(String name) {
            Iterator<Object> iterator = this.inputs.get(name);
            if (iterator != null && iterator.hasNext()) {
                return this.map(iterator.next());
            }
            return null;
        }

        private Object map(Object next) {
            if (next == null || Record.class.isInstance(next)) {
                return next;
            }
            String str = this.jsonb.toJson(next);
            if (str.equals(next.toString())) {
                return next;
            }
            return new RecordConverters().toRecord(next, () -> {
                if (this.jsonb == null) {
                    DataInputFactory dataInputFactory = this;
                    synchronized (dataInputFactory) {
                        if (this.jsonb == null) {
                            this.jsonb = JsonbBuilder.create((JsonbConfig)new JsonbConfig().withAdapters(new JsonbAdapter[]{new MultipleFormatDateAdapter()}).setProperty("johnzon.cdi.activated", (Object)false));
                        }
                    }
                }
                return this.jsonb;
            }, () -> {
                if (this.factory == null) {
                    DataInputFactory dataInputFactory = this;
                    synchronized (dataInputFactory) {
                        if (this.factory == null) {
                            this.factory = new RecordBuilderFactoryImpl("test");
                        }
                    }
                }
                return this.factory;
            });
        }
    }

    private static class DataOutputFactory
    implements OutputFactory {
        private final Map<Class<?>, Object> services;
        private final Map<String, Collection<Record>> outputs = new HashMap<String, Collection<Record>>();

        public OutputEmitter create(String name) {
            return new OutputEmitterImpl(name);
        }

        public DataOutputFactory(Map<Class<?>, Object> services) {
            this.services = services;
        }

        public Map<Class<?>, Object> getServices() {
            return this.services;
        }

        public Map<String, Collection<Record>> getOutputs() {
            return this.outputs;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof DataOutputFactory)) {
                return false;
            }
            DataOutputFactory other = (DataOutputFactory)o;
            if (!other.canEqual(this)) {
                return false;
            }
            Map<Class<?>, Object> this$services = this.getServices();
            Map<Class<?>, Object> other$services = other.getServices();
            if (this$services == null ? other$services != null : !((Object)this$services).equals(other$services)) {
                return false;
            }
            Map<String, Collection<Record>> this$outputs = this.getOutputs();
            Map<String, Collection<Record>> other$outputs = other.getOutputs();
            return !(this$outputs == null ? other$outputs != null : !((Object)this$outputs).equals(other$outputs));
        }

        protected boolean canEqual(Object other) {
            return other instanceof DataOutputFactory;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            Map<Class<?>, Object> $services = this.getServices();
            result = result * 59 + ($services == null ? 43 : ((Object)$services).hashCode());
            Map<String, Collection<Record>> $outputs = this.getOutputs();
            result = result * 59 + ($outputs == null ? 43 : ((Object)$outputs).hashCode());
            return result;
        }

        public String toString() {
            return "JobImpl.DataOutputFactory(services=" + this.getServices() + ", outputs=" + this.getOutputs() + ")";
        }

        private class OutputEmitterImpl
        implements OutputEmitter {
            private final String name;

            public void emit(Object value) {
                DataOutputFactory.this.outputs.computeIfAbsent(this.name, k -> new ArrayList()).add(new RecordConverters().toRecord(value, () -> (Jsonb)Jsonb.class.cast(DataOutputFactory.this.services.get(Jsonb.class)), () -> (RecordBuilderFactory)RecordBuilderFactory.class.cast(DataOutputFactory.this.services.get(RecordBuilderFactory.class))));
            }

            public OutputEmitterImpl(String name) {
                this.name = name;
            }
        }
    }

    private static class InputRunner {
        private static final Logger log = LoggerFactory.getLogger(InputRunner.class);
        private final Mapper chainedMapper;
        private final Input input;

        private InputRunner(Mapper mapper) {
            RuntimeException error = null;
            try {
                mapper.start();
                this.chainedMapper = new ChainedMapper(mapper, mapper.split(mapper.assess()).iterator());
                this.chainedMapper.start();
                this.input = this.chainedMapper.create();
                this.input.start();
            }
            catch (RuntimeException re) {
                error = re;
                throw re;
            }
            finally {
                try {
                    mapper.stop();
                }
                catch (RuntimeException re) {
                    if (error == null) {
                        throw re;
                    }
                    log.error(re.getMessage(), (Throwable)re);
                }
            }
        }

        public Record next() {
            Object next = this.input.next();
            if (next == null) {
                return null;
            }
            return (Record)Record.class.cast(next);
        }

        public void stop() {
            RuntimeException error = null;
            try {
                if (this.input != null) {
                    this.input.stop();
                }
            }
            catch (RuntimeException re) {
                error = re;
                throw re;
            }
            finally {
                try {
                    if (this.chainedMapper != null) {
                        this.chainedMapper.stop();
                    }
                }
                catch (RuntimeException re) {
                    if (error == null) {
                        throw re;
                    }
                    log.error(re.getMessage(), (Throwable)re);
                }
            }
        }
    }

    public static class LocalSequenceHolder {
        private static final Map<String, AtomicLong> GENERATORS = new HashMap<String, AtomicLong>();

        public static GroupKeyProvider cleanAndGet(String name) {
            GENERATORS.put(name, new AtomicLong(0L));
            return c -> Long.toString(GENERATORS.get(name).incrementAndGet());
        }

        public static void clean(String name) {
            GENERATORS.remove(name);
        }
    }

    private static class GroupContextImpl
    implements GroupKeyProvider.GroupContext {
        private final Record data;
        private final String componentId;
        private final String branchName;

        public GroupContextImpl(Record data, String componentId, String branchName) {
            this.data = data;
            this.componentId = componentId;
            this.branchName = branchName;
        }

        @Override
        public Record getData() {
            return this.data;
        }

        @Override
        public String getComponentId() {
            return this.componentId;
        }

        @Override
        public String getBranchName() {
            return this.branchName;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof GroupContextImpl)) {
                return false;
            }
            GroupContextImpl other = (GroupContextImpl)o;
            if (!other.canEqual(this)) {
                return false;
            }
            Record this$data = this.getData();
            Record other$data = other.getData();
            if (this$data == null ? other$data != null : !this$data.equals(other$data)) {
                return false;
            }
            String this$componentId = this.getComponentId();
            String other$componentId = other.getComponentId();
            if (this$componentId == null ? other$componentId != null : !this$componentId.equals(other$componentId)) {
                return false;
            }
            String this$branchName = this.getBranchName();
            String other$branchName = other.getBranchName();
            return !(this$branchName == null ? other$branchName != null : !this$branchName.equals(other$branchName));
        }

        protected boolean canEqual(Object other) {
            return other instanceof GroupContextImpl;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            Record $data = this.getData();
            result = result * 59 + ($data == null ? 43 : $data.hashCode());
            String $componentId = this.getComponentId();
            result = result * 59 + ($componentId == null ? 43 : $componentId.hashCode());
            String $branchName = this.getBranchName();
            result = result * 59 + ($branchName == null ? 43 : $branchName.hashCode());
            return result;
        }

        public String toString() {
            return "JobImpl.GroupContextImpl(data=" + this.getData() + ", componentId=" + this.getComponentId() + ", branchName=" + this.getBranchName() + ")";
        }
    }

    public static class JobExecutor
    implements Job.ExecutorBuilder {
        private static final Logger log = LoggerFactory.getLogger(JobExecutor.class);
        private final Map<Integer, Set<Job.Component>> levels;
        private final List<Job.Edge> edges;
        private final Map<String, Map<String, Object>> componentProperties;
        private final Map<String, Object> jobProperties = new HashMap<String, Object>();
        private final ComponentManager manager = ComponentManager.instance();

        @Override
        public Job.ExecutorBuilder property(String name, Object value) {
            this.jobProperties.put(name, value);
            return this;
        }

        @Override
        public void run() {
            Job.ExecutorBuilder runner = this;
            Object o = this.jobProperties.get(Job.ExecutorBuilder.class.getName());
            if (Job.ExecutorBuilder.class.isInstance(o)) {
                runner = (Job.ExecutorBuilder)Job.ExecutorBuilder.class.cast(o);
            } else if (Class.class.isInstance(o)) {
                runner = this.newRunner((Class)Class.class.cast(o));
            } else if (String.class.isInstance(o)) {
                String name = ((String)String.class.cast(o)).trim();
                if (!("standalone".equalsIgnoreCase(name) || "default".equalsIgnoreCase(name) || "local".equalsIgnoreCase(name))) {
                    if ("beam".equalsIgnoreCase(name)) {
                        try {
                            runner = this.newRunner(Thread.currentThread().getContextClassLoader(), "org.talend.sdk.component.runtime.beam.chain.impl.BeamExecutor");
                        }
                        catch (RuntimeException re) {
                            log.error("Can't instantiate beam job integration, did you add org.talend.sdk.component:component-runtime-beam in your dependencies", (Throwable)re);
                        }
                    } else {
                        runner = this.newRunner(Thread.currentThread().getContextClassLoader(), name);
                    }
                }
            } else {
                if (o != null) {
                    throw new IllegalArgumentException(o + " is not an ExecutionBuilder");
                }
                ClassLoader loader = Thread.currentThread().getContextClassLoader();
                try (InputStream stream = loader.getResourceAsStream("META-INF/services/" + Job.ExecutorBuilder.class.getName());){
                    if (stream != null) {
                        runner = new BufferedReader(new InputStreamReader(stream)).lines().map(String::trim).filter(s -> !s.startsWith("#") && !s.isEmpty()).findFirst().map(clazz -> this.newRunner(loader, (String)clazz)).orElse(this);
                    }
                }
                catch (IOException e) {
                    log.debug(e.getMessage(), (Throwable)e);
                }
            }
            if (runner == this) {
                ((JobExecutor)JobExecutor.class.cast(runner)).localRun();
            } else {
                runner.run();
            }
        }

        private Job.ExecutorBuilder newRunner(ClassLoader loader, String clazz) {
            try {
                Class<?> aClass = loader.loadClass(clazz);
                return this.newRunner(aClass);
            }
            catch (ClassNotFoundException e) {
                throw new IllegalArgumentException(e);
            }
        }

        private Job.ExecutorBuilder newRunner(Class<? extends Job.ExecutorBuilder> runnerType) {
            try {
                try {
                    return runnerType.getConstructor(JobExecutor.class).newInstance(this);
                }
                catch (NoSuchMethodException e) {
                    try {
                        return runnerType.getConstructor(new Class[0]).newInstance(new Object[0]);
                    }
                    catch (IllegalAccessException | InstantiationException | NoSuchMethodException e1) {
                        throw new IllegalArgumentException(e1);
                    }
                }
            }
            catch (InvocationTargetException e1) {
                throw new IllegalArgumentException(e1.getTargetException());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void localRun() {
            Map<String, InputRunner> inputs = this.levels.values().stream().flatMap(Collection::stream).filter(Job.Component::isSource).map(n -> {
                Mapper mapper = this.manager.findMapper(n.getNode().getFamily(), n.getNode().getComponent(), n.getNode().getVersion(), n.getNode().getConfiguration()).orElseThrow(() -> new IllegalStateException("No mapper found for: " + n.getNode()));
                return new AbstractMap.SimpleEntry<String, InputRunner>(n.getId(), new InputRunner(mapper));
            }).collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
            Map<String, AutoChunkProcessor> processors = this.levels.values().stream().flatMap(Collection::stream).filter(component -> !component.isSource()).map(component -> {
                Processor processor = this.manager.findProcessor(component.getNode().getFamily(), component.getNode().getComponent(), component.getNode().getVersion(), component.getNode().getConfiguration()).orElseThrow(() -> new IllegalStateException("No processor found for:" + component.getNode()));
                AtomicInteger maxBatchSize = new AtomicInteger(1);
                if (ProcessorImpl.class.isInstance(processor)) {
                    ((ProcessorImpl)ProcessorImpl.class.cast(processor)).getInternalConfiguration().entrySet().stream().filter(it -> ((String)it.getKey()).endsWith("$maxBatchSize") && it.getValue() != null && !((String)it.getValue()).trim().isEmpty()).findFirst().ifPresent(val -> {
                        try {
                            maxBatchSize.set(Integer.parseInt(((String)val.getValue()).trim()));
                        }
                        catch (NumberFormatException nfe) {
                            throw new IllegalArgumentException("Invalid configuratoin: " + val);
                        }
                    });
                }
                return new AbstractMap.SimpleEntry<String, AutoChunkProcessor>(component.getId(), new AutoChunkProcessor(maxBatchSize.get(), processor));
            }).collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
            try {
                Map<String, AtomicBoolean> sourcesWithData = this.levels.values().stream().flatMap(Collection::stream).filter(Job.Component::isSource).map(component -> new AbstractMap.SimpleEntry<String, AtomicBoolean>(component.getId(), new AtomicBoolean(true))).collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
                processors.values().forEach(Lifecycle::start);
                HashMap flowData = new HashMap();
                AtomicBoolean running = new AtomicBoolean(true);
                do {
                    this.levels.forEach((level, components) -> components.forEach(component -> {
                        if (component.isSource()) {
                            InputRunner source = (InputRunner)inputs.get(component.getId());
                            Record data2 = source.next();
                            if (data2 == null) {
                                ((AtomicBoolean)sourcesWithData.get(component.getId())).set(false);
                                return;
                            }
                            String key = (String)this.getKeyProvider(component.getId()).apply(new GroupContextImpl(data2, component.getId(), "__default__"));
                            flowData.computeIfAbsent(component.getId(), s -> new HashMap());
                            ((Map)flowData.get(component.getId())).computeIfAbsent("__default__", s -> new TreeMap());
                            ((Map)((Map)flowData.get(component.getId())).get("__default__")).computeIfAbsent(key, k -> new ArrayList()).add(data2);
                        } else {
                            List<Job.Edge> connections = this.getConnections(this.getEdges(), (Job.Component)component, e -> e.getTo().getNode());
                            DataInputFactory dataInputFactory = new DataInputFactory();
                            if (connections.size() == 1) {
                                Record data3;
                                Job.Edge edge2 = connections.get(0);
                                String fromId = edge2.getFrom().getNode().getId();
                                String fromBranch = edge2.getFrom().getBranch();
                                String toBranch = edge2.getTo().getBranch();
                                Map idData = (Map)flowData.get(fromId);
                                Record record = data3 = idData == null ? null : this.pollFirst((Map)idData.get(fromBranch));
                                if (data3 != null) {
                                    dataInputFactory.withInput(toBranch, Collections.singletonList(data3));
                                }
                            } else {
                                HashMap<String, Map<String, Collection<Record>>> availableDataForStep = new HashMap<String, Map<String, Collection<Record>>>();
                                connections.forEach(edge -> {
                                    Map data;
                                    String fromId = edge.getFrom().getNode().getId();
                                    String fromBranch = edge.getFrom().getBranch();
                                    String toBranch = edge.getTo().getBranch();
                                    Map map = data = flowData.get(fromId) == null ? null : (Map)((Map)flowData.get(fromId)).get(fromBranch);
                                    if (data != null && !data.isEmpty()) {
                                        availableDataForStep.put(toBranch, data);
                                    }
                                });
                                Map<String, String> joined = this.joinWithFusionSort(availableDataForStep);
                                if (!joined.isEmpty() && connections.size() == joined.size()) {
                                    joined.forEach((k, v) -> {
                                        Collection data = (Collection)((Map)availableDataForStep.get(k)).remove(v);
                                        dataInputFactory.withInput(k, data);
                                    });
                                }
                            }
                            if (dataInputFactory.inputs.isEmpty()) {
                                if (level.equals(this.levels.size() - 1) && sourcesWithData.entrySet().stream().noneMatch(e -> ((AtomicBoolean)e.getValue()).get())) {
                                    running.set(false);
                                }
                                return;
                            }
                            AutoChunkProcessor processor = (AutoChunkProcessor)processors.get(component.getId());
                            DataOutputFactory dataOutputFactory = new DataOutputFactory(((ComponentManager.AllServices)this.getManager().findPlugin(processor.plugin()).get().get(ComponentManager.AllServices.class)).getServices());
                            processor.onElement(dataInputFactory, dataOutputFactory);
                            dataOutputFactory.getOutputs().forEach((branch, data) -> data.forEach(item -> {
                                String key = (String)this.getKeyProvider(component.getId()).apply(new GroupContextImpl((Record)item, component.getId(), (String)branch));
                                flowData.computeIfAbsent(component.getId(), s -> new HashMap());
                                ((Map)flowData.get(component.getId())).computeIfAbsent(branch, s -> new TreeMap());
                                ((Map)((Map)flowData.get(component.getId())).get(branch)).computeIfAbsent(key, k -> new ArrayList()).add(item);
                            }));
                        }
                    }));
                } while (running.get());
            }
            finally {
                processors.values().forEach(Lifecycle::stop);
                inputs.values().forEach(InputRunner::stop);
                this.levels.values().stream().flatMap(Collection::stream).map(Job.Component::getId).forEach(LocalSequenceHolder::clean);
            }
        }

        private Map<String, String> joinWithFusionSort(Map<String, Map<String, Collection<Record>>> dataByBranch) {
            HashMap<String, String> join = new HashMap<String, String>();
            dataByBranch.forEach((branch1, records1) -> dataByBranch.forEach((branch2, records2) -> {
                if (!branch1.equals(branch2)) {
                    block0: for (String key1 : records1.keySet()) {
                        for (String key2 : records2.keySet()) {
                            if (key1.equals(key2)) {
                                join.putIfAbsent((String)branch1, key1);
                                join.putIfAbsent((String)branch2, key2);
                                continue;
                            }
                            if (key1.compareTo(key2) >= 0) continue;
                            continue block0;
                        }
                    }
                }
            }));
            return join;
        }

        private Record pollFirst(Map<String, Collection<Record>> data) {
            if (data == null || data.isEmpty()) {
                return null;
            }
            while (!data.isEmpty()) {
                String key = data.keySet().iterator().next();
                Collection<Record> items = data.get(key);
                if (!items.isEmpty()) {
                    Iterator<Record> iterator = items.iterator();
                    Record item = iterator.next();
                    iterator.remove();
                    return item;
                }
                data.remove(key);
            }
            return null;
        }

        private List<Job.Edge> getConnections(List<Job.Edge> edges, Job.Component step, Function<Job.Edge, Job.Component> direction) {
            return edges.stream().filter(edge -> ((Job.Component)direction.apply((Job.Edge)edge)).equals(step)).collect(Collectors.toList());
        }

        public GroupKeyProvider getKeyProvider(String componentId) {
            Object o;
            if (this.componentProperties.get(componentId) != null && GroupKeyProvider.class.isInstance(o = this.componentProperties.get(componentId).get(GroupKeyProvider.class.getName()))) {
                return new GroupKeyProviderImpl((GroupKeyProvider)GroupKeyProvider.class.cast(o));
            }
            o = this.jobProperties.get(GroupKeyProvider.class.getName());
            if (GroupKeyProvider.class.isInstance(o)) {
                return new GroupKeyProviderImpl((GroupKeyProvider)GroupKeyProvider.class.cast(o));
            }
            ServiceLoader<GroupKeyProvider> services = ServiceLoader.load(GroupKeyProvider.class);
            if (services.iterator().hasNext()) {
                return services.iterator().next();
            }
            return LocalSequenceHolder.cleanAndGet(componentId);
        }

        public Map<Integer, Set<Job.Component>> getLevels() {
            return this.levels;
        }

        public List<Job.Edge> getEdges() {
            return this.edges;
        }

        public Map<String, Map<String, Object>> getComponentProperties() {
            return this.componentProperties;
        }

        public Map<String, Object> getJobProperties() {
            return this.jobProperties;
        }

        public ComponentManager getManager() {
            return this.manager;
        }

        public JobExecutor(Map<Integer, Set<Job.Component>> levels, List<Job.Edge> edges, Map<String, Map<String, Object>> componentProperties) {
            this.levels = levels;
            this.edges = edges;
            this.componentProperties = componentProperties;
        }
    }

    private static class To
    implements Job.ToBuilder {
        private final List<Job.Component> nodes;
        private final List<Job.Edge> edges;
        private final Job.Connection from;
        private final Job.Builder builder;

        @Override
        public Job.Builder to(String id, String branch) {
            Job.Component to = this.nodes.stream().filter(node -> node.getId().equals(id)).findFirst().orElseThrow(() -> new IllegalStateException("No component with id '" + id + "' in created nodes"));
            this.edges.stream().filter(edge -> edge.getTo().getNode().getId().equals(id) && edge.getTo().getBranch().equals(branch)).findFirst().ifPresent(edge -> {
                throw new IllegalStateException("(" + id + "," + branch + ") node is already connected : " + edge);
            });
            this.edges.add(new Job.Edge(this.from, new Job.Connection(to, branch)));
            return this.builder;
        }

        public To(List<Job.Component> nodes, List<Job.Edge> edges, Job.Connection from, Job.Builder builder) {
            this.nodes = nodes;
            this.edges = edges;
            this.from = from;
            this.builder = builder;
        }
    }

    public static class LinkBuilder
    implements Job.FromBuilder,
    Job.Builder {
        private static final Logger log = LoggerFactory.getLogger(LinkBuilder.class);
        private final List<Job.Component> nodes;
        private final Map<String, Map<String, Object>> properties;
        private final List<Job.Edge> edges = new ArrayList<Job.Edge>();
        private final Map<Integer, Set<Job.Component>> levels = new TreeMap<Integer, Set<Job.Component>>();

        @Override
        public Job.ToBuilder from(String id, String branch) {
            Job.Component from = this.nodes.stream().filter(node -> node.getId().equals(id)).findFirst().orElseThrow(() -> new IllegalStateException("No component with id '" + id + "' in created components"));
            this.edges.stream().filter(edge -> edge.getFrom().getNode().getId().equals(id) && edge.getFrom().getBranch().equals(branch)).findFirst().ifPresent(edge -> {
                throw new IllegalStateException("(" + id + "," + branch + ") node is already connected : " + edge);
            });
            return new To(this.nodes, this.edges, new Job.Connection(from, branch), this);
        }

        public void doBuild() {
            List<Job.Component> orphans = this.nodes.stream().filter(n -> this.edges.stream().noneMatch(l -> l.getFrom().getNode().equals(n) || l.getTo().getNode().equals(n))).collect(Collectors.toList());
            orphans.forEach(o -> log.warn("component '" + o + "' is orphan in this graph. it will be ignored."));
            this.nodes.removeAll(orphans);
            this.nodes.stream().filter(node -> this.edges.stream().noneMatch(l -> l.getTo().getNode().equals(node))).forEach(component -> component.setSource(true));
            this.calculateGraphOrder(0, new HashSet<Job.Component>(this.nodes), new ArrayList<Job.Edge>(this.edges), this.levels);
        }

        private void calculateGraphOrder(int order, Set<Job.Component> nodes, List<Job.Edge> edges, Map<Integer, Set<Job.Component>> orderedGraph) {
            if (edges.isEmpty()) {
                orderedGraph.put(order, nodes);
                return;
            }
            Set startingNodes = nodes.stream().filter(node -> edges.stream().noneMatch(l -> l.getTo().getNode().equals(node))).collect(Collectors.toSet());
            if (order == 0 && startingNodes.isEmpty()) {
                throw new IllegalStateException("There is no starting component in this graph.");
            }
            List level = edges.stream().filter(edge -> startingNodes.contains(edge.getFrom().getNode())).filter(edge -> edges.stream().filter(others -> edge.getTo().getNode().equals(others.getTo().getNode())).map(others -> others.getFrom().getNode()).allMatch(startingNodes::contains)).collect(Collectors.toList());
            if (level.isEmpty()) {
                throw new IllegalStateException("the job pipeline has cyclic connection");
            }
            Set components = level.stream().map(edge -> edge.getFrom().getNode()).collect(Collectors.toSet());
            orderedGraph.put(order, components);
            edges.removeAll(level);
            nodes.removeAll(components);
            this.calculateGraphOrder(order + 1, nodes, edges, orderedGraph);
        }

        @Override
        public JobExecutor build() {
            this.doBuild();
            return new JobExecutor(this.levels, this.edges, this.properties);
        }

        public LinkBuilder(List<Job.Component> nodes, Map<String, Map<String, Object>> properties) {
            this.nodes = nodes;
            this.properties = properties;
        }
    }

    public static class NodeBuilderImpl
    implements Job.NodeBuilder {
        private final List<Job.Component> nodes = new ArrayList<Job.Component>();
        private final Map<String, Map<String, Object>> properties = new HashMap<String, Map<String, Object>>();

        @Override
        public Job.NodeBuilder property(String name, Object value) {
            Job.Component lastComponent = this.nodes.get(this.nodes.size() - 1);
            this.properties.computeIfAbsent(lastComponent.getId(), s -> new HashMap());
            this.properties.get(lastComponent.getId()).put(name, value);
            return this;
        }

        @Override
        public Job.NodeBuilder component(String id, String uri) {
            this.nodes.add(new Job.Component(id, DSLParser.parse(uri)));
            return this;
        }

        @Override
        public LinkBuilder connections() {
            return new LinkBuilder(this.nodes, this.properties);
        }
    }
}

