/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.core.asyncprocessing.InternalAsyncFuture;
import org.apache.flink.core.state.InternalStateIterator;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;

public abstract class AbstractStateIterator<T>
implements InternalStateIterator<T> {
    final State originalState;
    final StateRequestType requestType;
    final StateRequestHandler stateHandler;
    final Collection<T> cache;

    public AbstractStateIterator(State originalState, StateRequestType requestType, StateRequestHandler stateHandler, Collection<T> partialResult) {
        this.originalState = originalState;
        this.requestType = requestType;
        this.stateHandler = stateHandler;
        this.cache = partialResult;
    }

    @Override
    public abstract boolean hasNextLoading();

    protected abstract Object nextPayloadForContinuousLoading();

    @Override
    public Iterable<T> getCurrentCache() {
        return this.cache == null ? Collections.emptyList() : this.cache;
    }

    protected StateRequestType getRequestType() {
        return this.requestType;
    }

    private InternalAsyncFuture<StateIterator<T>> asyncNextLoad() {
        return this.stateHandler.handleRequest(this.originalState, StateRequestType.ITERATOR_LOADING, this.nextPayloadForContinuousLoading());
    }

    private StateIterator<T> syncNextLoad() {
        return (StateIterator)this.stateHandler.handleRequestSync(this.originalState, StateRequestType.ITERATOR_LOADING, this.nextPayloadForContinuousLoading());
    }

    @Override
    public <U> StateFuture<Collection<U>> onNext(FunctionWithException<T, StateFuture<? extends U>, Exception> iterating) {
        if (this.isEmpty()) {
            return StateFutureUtils.completedFuture(Collections.emptyList());
        }
        ArrayList<StateFuture<? extends U>> resultFutures = new ArrayList<StateFuture<? extends U>>();
        try {
            for (T item : this.cache) {
                StateFuture<? extends U> resultFuture = iterating.apply(item);
                if (resultFuture == null) continue;
                resultFutures.add(resultFuture);
            }
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Failed to iterate over state.", e);
        }
        if (this.hasNextLoading()) {
            return StateFutureUtils.combineAll(resultFutures).thenCombine(this.asyncNextLoad().thenCompose(itr -> itr.onNext(iterating)), (a, b) -> {
                ArrayList result = new ArrayList(a.size() + b.size());
                result.addAll(a);
                result.addAll(b);
                return result;
            });
        }
        return StateFutureUtils.combineAll(resultFutures);
    }

    @Override
    public StateFuture<Void> onNext(ThrowingConsumer<T, Exception> iterating) {
        if (this.isEmpty()) {
            return StateFutureUtils.completedVoidFuture();
        }
        try {
            for (T item : this.cache) {
                iterating.accept(item);
            }
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Failed to iterate over state.", e);
        }
        if (this.hasNextLoading()) {
            return this.asyncNextLoad().thenCompose(itr -> itr.onNext(iterating));
        }
        return StateFutureUtils.completedVoidFuture();
    }

    public void onNextSync(Consumer<T> iterating) {
        if (this.isEmpty()) {
            return;
        }
        for (T item : this.cache) {
            iterating.accept(item);
        }
        if (this.hasNextLoading()) {
            ((AbstractStateIterator)this.syncNextLoad()).onNextSync(iterating);
        }
    }

    @Override
    public boolean isEmpty() {
        return (this.cache == null || this.cache.isEmpty()) && !this.hasNextLoading();
    }
}

