package cz.seznam.euphoria.spark;

import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.ExtractEventTime;
import cz.seznam.euphoria.core.client.operator.ReduceStateByKey;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StateMerger;
import cz.seznam.euphoria.core.client.triggers.Trigger;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.greduce.GroupReducer;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.class */
public class ReduceStateByKeyTranslator implements SparkOperatorTranslator<ReduceStateByKey> {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/spark/ReduceStateByKeyTranslator$CompositeKeyExtractor.class */
    /**
     * Pair-flat-map function that expands a single {@link SparkElement} into one
     * {@code (KeyedWindow, value)} tuple for every window the configured
     * {@link Windowing} assigns to that element.
     *
     * <p>When an event-time assigner is supplied, the incoming element's timestamp is
     * overwritten (the element is mutated in place) before windows are assigned.
     */
    public static class CompositeKeyExtractor implements PairFlatMapFunction<SparkElement, KeyedWindow, Object> {
        /** Derives the grouping key from the wrapped element. */
        private final UnaryFunction keyExtractor;
        /** Derives the value to be reduced from the wrapped element. */
        private final UnaryFunction valueExtractor;
        /** Decides which windows an element belongs to. */
        private final Windowing windowing;

        /** Optional timestamp extractor; {@code null} leaves the element's timestamp untouched. */
        @Nullable
        private final ExtractEventTime eventTimeAssigner;

        /**
         * @param unaryFunction key extractor applied to the wrapped element
         * @param unaryFunction2 value extractor applied to the wrapped element
         * @param windowing windowing used to assign windows to each element
         * @param extractEventTime optional event-time extractor, may be {@code null}
         */
        public CompositeKeyExtractor(UnaryFunction unaryFunction, UnaryFunction unaryFunction2, Windowing windowing, @Nullable ExtractEventTime extractEventTime) {
            this.keyExtractor = unaryFunction;
            this.valueExtractor = unaryFunction2;
            this.windowing = windowing;
            this.eventTimeAssigner = extractEventTime;
        }

        /**
         * Produces the keyed-window tuples for one input element.
         *
         * @param sparkElement input element; its timestamp is reset when an event-time assigner is present
         * @return iterator over one tuple per assigned window (empty when no window is assigned)
         * @throws Exception declared by the Spark function contract
         */
        public Iterator<Tuple2<KeyedWindow, Object>> call(SparkElement sparkElement) throws Exception {
            if (eventTimeAssigner != null) {
                // Stamp the element first so that window assignment below sees the extracted time.
                sparkElement.setTimestamp(eventTimeAssigner.extractTimestamp(sparkElement.getElement()));
            }

            Iterable<Window> assigned = windowing.assignWindowsToElement(sparkElement);
            ArrayList<Tuple2<KeyedWindow, Object>> emitted = new ArrayList<>();

            Iterator<Window> cursor = assigned.iterator();
            while (cursor.hasNext()) {
                Window target = cursor.next();
                Object payload = sparkElement.getElement();
                // The composite key carries window, element timestamp and the extracted key.
                KeyedWindow composite =
                        new KeyedWindow(target, sparkElement.getTimestamp(), keyExtractor.apply(payload));
                Object value = valueExtractor.apply(payload);
                emitted.add(new Tuple2<>(composite, value));
            }
            return emitted.iterator();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/spark/ReduceStateByKeyTranslator$KeyTimestampComparator.class */
    /**
     * Serializable ordering of {@link KeyedWindow}s: primarily by the hash code of the
     * key, and for equal hash codes by timestamp (ascending).
     *
     * <p>Note that only the key's {@code hashCode()} takes part in the ordering, so
     * distinct keys that share a hash code are interleaved by timestamp.
     */
    public static class KeyTimestampComparator implements Comparator<KeyedWindow>, Serializable {
        private KeyTimestampComparator() {
        }

        /**
         * @param keyedWindow left operand
         * @param keyedWindow2 right operand
         * @return negative, zero or positive per the {@link Comparator} contract
         */
        @Override // java.util.Comparator
        public int compare(KeyedWindow keyedWindow, KeyedWindow keyedWindow2) {
            int byKeyHash = Integer.compare(keyedWindow.key().hashCode(), keyedWindow2.key().hashCode());
            // Timestamps only break ties between elements whose keys hash identically.
            return byKeyHash != 0
                    ? byKeyHash
                    : Long.compare(keyedWindow.timestamp(), keyedWindow2.timestamp());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/spark/ReduceStateByKeyTranslator$StateReducer.class */
    /**
     * Partition-level function that feeds {@code (KeyedWindow, value)} tuples into
     * per-key {@link GroupReducer}s and exposes whatever the reducers emit as an iterator
     * of {@link SparkElement}s.
     *
     * <p>The input is consumed inside a task handed to
     * {@code FunctionContextAsync.runAsynchronously}; the returned iterator is the one
     * provided by that context. Reducers are closed and discarded every time the hash
     * code of the current key differs from the hash code of the previous key, and once
     * more after the input is exhausted.
     */
    public static class StateReducer implements FlatMapFunction<Iterator<Tuple2<KeyedWindow, Object>>, SparkElement> {
        private final Windowing windowing;
        /** Trigger obtained from the windowing at construction time. */
        private final Trigger trigger;
        private final StateFactory<?, ?, State<?, ?>> stateFactory;
        private final StateMerger<?, ?, State<?, ?>> stateCombiner;
        private final SparkStorageProvider storageProvider = new SparkStorageProvider();
        /** Reducers for the keys currently being processed; re-created on every {@link #call}. */
        private transient Map<Object, GroupReducer> activeReducers;

        /**
         * @param windowing windowing handed to every reducer; also the source of the trigger
         * @param stateFactory factory handed to every reducer
         * @param stateMerger state merger handed to every reducer
         */
        public StateReducer(Windowing windowing, StateFactory<?, ?, State<?, ?>> stateFactory, StateMerger<?, ?, State<?, ?>> stateMerger) {
            this.windowing = windowing;
            this.trigger = windowing.getTrigger();
            this.stateFactory = stateFactory;
            this.stateCombiner = stateMerger;
        }

        /**
         * Reduces one partition.
         *
         * @param it tuples of the partition; the flush-on-hash-change logic relies on tuples
         *           with the same key hash code being adjacent
         * @return iterator over the elements collected by the reducers
         */
        public Iterator<SparkElement> call(Iterator<Tuple2<KeyedWindow, Object>> it) {
            activeReducers = new HashMap<>();
            FunctionContextAsync output = new FunctionContextAsync();
            output.runAsynchronously(() -> {
                // Starts at 0, so a first key with a non-zero hash triggers a flush of the
                // still-empty reducer map, which is a no-op.
                int previousHash = 0;
                while (it.hasNext()) {
                    Tuple2<KeyedWindow, Object> tuple = it.next();
                    KeyedWindow composite = tuple._1();
                    Object value = tuple._2();
                    Object key = composite.key();

                    int currentHash = key.hashCode();
                    if (currentHash != previousHash) {
                        flushStates();
                    }
                    previousHash = currentHash;

                    GroupReducer reducer = activeReducers.get(key);
                    if (reducer == null) {
                        reducer = newReducer(output);
                        activeReducers.put(key, reducer);
                    }
                    reducer.process(new SparkElement(composite.window(), composite.timestamp(), Pair.of(key, value)));
                }
                flushStates();
            });
            return output.iterator();
        }

        /** Builds a reducer whose output is forwarded to the given async context. */
        private GroupReducer newReducer(FunctionContextAsync output) {
            return new GroupReducer(
                    stateFactory,
                    stateCombiner,
                    storageProvider,
                    SparkElement::new,
                    windowing,
                    trigger,
                    emitted -> output.collect((SparkElement) emitted),
                    false);
        }

        /** Closes every active reducer and forgets all of them. */
        private void flushStates() {
            for (GroupReducer reducer : activeReducers.values()) {
                reducer.close();
            }
            activeReducers.clear();
        }
    }

    @Override // cz.seznam.euphoria.spark.SparkOperatorTranslator
    public JavaRDD<?> translate(ReduceStateByKey reduceStateByKey, SparkExecutorContext sparkExecutorContext) {
        JavaRDD<?> singleInput = sparkExecutorContext.getSingleInput(reduceStateByKey);
        StateFactory stateFactory = reduceStateByKey.getStateFactory();
        StateMerger stateMerger = reduceStateByKey.getStateMerger();
        UnaryFunction keyExtractor = reduceStateByKey.getKeyExtractor();
        UnaryFunction valueExtractor = reduceStateByKey.getValueExtractor();
        ExtractEventTime eventTimeAssigner = reduceStateByKey.getEventTimeAssigner();
        Windowing windowing = reduceStateByKey.getWindowing() == null ? AttachedWindowing.INSTANCE : reduceStateByKey.getWindowing();
        return singleInput.flatMapToPair(new CompositeKeyExtractor(keyExtractor, valueExtractor, windowing, eventTimeAssigner)).repartitionAndSortWithinPartitions(((windowing instanceof MergingWindowing) || !reduceStateByKey.getPartitioning().hasDefaultPartitioner()) ? new PartitioningWrapper(reduceStateByKey.getPartitioning()) : new HashPartitioner(reduceStateByKey.getPartitioning().getNumPartitions()), new KeyTimestampComparator()).mapPartitions(new StateReducer(windowing, stateFactory, stateMerger));
    }
}
