package cz.seznam.euphoria.spark;

import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.functional.UnaryFunctor;
import cz.seznam.euphoria.shadow.com.google.common.collect.Iterators;
import java.util.Iterator;
import java.util.Objects;
import org.apache.spark.api.java.function.FlatMapFunction;

/* loaded from: input_file:cz/seznam/euphoria/spark/UnaryFunctorWrapper.class */
class UnaryFunctorWrapper<WID extends Window, IN, OUT> implements FlatMapFunction<SparkElement<WID, IN>, SparkElement<WID, OUT>> {
    private final UnaryFunctor<IN, OUT> functor;
    private final AccumulatorProvider accumulators;
    private transient FunctionCollectorMem<OUT> cachedCollector;

    public UnaryFunctorWrapper(UnaryFunctor<IN, OUT> unaryFunctor, AccumulatorProvider accumulatorProvider) {
        this.functor = (UnaryFunctor) Objects.requireNonNull(unaryFunctor);
        this.accumulators = (AccumulatorProvider) Objects.requireNonNull(accumulatorProvider);
    }

    public Iterator<SparkElement<WID, OUT>> call(SparkElement<WID, IN> sparkElement) {
        WID window = sparkElement.getWindow();
        long timestamp = getTimestamp(sparkElement);
        FunctionCollectorMem<OUT> context = getContext();
        context.clear();
        context.setWindow(window);
        this.functor.apply(sparkElement.getElement(), context);
        return Iterators.transform(context.getOutputIterator(), obj -> {
            return new SparkElement(window, timestamp, obj);
        });
    }

    protected long getTimestamp(SparkElement<WID, IN> sparkElement) {
        return sparkElement.getTimestamp();
    }

    private FunctionCollectorMem<OUT> getContext() {
        if (this.cachedCollector == null) {
            this.cachedCollector = new FunctionCollectorMem<>(this.accumulators);
        }
        return this.cachedCollector;
    }
}
