/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sorted.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.streaming.api.operators.sorted.state.AbstractBatchExecutionKeyState;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalPriorityQueueSet;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyAggregatingState;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyListState;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyMapState;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyReducingState;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyValueState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CheckpointableKeyedStateBackend} for BATCH execution that only ever holds the state
 * and priority-queue contents of the currently selected key.
 *
 * <p>Whenever {@link #setCurrentKey(Object)} is called with a key that differs from the current
 * one, every state registered through {@link #getOrCreateKeyedState} is cleared and every
 * priority queue handed out by {@link #create} is drained. Key enumeration
 * ({@link #getKeys}, {@link #getKeysAndNamespaces}, {@link #applyToAllKeys}) is a no-op, and
 * snapshots/savepoints are rejected with {@link UnsupportedOperationException}.
 *
 * <p>NOTE(review): this presumably relies on the caller feeding records grouped by key, so that a
 * key never reappears after it has been replaced -- confirm against the operator that drives it.
 *
 * @param <K> type of the key
 */
public class BatchExecutionKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K> {
    private static final Logger LOG =
            LoggerFactory.getLogger(BatchExecutionKeyedStateBackend.class);

    /**
     * Factories for every supported state type. The explicit {@code (StateFactory)} casts give
     * the method references a functional-interface target type; without them the references
     * cannot be passed to the generic {@code Tuple2.of}.
     */
    private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES =
            Stream.of(
                            Tuple2.of(
                                    StateDescriptor.Type.VALUE,
                                    (StateFactory) BatchExecutionKeyValueState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.LIST,
                                    (StateFactory) BatchExecutionKeyListState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.MAP,
                                    (StateFactory) BatchExecutionKeyMapState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.AGGREGATING,
                                    (StateFactory) BatchExecutionKeyAggregatingState::create),
                            Tuple2.of(
                                    StateDescriptor.Type.REDUCING,
                                    (StateFactory) BatchExecutionKeyReducingState::create))
                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));

    /** The key all states and queues currently belong to; {@code null} until the first key. */
    private K currentKey = null;

    private final TypeSerializer<K> keySerializer;
    private final List<KeyedStateBackend.KeySelectionListener<K>> keySelectionListeners =
            new ArrayList<>();

    /** States created through {@link #getOrCreateKeyedState}, indexed by descriptor name. */
    private final Map<String, State> states = new HashMap<>();

    /** Priority queues created through {@link #create}, indexed by state name. */
    private final Map<String, KeyGroupedInternalPriorityQueue<?>> priorityQueues = new HashMap<>();

    private final KeyGroupRange keyGroupRange;
    private final ExecutionConfig executionConfig;

    /**
     * Creates a backend with no states, no queues and no current key.
     *
     * @param keySerializer serializer for keys, handed to every created state
     * @param keyGroupRange range reported by {@link #getKeyGroupRange()}
     * @param executionConfig used to initialize state descriptor serializers on demand
     */
    public BatchExecutionKeyedStateBackend(
            TypeSerializer<K> keySerializer,
            KeyGroupRange keyGroupRange,
            ExecutionConfig executionConfig) {
        this.keySerializer = keySerializer;
        this.keyGroupRange = keyGroupRange;
        this.executionConfig = executionConfig;
    }

    /**
     * Switches to {@code newKey}. If it differs from the current key (per
     * {@link Objects#equals}), listeners are notified first, then all registered states are
     * cleared and all priority queues are emptied. Setting an equal key is a no-op.
     */
    @Override
    public void setCurrentKey(K newKey) {
        if (Objects.equals(newKey, currentKey)) {
            return;
        }
        notifyKeySelected(newKey);
        for (State state : states.values()) {
            ((AbstractBatchExecutionKeyState<?, ?, ?>) state).clearAllNamespaces();
        }
        for (KeyGroupedInternalPriorityQueue<?> queue : priorityQueues.values()) {
            while (queue.poll() != null) {
                // intentionally empty: polling until null discards everything queued
                // for the previous key
            }
        }
        currentKey = newKey;
    }

    @Override
    public K getCurrentKey() {
        return currentKey;
    }

    /** Delegates to {@link #setCurrentKey(Object)}; {@code newKeyGroupIndex} is ignored. */
    @Override
    public void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex) {
        setCurrentKey(newKey);
    }

    @Override
    public TypeSerializer<K> getKeySerializer() {
        return keySerializer;
    }

    /** Does not invoke {@code function}; only emits a debug log line. */
    @Override
    public <N, S extends State, T> void applyToAllKeys(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, T> stateDescriptor,
            KeyedStateFunction<K, S> function) {
        LOG.debug("Not iterating over all keyed in BATCH execution mode in applyToAllKeys().");
    }

    /** Always returns an empty stream. */
    @Override
    public <N> Stream<K> getKeys(String state, N namespace) {
        LOG.debug("Returning an empty stream in BATCH execution mode in getKeys().");
        return Stream.empty();
    }

    /** Always returns an empty stream. */
    @Override
    public <N> Stream<K> getKeys(List<String> states, N namespace) {
        LOG.debug("Returning an empty stream in BATCH execution mode in getKeys().");
        return Stream.empty();
    }

    /** Always returns an empty stream. */
    @Override
    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
        LOG.debug("Returning an empty stream in BATCH execution mode in getKeysAndNamespaces().");
        return Stream.empty();
    }

    /**
     * Returns the state registered under the descriptor's name, creating and registering it on
     * first use. The descriptor's serializer is initialized from the execution config if it has
     * not been initialized yet.
     *
     * @throws NullPointerException if {@code namespaceSerializer} or the key serializer is null
     * @throws FlinkRuntimeException if the descriptor's state type has no registered factory
     */
    @Override
    @SuppressWarnings("unchecked") // the map is keyed by name only; S is trusted from the caller
    public <N, S extends State, T> S getOrCreateKeyedState(
            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor)
            throws Exception {
        Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");
        Preconditions.checkNotNull(
                keySerializer,
                "State key serializer has not been configured in the config. This operation cannot use partitioned state.");

        if (!stateDescriptor.isSerializerInitialized()) {
            stateDescriptor.initializeSerializerUnlessSet(executionConfig);
        }

        State state = states.get(stateDescriptor.getName());
        if (state == null) {
            state = createState(namespaceSerializer, stateDescriptor);
            states.put(stateDescriptor.getName(), state);
        }
        return (S) state;
    }

    /**
     * Looks up (or creates) the state for {@code stateDescriptor} and points it at
     * {@code namespace} before returning it.
     */
    @Override
    @SuppressWarnings("unchecked") // every state produced by STATE_FACTORIES is cast to InternalKvState
    public <N, S extends State> S getPartitionedState(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, ?> stateDescriptor)
            throws Exception {
        S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
        ((InternalKvState<K, N, ?>) state).setCurrentNamespace(namespace);
        return state;
    }

    /** No-op: this backend holds nothing that needs explicit release. */
    @Override
    public void dispose() {
    }

    /** Invokes every registered listener with {@code newKey}, in registration order. */
    private void notifyKeySelected(K newKey) {
        for (KeyedStateBackend.KeySelectionListener<K> listener : keySelectionListeners) {
            listener.keySelected(newKey);
        }
    }

    @Override
    public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener) {
        keySelectionListeners.add(listener);
    }

    @Override
    public boolean deregisterKeySelectionListener(
            KeyedStateBackend.KeySelectionListener<K> listener) {
        return keySelectionListeners.remove(listener);
    }

    /**
     * Creates a fresh state for {@code stateDesc}. The result is NOT registered in the
     * name-indexed state map, and {@code snapshotTransformFactory} is ignored.
     *
     * @throws FlinkRuntimeException if the descriptor's state type has no registered factory
     */
    @Override
    @Nonnull
    public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
            @Nonnull TypeSerializer<N> namespaceSerializer,
            @Nonnull StateDescriptor<S, SV> stateDesc,
            @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
                    snapshotTransformFactory)
            throws Exception {
        return createState(namespaceSerializer, stateDesc);
    }

    /** Resolves the factory for the descriptor's type and uses it to build a new state. */
    private <N, SV, S extends State, IS extends S> IS createState(
            @Nonnull TypeSerializer<N> namespaceSerializer,
            @Nonnull StateDescriptor<S, SV> stateDesc)
            throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getType());
        if (stateFactory == null) {
            String message =
                    String.format(
                            "State %s is not supported by %s", stateDesc.getClass(), getClass());
            throw new FlinkRuntimeException(message);
        }
        return stateFactory.createState(keySerializer, namespaceSerializer, stateDesc);
    }

    /**
     * Returns the priority queue registered under {@code stateName}, creating and registering a
     * new one on first use. {@code byteOrderedElementSerializer} is not used.
     */
    @Override
    @Nonnull
    @SuppressWarnings("unchecked") // queues are keyed by name only; T is trusted from the caller
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>>
            KeyGroupedInternalPriorityQueue<T> create(
                    @Nonnull String stateName,
                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        KeyGroupedInternalPriorityQueue<T> priorityQueue =
                (KeyGroupedInternalPriorityQueue<T>) priorityQueues.get(stateName);
        if (priorityQueue == null) {
            priorityQueue =
                    new BatchExecutionInternalPriorityQueueSet<>(
                            PriorityComparator.forPriorityComparableObjects(), 128);
            priorityQueues.put(stateName, priorityQueue);
        }
        return priorityQueue;
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return keyGroupRange;
    }

    /** No-op. */
    @Override
    public void close() throws IOException {
    }

    /**
     * Always fails.
     *
     * @throws UnsupportedOperationException on every call
     */
    @Override
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions) {
        throw new UnsupportedOperationException(
                "Snapshotting is not supported in BATCH runtime mode.");
    }

    /**
     * Always fails.
     *
     * @throws UnsupportedOperationException on every call
     */
    @Override
    @Nonnull
    public SavepointResources<K> savepoint() throws Exception {
        throw new UnsupportedOperationException(
                "Savepoints are not supported in BATCH runtime mode.");
    }

    /** Creates a state object for a key serializer, namespace serializer and descriptor. */
    @FunctionalInterface
    private static interface StateFactory {
        public <T, K, N, SV, S extends State, IS extends S> IS createState(
                TypeSerializer<K> keySerializer,
                TypeSerializer<N> namespaceSerializer,
                StateDescriptor<S, SV> stateDesc)
                throws Exception;
    }
}

