package cz.seznam.euphoria.spark;

import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.ReduceFunctor;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.ReduceByKey;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.util.SingleValueContext;
import cz.seznam.euphoria.shadow.com.google.common.base.Preconditions;
import cz.seznam.euphoria.shadow.com.google.common.collect.Iterators;
import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

/* loaded from: input_file:cz/seznam/euphoria/spark/ReduceByKeyTranslator.class */
class ReduceByKeyTranslator implements SparkOperatorTranslator<ReduceByKey> {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/spark/ReduceByKeyTranslator$CompositeKeyExtractor.class */
    public static class CompositeKeyExtractor implements PairFlatMapFunction<SparkElement, KeyedWindow, TimestampedElement> {
        private final UnaryFunction keyExtractor;
        private final UnaryFunction valueExtractor;
        private final Windowing windowing;

        CompositeKeyExtractor(UnaryFunction unaryFunction, UnaryFunction unaryFunction2, Windowing windowing) {
            this.keyExtractor = unaryFunction;
            this.valueExtractor = unaryFunction2;
            this.windowing = windowing;
        }

        public Iterator<Tuple2<KeyedWindow, TimestampedElement>> call(SparkElement sparkElement) throws Exception {
            return Iterators.transform(this.windowing.assignWindowsToElement(sparkElement).iterator(), window -> {
                long maxTimestamp = ((Window) Objects.requireNonNull(window)).maxTimestamp() - 1;
                return new Tuple2(new KeyedWindow(window, maxTimestamp, this.keyExtractor.apply(sparkElement.getElement())), new TimestampedElement(maxTimestamp, this.valueExtractor.apply(sparkElement.getElement())));
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/spark/ReduceByKeyTranslator$Reducer.class */
    public static class Reducer implements Function2<TimestampedElement, TimestampedElement, TimestampedElement> {
        private final ReduceFunctor<Iterable<Object>, Object> reducer;
        private Object[] iterable;
        private SingleValueContext<Object> context;

        private Reducer(ReduceFunctor<Iterable<Object>, Object> reduceFunctor) {
            this.reducer = reduceFunctor;
            this.iterable = new Object[2];
        }

        public TimestampedElement call(TimestampedElement timestampedElement, TimestampedElement timestampedElement2) {
            if (this.context == null) {
                this.context = new SingleValueContext<>();
            }
            this.iterable[0] = timestampedElement.getElement();
            this.iterable[1] = timestampedElement2.getElement();
            this.reducer.apply(Stream.of(this.iterable), this.context);
            return new TimestampedElement(Math.max(timestampedElement.getTimestamp(), timestampedElement2.getTimestamp()), this.context.getAndResetValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean wantTranslate(ReduceByKey reduceByKey) {
        return reduceByKey.isCombinable() && (reduceByKey.getWindowing() == null || !((reduceByKey.getWindowing() instanceof MergingWindowing) || reduceByKey.getWindowing().getTrigger().isStateful()));
    }

    @Override // cz.seznam.euphoria.spark.SparkOperatorTranslator
    public JavaRDD<?> translate(ReduceByKey reduceByKey, SparkExecutorContext sparkExecutorContext) {
        JavaRDD<?> singleInput = sparkExecutorContext.getSingleInput(reduceByKey);
        ReduceFunctor reducer = reduceByKey.getReducer();
        Windowing windowing = reduceByKey.getWindowing() == null ? AttachedWindowing.INSTANCE : reduceByKey.getWindowing();
        UnaryFunction keyExtractor = reduceByKey.getKeyExtractor();
        UnaryFunction valueExtractor = reduceByKey.getValueExtractor();
        Preconditions.checkState(reduceByKey.isCombinable(), "Non-combinable ReduceByKey not supported!");
        Preconditions.checkState(!(windowing instanceof MergingWindowing), "MergingWindowing not supported!");
        Preconditions.checkState(!windowing.getTrigger().isStateful(), "Stateful triggers not supported!");
        return singleInput.flatMapToPair(new CompositeKeyExtractor(keyExtractor, valueExtractor, windowing)).reduceByKey(new Reducer(reducer)).map(tuple2 -> {
            KeyedWindow keyedWindow = (KeyedWindow) tuple2._1();
            TimestampedElement timestampedElement = (TimestampedElement) tuple2._2();
            return new SparkElement(keyedWindow.window(), timestampedElement.getTimestamp(), Pair.of(keyedWindow.key(), timestampedElement.getElement()));
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 163125944:
                if (implMethodName.equals("lambda$translate$e495e37$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/spark/ReduceByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Lcz/seznam/euphoria/spark/SparkElement;")) {
                    return tuple2 -> {
                        KeyedWindow keyedWindow = (KeyedWindow) tuple2._1();
                        TimestampedElement timestampedElement = (TimestampedElement) tuple2._2();
                        return new SparkElement(keyedWindow.window(), timestampedElement.getTimestamp(), Pair.of(keyedWindow.key(), timestampedElement.getElement()));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
