package cz.seznam.euphoria.spark;

import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryPredicate;
import cz.seznam.euphoria.core.client.graph.DAG;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.operator.FlatMap;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.ReduceByKey;
import cz.seznam.euphoria.core.client.operator.ReduceStateByKey;
import cz.seznam.euphoria.core.client.operator.Repartition;
import cz.seznam.euphoria.core.client.operator.Sort;
import cz.seznam.euphoria.core.client.operator.Union;
import cz.seznam.euphoria.core.executor.FlowUnfolder;
import cz.seznam.euphoria.hadoop.output.DataSinkOutputFormat;
import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

/* loaded from: input_file:cz/seznam/euphoria/spark/SparkFlowTranslator.class */
public class SparkFlowTranslator {
    private final Map<Class, Translation> translations = new IdentityHashMap();
    private final JavaSparkContext sparkEnv;

    /* loaded from: input_file:cz/seznam/euphoria/spark/SparkFlowTranslator$TranslateAcceptor.class */
    public static final class TranslateAcceptor<O> implements UnaryPredicate<Operator<?, ?>> {
        final Class<O> type;

        @Nullable
        final UnaryPredicate<O> accept;

        public TranslateAcceptor(Class<O> cls) {
            this(cls, null);
        }

        public TranslateAcceptor(Class<O> cls, @Nullable UnaryPredicate<O> unaryPredicate) {
            this.type = (Class) Objects.requireNonNull(cls);
            this.accept = unaryPredicate;
        }

        public Boolean apply(Operator<?, ?> operator) {
            return Boolean.valueOf(this.type == operator.getClass() && (this.accept == null || ((Boolean) this.accept.apply(this.type.cast(operator))).booleanValue()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/spark/SparkFlowTranslator$Translation.class */
    public static class Translation<O extends Operator<?, ?>> {
        final SparkOperatorTranslator<O> translator;
        final UnaryPredicate<O> accept;

        private Translation(SparkOperatorTranslator<O> sparkOperatorTranslator, UnaryPredicate<O> unaryPredicate) {
            this.translator = (SparkOperatorTranslator) Objects.requireNonNull(sparkOperatorTranslator);
            this.accept = unaryPredicate;
        }

        static <O extends Operator<?, ?>> void set(Map<Class, Translation> map, Class<O> cls, SparkOperatorTranslator<O> sparkOperatorTranslator) {
            set(map, cls, sparkOperatorTranslator, null);
        }

        static <O extends Operator<?, ?>> void set(Map<Class, Translation> map, Class<O> cls, SparkOperatorTranslator<O> sparkOperatorTranslator, UnaryPredicate<O> unaryPredicate) {
            map.put(cls, new Translation(sparkOperatorTranslator, unaryPredicate));
        }
    }

    public SparkFlowTranslator(JavaSparkContext javaSparkContext) {
        this.sparkEnv = (JavaSparkContext) Objects.requireNonNull(javaSparkContext);
        Translation.set(this.translations, FlowUnfolder.InputOperator.class, new InputTranslator());
        Translation.set(this.translations, FlatMap.class, new FlatMapTranslator());
        Translation.set(this.translations, Repartition.class, new RepartitionTranslator());
        Translation.set(this.translations, ReduceStateByKey.class, new ReduceStateByKeyTranslator());
        Translation.set(this.translations, Union.class, new UnionTranslator());
        Translation.set(this.translations, ReduceByKey.class, new ReduceByKeyTranslator(), ReduceByKeyTranslator::wantTranslate);
        Translation.set(this.translations, Sort.class, new SortTranslator(), SortTranslator::wantTranslate);
    }

    public List<DataSink<?>> translateInto(Flow flow) {
        DAG<Operator<?, ?>> flowToDag = flowToDag(flow);
        SparkExecutorContext sparkExecutorContext = new SparkExecutorContext(this.sparkEnv, flowToDag);
        flowToDag.traverse().map((v0) -> {
            return v0.get();
        }).forEach(operator -> {
            Translation translation = this.translations.get(operator.getClass());
            if (translation == null) {
                throw new UnsupportedOperationException("Operator " + operator.getClass().getSimpleName() + " not supported");
            }
            Preconditions.checkState(translation.accept == 0 || Boolean.TRUE.equals(translation.accept.apply(operator)));
            sparkExecutorContext.setOutput(operator, translation.translator.translate(operator, sparkExecutorContext));
        });
        ArrayList arrayList = new ArrayList();
        flowToDag.getLeafs().stream().map((v0) -> {
            return v0.get();
        }).filter(operator2 -> {
            return operator2.output().getOutputSink() != null;
        }).forEach(operator3 -> {
            DataSink outputSink = operator3.output().getOutputSink();
            arrayList.add(outputSink);
            JavaPairRDD mapToPair = ((JavaRDD) Objects.requireNonNull(sparkExecutorContext.getOutput(operator3))).mapToPair(sparkElement -> {
                return new Tuple2(NullWritable.get(), sparkElement.getElement());
            });
            try {
                Configuration configure = DataSinkOutputFormat.configure(new Configuration(), outputSink);
                configure.set("mapreduce.job.outputformat.class", DataSinkOutputFormat.class.getName());
                mapToPair.saveAsNewAPIHadoopDataset(configure);
            } catch (IOException e) {
                throw new RuntimeException();
            }
        });
        return arrayList;
    }

    protected DAG<Operator<?, ?>> flowToDag(Flow flow) {
        Map<Class, Collection<TranslateAcceptor>> buildAcceptorsIndex = buildAcceptorsIndex(getAcceptors());
        return FlowUnfolder.unfold(flow, operator -> {
            Collection collection = (Collection) buildAcceptorsIndex.get(operator.getClass());
            if (collection != null && !collection.isEmpty()) {
                Iterator it = collection.iterator();
                while (it.hasNext()) {
                    if (((TranslateAcceptor) it.next()).apply((Operator<?, ?>) operator).booleanValue()) {
                        return true;
                    }
                }
            }
            return false;
        });
    }

    private Map<Class, Collection<TranslateAcceptor>> buildAcceptorsIndex(Collection<TranslateAcceptor> collection) {
        IdentityHashMap identityHashMap = new IdentityHashMap(collection.size());
        for (TranslateAcceptor translateAcceptor : collection) {
            ((Collection) identityHashMap.computeIfAbsent(translateAcceptor.type, cls -> {
                return new ArrayList();
            })).add(translateAcceptor);
        }
        return identityHashMap;
    }

    protected Collection<TranslateAcceptor> getAcceptors() {
        return (Collection) this.translations.entrySet().stream().map(entry -> {
            return new TranslateAcceptor((Class) entry.getKey(), ((Translation) entry.getValue()).accept);
        }).collect(Collectors.toList());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -325424581:
                if (implMethodName.equals("lambda$flowToDag$759c3ad$1")) {
                    z = true;
                    break;
                }
                break;
            case -203334768:
                if (implMethodName.equals("lambda$null$d278b1a$1")) {
                    z = false;
                    break;
                }
                break;
            case 720818462:
                if (implMethodName.equals("wantTranslate")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Lscala/Tuple2;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/spark/SparkFlowTranslator") && serializedLambda.getImplMethodSignature().equals("(Lcz/seznam/euphoria/spark/SparkElement;)Lscala/Tuple2;")) {
                    return sparkElement -> {
                        return new Tuple2(NullWritable.get(), sparkElement.getElement());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/core/client/functional/UnaryPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/spark/SparkFlowTranslator") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;Lcz/seznam/euphoria/core/client/operator/Operator;)Ljava/lang/Boolean;")) {
                    Map map = (Map) serializedLambda.getCapturedArg(0);
                    return operator -> {
                        Collection collection = (Collection) map.get(operator.getClass());
                        if (collection != null && !collection.isEmpty()) {
                            Iterator it = collection.iterator();
                            while (it.hasNext()) {
                                if (((TranslateAcceptor) it.next()).apply((Operator<?, ?>) operator).booleanValue()) {
                                    return true;
                                }
                            }
                        }
                        return false;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/core/client/functional/UnaryPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/spark/ReduceByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lcz/seznam/euphoria/core/client/operator/ReduceByKey;)Z")) {
                    return ReduceByKeyTranslator::wantTranslate;
                }
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/core/client/functional/UnaryPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/spark/SortTranslator") && serializedLambda.getImplMethodSignature().equals("(Lcz/seznam/euphoria/core/client/operator/Sort;)Z")) {
                    return SortTranslator::wantTranslate;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
