/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.v2;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.state.v2.AbstractKeyedState;
import org.apache.flink.runtime.state.v2.internal.InternalReducingState;

public class AbstractReducingState<K, N, V>
extends AbstractKeyedState<K, N, V>
implements InternalReducingState<K, N, V> {
    protected final ReduceFunction<V> reduceFunction;

    public AbstractReducingState(StateRequestHandler stateRequestHandler, ReducingStateDescriptor<V> stateDescriptor) {
        super(stateRequestHandler, stateDescriptor);
        this.reduceFunction = stateDescriptor.getReduceFunction();
    }

    public StateFuture<V> asyncGet() {
        return this.asyncGetInternal();
    }

    public StateFuture<Void> asyncAdd(V value) {
        return this.asyncGetInternal().thenCompose(oldValue -> {
            Object newValue = oldValue == null ? value : this.reduceFunction.reduce(oldValue, value);
            return this.asyncUpdateInternal((V)newValue);
        });
    }

    public V get() {
        return this.getInternal();
    }

    public void add(V value) {
        V oldValue = this.getInternal();
        try {
            V newValue = oldValue == null ? value : this.reduceFunction.reduce(oldValue, value);
            this.updateInternal(newValue);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return StateFutureUtils.completedVoidFuture();
        }
        ArrayList<StateFuture<V>> futures = new ArrayList<StateFuture<V>>(sources.size() + 1);
        for (N source : sources) {
            if (source == null) continue;
            this.setCurrentNamespace(source);
            futures.add(this.asyncGetInternal());
        }
        this.setCurrentNamespace(target);
        futures.add(this.asyncGetInternal());
        return StateFutureUtils.combineAll(futures).thenCompose(values -> {
            ArrayList<StateFuture<Void>> updateFutures = new ArrayList<StateFuture<Void>>(sources.size() + 1);
            Object current = null;
            Iterator valueIterator = values.iterator();
            for (Object source : sources) {
                Object value = valueIterator.next();
                if (value == null) continue;
                this.setCurrentNamespace(source);
                updateFutures.add(this.asyncUpdateInternal((V)null));
                if (current != null) {
                    current = this.reduceFunction.reduce(current, value);
                    continue;
                }
                current = value;
            }
            Object targetValue = valueIterator.next();
            if (current != null) {
                if (targetValue != null) {
                    current = this.reduceFunction.reduce(current, targetValue);
                }
                this.setCurrentNamespace(target);
                updateFutures.add(this.asyncUpdateInternal((V)current));
            }
            return StateFutureUtils.combineAll(updateFutures).thenAccept(ignores -> {});
        });
    }

    @Override
    public void mergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return;
        }
        try {
            Object current = null;
            for (N source : sources) {
                if (source == null) continue;
                this.setCurrentNamespace(source);
                V oldValue = this.getInternal();
                if (oldValue == null) continue;
                this.updateInternal((V)null);
                if (current != null) {
                    current = this.reduceFunction.reduce(current, oldValue);
                    continue;
                }
                current = oldValue;
            }
            if (current != null) {
                this.setCurrentNamespace(target);
                V targetValue = this.getInternal();
                if (targetValue != null) {
                    current = this.reduceFunction.reduce(current, targetValue);
                }
                this.updateInternal((V)current);
            }
        }
        catch (Exception e) {
            throw new RuntimeException("merge namespace fail.", e);
        }
    }

    @Override
    public StateFuture<V> asyncGetInternal() {
        return this.handleRequest(StateRequestType.REDUCING_GET, null);
    }

    @Override
    public StateFuture<Void> asyncUpdateInternal(V valueToStore) {
        return this.handleRequest(StateRequestType.REDUCING_ADD, valueToStore);
    }

    @Override
    public V getInternal() {
        return (V)this.handleRequestSync(StateRequestType.REDUCING_GET, null);
    }

    @Override
    public void updateInternal(V valueToStore) {
        this.handleRequestSync(StateRequestType.REDUCING_ADD, valueToStore);
    }
}

