/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing.operators.windowing.functions;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.WrappingFunction;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAsyncWindowFunction;
import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalProcessWindowContext;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

/**
 * Internal window function that adapts a {@link ProcessWindowFunction} expecting a single
 * pre-aggregated value to the {@link InternalAsyncWindowFunction} interface, whose input is a
 * {@link StateIterator} over the raw window elements.
 *
 * <p>On {@link #process}, every element delivered by the iterator is folded into a fresh
 * accumulator using the supplied {@link AggregateFunction}; the accumulator's result is then
 * handed to the wrapped function as a one-element list.
 *
 * @param <T> type of the elements delivered by the state iterator
 * @param <ACC> accumulator type of the aggregate function
 * @param <V> result type of the aggregate function and input type of the wrapped function
 * @param <R> output type of the wrapped function
 * @param <K> key type
 * @param <W> window type
 */
public final class InternalAggregateProcessAsyncWindowFunction<T, ACC, V, R, K, W extends Window>
        extends WrappingFunction<ProcessWindowFunction<V, R, K, W>>
        implements InternalAsyncWindowFunction<StateIterator<T>, R, K, W> {
    private static final long serialVersionUID = 1L;

    /** Folds the raw window elements into the single value passed to the wrapped function. */
    private final AggregateFunction<T, ACC, V> aggFunction;

    /**
     * Creates the adapter.
     *
     * @param aggFunction aggregate function applied to every element of the window
     * @param windowFunction wrapped function that receives the aggregated result
     */
    public InternalAggregateProcessAsyncWindowFunction(AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction) {
        super(windowFunction);
        this.aggFunction = aggFunction;
    }

    /**
     * Aggregates all elements delivered by {@code input} and forwards the aggregate result to
     * the wrapped {@link ProcessWindowFunction}.
     *
     * @param key key of the window being evaluated
     * @param window window being evaluated
     * @param context internal window context exposed to the wrapped function
     * @param input iterator over the window's elements
     * @param out collector the wrapped function emits to
     * @return the future obtained by chaining the wrapped function's invocation onto the
     *     iteration of {@code input}
     * @throws Exception declared by the implemented interface
     */
    @Override
    public StateFuture<Void> process(K key, W window, InternalAsyncWindowFunction.InternalWindowContext context, StateIterator<T> input, Collector<R> out) throws Exception {
        InternalProcessWindowContext<V, R, K, W> ctx = newContext(window, context);
        // The accumulator is reassigned from inside a lambda, so it needs a mutable holder.
        AtomicReference<ACC> finalAcc = new AtomicReference<>(this.aggFunction.createAccumulator());
        return input
                .onNext(val -> finalAcc.set(this.aggFunction.add(val, finalAcc.get())))
                .thenAccept(ignore ->
                        this.wrappedFunction.process(
                                key,
                                ctx,
                                Collections.singletonList(this.aggFunction.getResult(finalAcc.get())),
                                out));
    }

    /**
     * Delegates to {@link ProcessWindowFunction#clear} of the wrapped function with a context
     * bound to {@code window}.
     *
     * @param window window being cleared
     * @param context internal window context exposed to the wrapped function
     * @return an already completed future; the wrapped {@code clear} is invoked before returning
     * @throws Exception if the wrapped function's {@code clear} throws
     */
    @Override
    public StateFuture<Void> clear(W window, InternalAsyncWindowFunction.InternalWindowContext context) throws Exception {
        this.wrappedFunction.clear(newContext(window, context));
        return StateFutureUtils.completedVoidFuture();
    }

    /**
     * Not supported on this wrapper.
     *
     * @throws RuntimeException always
     */
    @Override
    public RuntimeContext getRuntimeContext() {
        throw new RuntimeException("This should never be called.");
    }

    /**
     * Not supported on this wrapper.
     *
     * @throws RuntimeException always
     */
    @Override
    public IterationRuntimeContext getIterationRuntimeContext() {
        throw new RuntimeException("This should never be called.");
    }

    /** Builds a context for the wrapped function bound to the given window and internal context. */
    private InternalProcessWindowContext<V, R, K, W> newContext(W window, InternalAsyncWindowFunction.InternalWindowContext context) {
        InternalProcessWindowContext<V, R, K, W> ctx = new InternalProcessWindowContext<>(this.wrappedFunction);
        ctx.window = window;
        ctx.internalContext = context;
        return ctx;
    }
}

