/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.runners.direct.DirectGroupByKey;
import org.apache.beam.runners.direct.DirectTimerInternals;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.UncommittedBundle;
import org.apache.beam.runners.local.StructuralKey;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValueReceiver;
import org.apache.beam.sdk.util.construction.TriggerTranslation;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

class GroupAlsoByWindowEvaluatorFactory
implements TransformEvaluatorFactory {
    private final @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext;
    private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;

    GroupAlsoByWindowEvaluatorFactory(@UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        this.evaluationContext = evaluationContext;
        this.options = options;
    }

    @Override
    public <InputT> @UnknownKeyFor @NonNull @Initialized TransformEvaluator<InputT> forApplication(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> application, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized CommittedBundle<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> inputBundle) {
        TransformEvaluator evaluator = this.createEvaluator(application, inputBundle);
        return evaluator;
    }

    @Override
    public void cleanup() {
    }

    private <K, V> @UnknownKeyFor @NonNull @Initialized TransformEvaluator<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, V>> createEvaluator(@UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, V>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<V>>>, @UnknownKeyFor @NonNull @Initialized DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> application, @UnknownKeyFor @NonNull @Initialized CommittedBundle<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, V>> inputBundle) {
        return new GroupAlsoByWindowEvaluator<K, V>(this.evaluationContext, this.options, inputBundle, application);
    }

    private static class BundleWindowedValueReceiver<@UnknownKeyFor K, @UnknownKeyFor V>
    implements WindowedValueReceiver<KV<K, Iterable<V>>> {
        private final @UnknownKeyFor @NonNull @Initialized UncommittedBundle<@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<V>>> bundle;

        private BundleWindowedValueReceiver(@UnknownKeyFor @NonNull @Initialized UncommittedBundle<@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<V>>> bundle) {
            this.bundle = bundle;
        }

        @Override
        public void output(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<V>>> valueWithMetadata) {
            this.bundle.add(valueWithMetadata);
        }
    }

    private static class GroupAlsoByWindowEvaluator<@UnknownKeyFor K, @UnknownKeyFor V>
    implements TransformEvaluator<KeyedWorkItem<K, V>> {
        private final @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext;
        private final @UnknownKeyFor @NonNull @Initialized PipelineOptions options;
        private final @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, V>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<V>>>, @UnknownKeyFor @NonNull @Initialized DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> application;
        private final  @UnknownKeyFor @NonNull @Initialized DirectExecutionContext. @UnknownKeyFor @NonNull @Initialized DirectStepContext stepContext;
        private final /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @NonNull @Initialized BoundedWindow> windowingStrategy;
        private final /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized StructuralKey<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> structuralKey;
        private final /*
         * Issues handling annotations - annotations may be inaccurate
         */
        @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized UncommittedBundle<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> outputBundles;
        private final // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized ImmutableList.Builder<@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, V>>> unprocessedElements;
        private final @UnknownKeyFor @NonNull @Initialized SystemReduceFn<K, V, @UnknownKeyFor @NonNull @Initialized Iterable<V>, @UnknownKeyFor @NonNull @Initialized Iterable<V>, @UnknownKeyFor @NonNull @Initialized BoundedWindow> reduceFn;
        private final @UnknownKeyFor @NonNull @Initialized Counter droppedDueToLateness;

        public GroupAlsoByWindowEvaluator(@UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext, @UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized CommittedBundle<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, V>> inputBundle, @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, V>>, @UnknownKeyFor @NonNull @Initialized PCollection<@UnknownKeyFor @NonNull @Initialized KV<K, @UnknownKeyFor @NonNull @Initialized Iterable<V>>>, @UnknownKeyFor @NonNull @Initialized DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> application) {
            this.evaluationContext = evaluationContext;
            this.options = options;
            this.application = application;
            this.structuralKey = inputBundle.getKey();
            this.stepContext = evaluationContext.getExecutionContext(application, inputBundle.getKey()).getStepContext(evaluationContext.getStepName(application));
            this.windowingStrategy = application.getTransform().getInputWindowingStrategy();
            this.outputBundles = new ArrayList();
            this.unprocessedElements = ImmutableList.builder();
            Coder<V> valueCoder = application.getTransform().getValueCoder(((PCollection)inputBundle.getPCollection()).getCoder());
            this.reduceFn = SystemReduceFn.buffering(valueCoder);
            this.droppedDueToLateness = Metrics.counter(GroupAlsoByWindowEvaluator.class, "DroppedDueToLateness");
        }

        @Override
        public void processElement(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, V>> element) throws @UnknownKeyFor @NonNull @Initialized Exception {
            KeyedWorkItem<K, V> workItem = element.getValue();
            K key = workItem.key();
            UncommittedBundle bundle = this.evaluationContext.createKeyedBundle(this.structuralKey, (PCollection)Iterables.getOnlyElement(this.application.getOutputs().values()));
            this.outputBundles.add(bundle);
            StateInternals stateInternals = this.stepContext.stateInternals();
            DirectTimerInternals timerInternals = this.stepContext.timerInternals();
            RunnerApi.Trigger runnerApiTrigger = TriggerTranslation.toProto(this.windowingStrategy.getTrigger());
            ReduceFnRunner reduceFnRunner = new ReduceFnRunner(key, this.windowingStrategy, ExecutableTriggerStateMachine.create(TriggerStateMachines.stateMachineForTrigger(runnerApiTrigger)), stateInternals, timerInternals, new BundleWindowedValueReceiver(bundle), new UnsupportedSideInputReader(DirectGroupByKey.DirectGroupAlsoByWindow.class.getSimpleName()), this.reduceFn, this.options);
            reduceFnRunner.processElements(this.dropExpiredWindows(key, workItem.elementsIterable(), timerInternals));
            reduceFnRunner.onTimers(workItem.timersIterable());
            reduceFnRunner.persist();
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized TransformResult<@UnknownKeyFor @NonNull @Initialized KeyedWorkItem<K, V>> finishBundle() throws @UnknownKeyFor @NonNull @Initialized Exception {
            CopyOnAccessInMemoryStateInternals state = this.stepContext.commitState();
            return StepTransformResult.withHold(this.application, state.getEarliestWatermarkHold()).withState(state).addOutput(this.outputBundles).withTimerUpdate(this.stepContext.getTimerUpdate()).addUnprocessedElements(this.unprocessedElements.build()).withBundleFinalizations(this.stepContext.getAndClearFinalizations()).build();
        }

        @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<V>> dropExpiredWindows(K key, @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<V>> elements, @UnknownKeyFor @NonNull @Initialized TimerInternals timerInternals) {
            return StreamSupport.stream(elements.spliterator(), false).flatMap(wv -> StreamSupport.stream(wv.explodeWindows().spliterator(), false)).filter(input -> {
                BoundedWindow window = (BoundedWindow)Iterables.getOnlyElement(input.getWindows());
                boolean expired = window.maxTimestamp().plus((ReadableDuration)this.windowingStrategy.getAllowedLateness()).isBefore((ReadableInstant)timerInternals.currentInputWatermarkTime());
                if (expired) {
                    this.droppedDueToLateness.inc();
                    WindowTracing.debug("{}: Dropping element at {} for key: {}; window: {} since it is too far behind inputWatermark: {}", DirectGroupByKey.DirectGroupAlsoByWindow.class.getSimpleName(), input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime());
                }
                return !expired;
            }).collect(Collectors.toList());
        }
    }
}

