/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api.input;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.input.BufferingCollector;
import org.apache.flink.state.api.input.StreamOperatorContextBuilder;
import org.apache.flink.state.api.input.operator.StateReaderOperator;
import org.apache.flink.state.api.input.splits.KeyGroupRangeInputSplit;
import org.apache.flink.state.api.runtime.SavepointRuntimeContext;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class KeyedStateInputFormat<K, N, OUT>
extends RichInputFormat<OUT, KeyGroupRangeInputSplit> {
    private static final long serialVersionUID = 8230460226049597182L;
    private static final Logger LOG = LoggerFactory.getLogger(KeyedStateInputFormat.class);
    private final OperatorState operatorState;
    @Nullable
    private final StateBackend stateBackend;
    private final Configuration configuration;
    private final StateReaderOperator<?, K, N, OUT> operator;
    private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
    private transient CloseableRegistry registry;
    private transient BufferingCollector<OUT> out;
    private transient CloseableIterator<Tuple2<K, N>> keysAndNamespaces;

    public KeyedStateInputFormat(OperatorState operatorState, @Nullable StateBackend stateBackend, Configuration configuration, StateReaderOperator<?, K, N, OUT> operator, ExecutionConfig executionConfig) throws IOException {
        Preconditions.checkNotNull((Object)operatorState, (String)"The operator state cannot be null");
        Preconditions.checkNotNull((Object)configuration, (String)"The configuration cannot be null");
        Preconditions.checkNotNull(operator, (String)"The operator cannot be null");
        Preconditions.checkNotNull((Object)executionConfig, (String)"The executionConfig cannot be null");
        this.operatorState = operatorState;
        this.stateBackend = stateBackend;
        this.configuration = new Configuration(configuration);
        this.operator = operator;
        this.serializedExecutionConfig = new SerializedValue((Object)executionConfig);
    }

    public void configure(Configuration parameters) {
    }

    public InputSplitAssigner getInputSplitAssigner(KeyGroupRangeInputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner((InputSplit[])inputSplits);
    }

    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return cachedStatistics;
    }

    public KeyGroupRangeInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        int maxParallelism = this.operatorState.getMaxParallelism();
        List<KeyGroupRange> keyGroups = KeyedStateInputFormat.sortedKeyGroupRanges(minNumSplits, maxParallelism);
        return (KeyGroupRangeInputSplit[])CollectionUtil.mapWithIndex(keyGroups, (keyGroupRange, index) -> KeyedStateInputFormat.createKeyGroupRangeInputSplit(this.operatorState, maxParallelism, keyGroupRange, index)).toArray(KeyGroupRangeInputSplit[]::new);
    }

    public void openInputFormat() {
        this.out = new BufferingCollector();
    }

    public void open(KeyGroupRangeInputSplit split) throws IOException {
        ExecutionConfig executionConfig;
        this.registry = new CloseableRegistry();
        RuntimeContext runtimeContext = this.getRuntimeContext();
        try {
            executionConfig = (ExecutionConfig)this.serializedExecutionConfig.deserializeValue(runtimeContext.getUserCodeClassLoader());
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Could not deserialize ExecutionConfig.", e);
        }
        StreamOperatorStateContext context = new StreamOperatorContextBuilder(runtimeContext, this.configuration, this.operatorState, split, this.registry, this.stateBackend, executionConfig).withMaxParallelism(split.getNumKeyGroups()).withKey(this.operator, runtimeContext.createSerializer(this.operator.getKeyType())).build(LOG);
        AbstractKeyedStateBackend keyedStateBackend = (AbstractKeyedStateBackend)context.keyedStateBackend();
        DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore((KeyedStateBackend)keyedStateBackend, arg_0 -> ((RuntimeContext)runtimeContext).createSerializer(arg_0));
        SavepointRuntimeContext ctx = new SavepointRuntimeContext(runtimeContext, (KeyedStateStore)keyedStateStore);
        InternalTimeServiceManager timeServiceManager = context.internalTimerServiceManager();
        try {
            this.operator.setup(arg_0 -> ((RuntimeContext)runtimeContext).createSerializer(arg_0), (KeyedStateBackend<K>)keyedStateBackend, (InternalTimeServiceManager<K>)timeServiceManager, ctx);
            this.operator.open();
            this.keysAndNamespaces = this.operator.getKeysAndNamespaces(ctx);
        }
        catch (Exception e) {
            throw new IOException("Failed to restore timer state", e);
        }
    }

    public void close() throws IOException {
        try {
            IOUtils.closeQuietly(this.keysAndNamespaces);
            IOUtils.closeQuietly(this.operator);
            IOUtils.closeQuietly((AutoCloseable)this.registry);
        }
        catch (Exception e) {
            throw new IOException("Failed to close state backend", e);
        }
    }

    public boolean reachedEnd() {
        return !this.out.hasNext() && !this.keysAndNamespaces.hasNext();
    }

    public OUT nextRecord(OUT reuse) throws IOException {
        if (this.out.hasNext()) {
            return this.out.next();
        }
        Tuple2 keyAndNamespace = (Tuple2)this.keysAndNamespaces.next();
        this.operator.setCurrentKey(keyAndNamespace.f0);
        try {
            this.operator.processElement(keyAndNamespace.f0, keyAndNamespace.f1, this.out);
        }
        catch (Exception e) {
            throw new IOException("User defined function KeyedStateReaderFunction#readKey threw an exception", e);
        }
        return this.out.next();
    }

    private static KeyGroupRangeInputSplit createKeyGroupRangeInputSplit(OperatorState operatorState, int maxParallelism, KeyGroupRange keyGroupRange, Integer index) {
        List managedKeyedState = StateAssignmentOperation.getManagedKeyedStateHandles((OperatorState)operatorState, (KeyGroupRange)keyGroupRange);
        List rawKeyedState = StateAssignmentOperation.getRawKeyedStateHandles((OperatorState)operatorState, (KeyGroupRange)keyGroupRange);
        return new KeyGroupRangeInputSplit(managedKeyedState, rawKeyedState, maxParallelism, index);
    }

    @Nonnull
    private static List<KeyGroupRange> sortedKeyGroupRanges(int minNumSplits, int maxParallelism) {
        List keyGroups = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism, (int)Math.min(minNumSplits, maxParallelism));
        keyGroups.sort(Comparator.comparing(KeyGroupRange::getStartKeyGroup));
        return keyGroups;
    }
}

