/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing.operators.windowing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.SerializerFactory;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.v2.AppendingState;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
import org.apache.flink.runtime.asyncprocessing.declare.state.StateWithDeclaredNamespace;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.runtime.asyncprocessing.operators.TimestampedCollectorWithDeclaredVariable;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.v2.internal.InternalAppendingState;
import org.apache.flink.runtime.state.v2.internal.InternalListState;
import org.apache.flink.runtime.state.v2.internal.InternalMergingState;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorAttributes;
import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A window operator that keeps its window contents in asynchronous (v2) keyed state.
 *
 * <p>For every incoming element the operator asks the {@link WindowAssigner} for the windows the
 * element belongs to, appends the element to the per-window state, consults the {@link
 * AsyncTrigger} and, depending on the {@link TriggerResult}, emits the window contents through the
 * {@link InternalAsyncWindowFunction} and/or clears the window state. Timers registered under the
 * window namespace drive {@link #onEventTime(InternalTimer)} and {@link
 * #onProcessingTime(InternalTimer)}.
 *
 * <p>The "current window" is carried in a {@link DeclaredVariable}; the trigger context, the
 * process context and all window-scoped state read the window from that variable.
 *
 * <p>Merging window assigners (e.g. session windows) are rejected with an {@link
 * UnsupportedOperationException}.
 *
 * @param <K> type of the key
 * @param <IN> type of the incoming elements
 * @param <ACC> type of the value held in (and read from) the window state
 * @param <OUT> type of the emitted elements
 * @param <W> type of the windows produced by the assigner
 */
@Internal
public class AsyncWindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractAsyncStateUdfStreamOperator<OUT, InternalAsyncWindowFunction<ACC, OUT, K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(AsyncWindowOperator.class);

    /** Name of the counter that is incremented for late elements that are not side-output. */
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";

    /** Message of the exception thrown whenever a merging window assigner is encountered. */
    private static final String MERGING_WINDOW_UNSUPPORTED_MESSAGE =
            "Async WindowOperator not support merging window (e.g. session window) yet.";

    // ------------------------------------------------------------------------
    // Configuration supplied through the constructor
    // ------------------------------------------------------------------------

    /** Assigns each incoming element to zero or more windows. */
    protected final WindowAssigner<? super IN, W> windowAssigner;

    /** Key selector handed in at construction; only exposed through {@link #getKeySelector()}. */
    private final KeySelector<IN, K> keySelector;

    /** Trigger consulted on every element and on every firing timer. */
    private final AsyncTrigger<? super IN, ? super W> trigger;

    /** Descriptor of the window-contents state; may be {@code null}, see {@link #open()}. */
    private final StateDescriptor<?> windowStateDescriptor;

    /** Serializer for the key, as supplied at construction. */
    protected final TypeSerializer<K> keySerializer;

    /**
     * Serializer for windows. Used as namespace serializer for timers and window-scoped state and
     * to create the initial value of {@link #windowDeclaredVariable}.
     */
    protected final TypeSerializer<W> windowSerializer;

    /**
     * Non-negative lateness that is added to a window's max timestamp (for event-time assigners)
     * to obtain its cleanup time, and to an element's timestamp when deciding whether the element
     * is late.
     */
    protected final long allowedLateness;

    /** Side-output tag for late elements; if {@code null}, late elements are only counted. */
    protected final OutputTag<IN> lateDataOutputTag;

    // ------------------------------------------------------------------------
    // Runtime fields, (re-)initialized in open()
    // ------------------------------------------------------------------------

    /** Counts late elements that were dropped because no late-data output tag is configured. */
    protected transient Counter numLateRecordsDropped;

    /** Window-contents state whose namespace follows {@link #windowDeclaredVariable}. */
    private transient InternalAppendingState<K, W, IN, ACC, ACC, ACC> windowState;

    /** Not assigned anywhere in this class (merging windows are rejected). */
    private transient InternalMergingState<K, W, IN, ACC, ACC, ACC> windowMergingState;

    /** Not assigned anywhere in this class (merging windows are rejected). */
    private transient InternalListState<K, VoidNamespace, Tuple2<W, W>> mergingSetsState;

    /** Collector handed to the window function; its timestamp is set before each emission. */
    protected transient TimestampedCollectorWithDeclaredVariable<OUT> timestampedCollector;

    /** Context passed to the trigger. */
    protected transient Context triggerContext;

    /** Context passed to the window function. */
    protected transient WindowContext processContext;

    /** Holds the window that the contexts and window-scoped state currently operate on. */
    protected transient DeclaredVariable<W> windowDeclaredVariable;

    /** Context passed to the window assigner; exposes the current processing time. */
    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;

    /** Timer service, obtained in {@link #open()} under the name {@code "window-timers"}. */
    protected transient InternalTimerService<W> internalTimerService;

    /**
     * Creates a new async window operator.
     *
     * @param windowAssigner assigner that maps elements to windows; must not be {@code null}
     * @param windowSerializer serializer for windows; must not be {@code null}
     * @param keySelector key selector; must not be {@code null}
     * @param keySerializer serializer for keys; must not be {@code null}
     * @param windowStateDescriptor descriptor of the window-contents state; may be {@code null},
     *     otherwise its serializer must already be initialized
     * @param windowFunction function invoked with the window contents when a window fires
     * @param trigger trigger deciding when to fire and/or purge; must not be {@code null}
     * @param allowedLateness allowed lateness, must be {@code >= 0}
     * @param lateDataOutputTag side-output tag for late elements, or {@code null}
     * @throws IllegalArgumentException if {@code allowedLateness} is negative or the state
     *     descriptor's serializer is not initialized
     * @throws NullPointerException if a required argument is {@code null}
     */
    public AsyncWindowOperator(
            WindowAssigner<? super IN, W> windowAssigner,
            TypeSerializer<W> windowSerializer,
            KeySelector<IN, K> keySelector,
            TypeSerializer<K> keySerializer,
            StateDescriptor<?> windowStateDescriptor,
            InternalAsyncWindowFunction<ACC, OUT, K, W> windowFunction,
            AsyncTrigger<? super IN, ? super W> trigger,
            long allowedLateness,
            OutputTag<IN> lateDataOutputTag) {
        super(windowFunction);
        Preconditions.checkArgument(allowedLateness >= 0L);
        Preconditions.checkArgument(
                windowStateDescriptor == null || windowStateDescriptor.isSerializerInitialized(),
                "window state serializer is not properly initialized");
        this.windowAssigner = Preconditions.checkNotNull(windowAssigner);
        this.windowSerializer = Preconditions.checkNotNull(windowSerializer);
        this.keySelector = Preconditions.checkNotNull(keySelector);
        this.keySerializer = Preconditions.checkNotNull(keySerializer);
        this.windowStateDescriptor = windowStateDescriptor;
        this.trigger = Preconditions.checkNotNull(trigger);
        this.allowedLateness = allowedLateness;
        this.lateDataOutputTag = lateDataOutputTag;
        // Parameterized logging: the trigger is only stringified if INFO is enabled.
        LOG.info("Initialize async window operator with trigger {}", trigger);
    }

    /** Delegates to the superclass without additional work. */
    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
    }

    /**
     * Initializes metrics, the collector, the timer service, the declared window variable, the
     * trigger/process/assigner contexts and (if a descriptor was given) the window state.
     *
     * @throws NullPointerException if no async keyed state backend is available
     * @throws UnsupportedOperationException if the window assigner is a merging assigner; note
     *     that this is checked last, after the other fields were initialized
     */
    @Override
    public void open() throws Exception {
        super.open();
        Preconditions.checkNotNull(
                getAsyncKeyedStateBackend(), "Async state backend is not properly set.");

        numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        timestampedCollector =
                new TimestampedCollectorWithDeclaredVariable<>(output, declarationContext);
        internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);
        windowDeclaredVariable =
                declarationContext.declareVariable(
                        windowSerializer,
                        "_AsyncWindowOperator$window",
                        windowSerializer::createInstance);

        triggerContext = new Context(windowDeclaredVariable);
        processContext = new WindowContext(windowDeclaredVariable);
        windowAssignerContext =
                new WindowAssigner.WindowAssignerContext() {
                    @Override
                    public long getCurrentProcessingTime() {
                        return internalTimerService.currentProcessingTime();
                    }
                };

        // The descriptor is allowed to be null (see constructor); windowState then stays null.
        if (windowStateDescriptor != null) {
            // The descriptor is only known as StateDescriptor<?>, so the concrete state type
            // cannot be checked by the compiler.
            @SuppressWarnings("unchecked")
            final InternalAppendingState<K, W, IN, ACC, ACC, ACC> backendState =
                    (InternalAppendingState<K, W, IN, ACC, ACC, ACC>)
                            getAsyncKeyedStateBackend()
                                    .getOrCreateKeyedState(
                                            windowSerializer.createInstance(),
                                            windowSerializer,
                                            windowStateDescriptor);
            windowState = StateWithDeclaredNamespace.create(backendState, windowDeclaredVariable);
        }

        rejectMergingWindowAssigner();
    }

    /** Closes the superclass and drops the collector and context objects created in open(). */
    @Override
    public void close() throws Exception {
        super.close();
        timestampedCollector = null;
        triggerContext = null;
        processContext = null;
        windowAssignerContext = null;
    }

    /**
     * Adds the element to every non-late window it is assigned to and evaluates the trigger for
     * each of them.
     *
     * <p>Per window the asynchronous chain is: add element to window state, call the trigger,
     * emit the (non-null) window contents if the trigger fires, clear the window state if the
     * trigger purges, and finally register the cleanup timer.
     *
     * <p>If the element was not added to any window and {@link #isElementLate(StreamRecord)}
     * holds, it is side-output (when a late-data tag is configured) or counted as dropped.
     *
     * @param element the incoming record
     * @throws UnsupportedOperationException if the window assigner is a merging assigner
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows =
                windowAssigner.assignWindows(
                        element.getValue(), element.getTimestamp(), windowAssignerContext);

        // getCurrentKey() is untyped; the key of a keyed operator is of type K by construction.
        @SuppressWarnings("unchecked")
        final K key = (K) getCurrentKey();

        rejectMergingWindowAssigner();

        // True as soon as the element has been handed to at least one window.
        boolean addedToAnyWindow = false;

        for (W window : elementWindows) {
            if (isWindowLate(window)) {
                continue;
            }

            final AtomicReference<TriggerResult> triggerResult = new AtomicReference<>();
            windowDeclaredVariable.set(window);

            // NOTE(review): windowState and triggerContext read the window from
            // windowDeclaredVariable when the callbacks below run, not from the captured
            // `window` local - confirm this is correct for elements assigned to several windows.
            // NOTE(review): windowState is null when no state descriptor was supplied.
            windowState
                    .asyncAdd(element.getValue())
                    .thenCompose(
                            ignore ->
                                    triggerContext
                                            .onElement(element)
                                            .thenAccept(triggerResult::set))
                    .thenConditionallyCompose(
                            ignore -> triggerResult.get().isFire(),
                            ignore ->
                                    windowState
                                            .asyncGet()
                                            .thenConditionallyCompose(
                                                    Objects::nonNull,
                                                    contents ->
                                                            emitWindowContents(
                                                                    key, window, contents)))
                    .thenConditionallyCompose(
                            ignore -> triggerResult.get().isPurge(),
                            ignore -> windowState.asyncClear())
                    .thenAccept(empty -> registerCleanupTimer(window));
            addedToAnyWindow = true;
        }

        if (!addedToAnyWindow && isElementLate(element)) {
            if (lateDataOutputTag != null) {
                sideOutput(element);
            } else {
                numLateRecordsDropped.inc();
            }
        }
    }

    /**
     * Handles a firing event-time timer: evaluates the trigger, emits and/or purges according to
     * its result and, if the assigner is event-time based and the timer timestamp equals the
     * window's cleanup time, clears all state of the window.
     *
     * @param timer the firing timer; its namespace is the window
     * @throws UnsupportedOperationException if the window assigner is a merging assigner
     */
    @Override
    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        windowDeclaredVariable.set(timer.getNamespace());
        final AtomicReference<TriggerResult> triggerResult = new AtomicReference<>();

        rejectMergingWindowAssigner();

        triggerContext
                .onEventTime(timer.getTimestamp())
                .thenAccept(triggerResult::set)
                .thenConditionallyCompose(
                        ignore -> triggerResult.get().isFire(),
                        ignore ->
                                windowState
                                        .asyncGet()
                                        .thenConditionallyCompose(
                                                Objects::nonNull,
                                                contents ->
                                                        emitWindowContents(
                                                                timer.getKey(),
                                                                timer.getNamespace(),
                                                                contents)))
                .thenConditionallyCompose(
                        ignore -> triggerResult.get().isPurge(),
                        ignore -> windowState.asyncClear())
                .thenConditionallyCompose(
                        ignore ->
                                windowAssigner.isEventTime()
                                        && isCleanupTime(
                                                timer.getNamespace(), timer.getTimestamp()),
                        ignore -> clearAllState(timer.getNamespace(), windowState));
    }

    /**
     * Handles a firing processing-time timer: evaluates the trigger, emits and/or purges
     * according to its result and, if the assigner is <em>not</em> event-time based and the timer
     * timestamp equals the window's cleanup time, clears all state of the window.
     *
     * @param timer the firing timer; its namespace is the window
     * @throws UnsupportedOperationException if the window assigner is a merging assigner
     */
    @Override
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        windowDeclaredVariable.set(timer.getNamespace());
        final AtomicReference<TriggerResult> triggerResult = new AtomicReference<>();

        rejectMergingWindowAssigner();

        triggerContext
                .onProcessingTime(timer.getTimestamp())
                .thenAccept(triggerResult::set)
                .thenConditionallyCompose(
                        ignore -> triggerResult.get().isFire(),
                        ignore ->
                                windowState
                                        .asyncGet()
                                        .thenConditionallyCompose(
                                                Objects::nonNull,
                                                contents ->
                                                        emitWindowContents(
                                                                timer.getKey(),
                                                                timer.getNamespace(),
                                                                contents)))
                .thenConditionallyCompose(
                        ignore -> triggerResult.get().isPurge(),
                        ignore -> windowState.asyncClear())
                .thenConditionallyCompose(
                        ignore ->
                                !windowAssigner.isEventTime()
                                        && isCleanupTime(
                                                timer.getNamespace(), timer.getTimestamp()),
                        ignore -> clearAllState(timer.getNamespace(), windowState));
    }

    /**
     * Reports "output only after end of stream" when the assigner is a {@link GlobalWindows}
     * assigner and the trigger declares itself an end-of-stream trigger.
     */
    @Override
    public OperatorAttributes getOperatorAttributes() {
        final boolean outputOnlyAfterEndOfStream =
                windowAssigner instanceof GlobalWindows && trigger.isEndOfStreamTrigger();
        return new OperatorAttributesBuilder()
                .setOutputOnlyAfterEndOfStream(outputOnlyAfterEndOfStream)
                .build();
    }

    /**
     * Throws if the configured window assigner is a {@link MergingWindowAssigner}.
     *
     * @throws UnsupportedOperationException if the assigner is a merging assigner
     */
    private void rejectMergingWindowAssigner() {
        if (windowAssigner instanceof MergingWindowAssigner) {
            throw new UnsupportedOperationException(MERGING_WINDOW_UNSUPPORTED_MESSAGE);
        }
    }

    /**
     * Clears the given window state, then the trigger, then (after pointing the declared window
     * variable at {@code window}) the window function's per-window resources.
     *
     * @param window window whose state is dropped
     * @param windowState window-contents state to clear
     * @return future completing when all three clear steps are done
     */
    private StateFuture<Void> clearAllState(W window, AppendingState<IN, ACC, ACC> windowState) {
        return windowState
                .asyncClear()
                .thenCompose(ignore -> triggerContext.clear())
                .thenCompose(
                        ignore -> {
                            windowDeclaredVariable.set(window);
                            return processContext.clear();
                        });
    }

    /**
     * Invokes the window function for {@code window} with the given contents. The collector's
     * timestamp is set to the window's max timestamp and the declared window variable is pointed
     * at {@code window} beforehand.
     *
     * @param key key the window belongs to
     * @param window window being emitted
     * @param contents contents read from the window state
     * @return the future returned by the window function
     */
    private StateFuture<Void> emitWindowContents(K key, W window, ACC contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        windowDeclaredVariable.set(window);
        return userFunction.process(key, window, processContext, contents, timestampedCollector);
    }

    /** Emits the element to the late-data side output. */
    protected void sideOutput(StreamRecord<IN> element) {
        output.collect(lateDataOutputTag, element);
    }

    /**
     * Returns {@code true} if the assigner is event-time based and the window's cleanup time is
     * at or before the current watermark.
     */
    protected boolean isWindowLate(W window) {
        return windowAssigner.isEventTime()
                && cleanupTime(window) <= internalTimerService.currentWatermark();
    }

    /**
     * Returns {@code true} if the assigner is event-time based and the element's timestamp plus
     * the allowed lateness is at or before the current watermark.
     */
    protected boolean isElementLate(StreamRecord<IN> element) {
        return windowAssigner.isEventTime()
                && element.getTimestamp() + allowedLateness
                        <= internalTimerService.currentWatermark();
    }

    /**
     * Registers a timer for the cleanup time of {@code window}, unless that time is {@link
     * Long#MAX_VALUE}. The timer is registered through the trigger context, i.e. under the window
     * currently held by the declared window variable.
     *
     * @param window window whose cleanup time is used
     */
    protected void registerCleanupTimer(W window) {
        final long cleanupTime = cleanupTime(window);
        if (cleanupTime == Long.MAX_VALUE) {
            // No timer for "end of time".
            return;
        }
        if (windowAssigner.isEventTime()) {
            triggerContext.registerEventTimeTimer(cleanupTime);
        } else {
            triggerContext.registerProcessingTimeTimer(cleanupTime);
        }
    }

    /**
     * Deletes the timer for the cleanup time of {@code window}, unless that time is {@link
     * Long#MAX_VALUE}. The timer is deleted through the trigger context, i.e. under the window
     * currently held by the declared window variable.
     *
     * @param window window whose cleanup time is used
     */
    protected void deleteCleanupTimer(W window) {
        final long cleanupTime = cleanupTime(window);
        if (cleanupTime == Long.MAX_VALUE) {
            // No timer was registered for "end of time".
            return;
        }
        if (windowAssigner.isEventTime()) {
            triggerContext.deleteEventTimeTimer(cleanupTime);
        } else {
            triggerContext.deleteProcessingTimeTimer(cleanupTime);
        }
    }

    /**
     * Computes the time at which the window's state is cleaned up: for event-time assigners the
     * max timestamp plus the allowed lateness (or {@link Long#MAX_VALUE} if that sum overflows),
     * otherwise the max timestamp itself.
     */
    private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            final long cleanupTime = window.maxTimestamp() + allowedLateness;
            // A sum smaller than the max timestamp means the addition overflowed.
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        }
        return window.maxTimestamp();
    }

    /** Returns {@code true} if {@code time} equals the cleanup time of {@code window}. */
    protected final boolean isCleanupTime(W window, long time) {
        return time == cleanupTime(window);
    }

    @VisibleForTesting
    public AsyncTrigger<? super IN, ? super W> getTrigger() {
        return trigger;
    }

    @VisibleForTesting
    public KeySelector<IN, K> getKeySelector() {
        return keySelector;
    }

    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return windowAssigner;
    }

    @VisibleForTesting
    public StateDescriptor<?> getStateDescriptor() {
        return windowStateDescriptor;
    }

    @VisibleForTesting
    public WindowContext getProcessContext() {
        return processContext;
    }

    @VisibleForTesting
    public WindowAssigner.WindowAssignerContext getWindowAssignerContext() {
        return windowAssignerContext;
    }

    /**
     * Context handed to the {@link AsyncTrigger}. All window-dependent operations (timers,
     * partitioned state, trigger callbacks) use the window currently held by the declared
     * variable passed to the constructor.
     */
    public class Context implements AsyncTrigger.OnMergeContext {
        /** Declared variable holding the window this context currently refers to. */
        protected DeclaredVariable<W> window;

        public Context(DeclaredVariable<W> window) {
            this.window = window;
        }

        @Override
        public MetricGroup getMetricGroup() {
            return AsyncWindowOperator.this.getMetricGroup();
        }

        @Override
        public long getCurrentWatermark() {
            return internalTimerService.currentWatermark();
        }

        /**
         * Looks up (or creates) keyed state under the current window namespace and wraps it so
         * that its namespace follows the declared window variable.
         *
         * @throws RuntimeException wrapping any exception raised while retrieving the state
         */
        @Override
        public <T, S extends State> S getPartitionedState(StateDescriptor<T> stateDescriptor) {
            try {
                return StateWithDeclaredNamespace.create(
                        getAsyncKeyedStateBackend()
                                .getOrCreateKeyedState(
                                        window.get(), windowSerializer, stateDescriptor),
                        window);
            } catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }

        /** Always throws, since merging windows are not supported. */
        @Override
        public <T> void mergePartitionedState(StateDescriptor<T> stateDescriptor) {
            throw new UnsupportedOperationException("Merging window not supported yet");
        }

        @Override
        public long getCurrentProcessingTime() {
            return internalTimerService.currentProcessingTime();
        }

        @Override
        public void registerProcessingTimeTimer(long time) {
            internalTimerService.registerProcessingTimeTimer(window.get(), time);
        }

        @Override
        public void registerEventTimeTimer(long time) {
            internalTimerService.registerEventTimeTimer(window.get(), time);
        }

        @Override
        public void deleteProcessingTimeTimer(long time) {
            internalTimerService.deleteProcessingTimeTimer(window.get(), time);
        }

        @Override
        public void deleteEventTimeTimer(long time) {
            internalTimerService.deleteEventTimeTimer(window.get(), time);
        }

        /** Forwards the element to the trigger for the current window. */
        public StateFuture<TriggerResult> onElement(StreamRecord<IN> element) throws Exception {
            return trigger.onElement(
                    element.getValue(), element.getTimestamp(), window.get(), this);
        }

        /** Forwards a processing-time timer to the trigger for the current window. */
        public StateFuture<TriggerResult> onProcessingTime(long time) throws Exception {
            return trigger.onProcessingTime(time, window.get(), this);
        }

        /** Forwards an event-time timer to the trigger for the current window. */
        public StateFuture<TriggerResult> onEventTime(long time) throws Exception {
            return trigger.onEventTime(time, window.get(), this);
        }

        /** Always throws, since merging windows are not supported. */
        public void onMerge(Collection<W> mergedWindows) throws Exception {
            throw new UnsupportedOperationException("Merging window not support yet");
        }

        /** Asks the trigger to clear whatever it holds for the current window. */
        public StateFuture<Void> clear() throws Exception {
            return trigger.clear(window.get(), this);
        }

        @Override
        public String toString() {
            return "Context{window=" + window + "}";
        }
    }

    /**
     * Context handed to the {@link InternalAsyncWindowFunction}. Exposes time, per-window and
     * global keyed state, and side outputs for the window held by the declared variable.
     */
    public class WindowContext implements InternalAsyncWindowFunction.InternalWindowContext {
        /** Declared variable holding the window this context currently refers to. */
        protected final DeclaredVariable<W> window;

        /** State store scoped to the current window. */
        protected AbstractPerWindowStateStore windowState;

        public WindowContext(DeclaredVariable<W> window) {
            this.window = window;
            this.windowState =
                    new PerWindowStateStore(
                            getAsyncKeyedStateBackend(), getExecutionConfig(), window);
        }

        @Override
        public String toString() {
            return "WindowContext{Window = " + window.toString() + "}";
        }

        /** Asks the window function to clear whatever it holds for the current window. */
        public StateFuture<Void> clear() throws Exception {
            return userFunction.clear(window.get(), this);
        }

        @Override
        public long currentProcessingTime() {
            return internalTimerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return internalTimerService.currentWatermark();
        }

        @Override
        public KeyedStateStore windowState() {
            return windowState;
        }

        @Override
        public KeyedStateStore globalState() {
            return getKeyedStateStore();
        }

        /**
         * Emits {@code value} to the given side output, stamped with the current window's max
         * timestamp.
         *
         * @throws IllegalArgumentException if {@code outputTag} is {@code null}
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            if (outputTag == null) {
                throw new IllegalArgumentException("OutputTag must not be null.");
            }
            output.collect(outputTag, new StreamRecord<>(value, window.get().maxTimestamp()));
        }
    }

    /** Per-window state store that resolves state under the current window namespace. */
    public class PerWindowStateStore extends AbstractPerWindowStateStore {
        public PerWindowStateStore(
                AsyncKeyedStateBackend<?> keyedStateBackend,
                ExecutionConfig executionConfig,
                DeclaredVariable<W> window) {
            super(keyedStateBackend, executionConfig, window);
        }

        /**
         * Looks up (or creates) keyed state under the current window namespace and wraps it so
         * that its namespace follows the declared window variable.
         */
        @Override
        protected <S extends State, SV> S getPartitionedState(StateDescriptor<SV> stateDescriptor)
                throws Exception {
            return StateWithDeclaredNamespace.create(
                    asyncKeyedStateBackend.getOrCreateKeyedState(
                            window.get(), windowSerializer, stateDescriptor),
                    window);
        }
    }

    /**
     * Base class for window-scoped state stores. Serializers are created from the execution
     * config's serializer config.
     */
    public abstract class AbstractPerWindowStateStore extends DefaultKeyedStateStore {
        /** Declared variable holding the window this store currently refers to. */
        protected final DeclaredVariable<W> window;

        public AbstractPerWindowStateStore(
                AsyncKeyedStateBackend<?> keyedStateBackend,
                final ExecutionConfig executionConfig,
                DeclaredVariable<W> window) {
            super(
                    keyedStateBackend,
                    new SerializerFactory() {
                        @Override
                        public <T> TypeSerializer<T> createSerializer(
                                TypeInformation<T> typeInformation) {
                            return typeInformation.createSerializer(
                                    executionConfig.getSerializerConfig());
                        }
                    });
            this.window = window;
        }
    }
}

