/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.InternalCheckpointListener;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.TestableKeyedStateBackend;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

/**
 * Base implementation shared by keyed state backends.
 *
 * <p>This class keeps the bookkeeping that is visible in this file: the key serializer, the
 * delegate {@link InternalKeyContext} that holds the current key and key group, the listeners that
 * are told about key changes, and a by-name cache of the keyed states created so far (plus a
 * single-entry "last accessed" shortcut in front of that cache).
 *
 * @param <K> type of the key
 */
public abstract class AbstractKeyedStateBackend<K>
        implements CheckpointableKeyedStateBackend<K>,
                InternalCheckpointListener,
                TestableKeyedStateBackend<K>,
                InternalKeyContext<K> {

    /** Serializer for the key type; may be {@code null}, see {@link #getOrCreateKeyedState}. */
    protected final TypeSerializer<K> keySerializer;

    /** Listeners invoked, in registration order, every time a new current key is set. */
    private final ArrayList<KeyedStateBackend.KeySelectionListener<K>> keySelectionListeners;

    /** All states created through this backend, indexed by state descriptor name. */
    private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;

    /** Name of the state handed out by the most recent {@link #getPartitionedState} call. */
    private String lastName;

    /**
     * State handed out by the most recent {@link #getPartitionedState} call. Kept raw on purpose:
     * the namespace type differs per state, and {@code setCurrentNamespace} has to be callable
     * with whatever namespace the caller supplies.
     */
    @SuppressWarnings("rawtypes")
    private InternalKvState lastState;

    /** Total number of key groups, as reported by the key context at construction time. */
    protected final int numberOfKeyGroups;

    /** Range of key groups assigned to this backend. */
    protected final KeyGroupRange keyGroupRange;

    /** Registry used to publish queryable state; may be {@code null}. */
    protected final TaskKvStateRegistry kvStateRegistry;

    /** Registry of closeables that is closed by {@link #close()} and {@link #dispose()}. */
    protected CloseableRegistry cancelStreamRegistry;

    /** Class loader passed along when registering queryable state. */
    protected final ClassLoader userCodeClassLoader;

    /** Execution config used to initialize state descriptor serializers; may be {@code null}. */
    private final ExecutionConfig executionConfig;

    /** Time provider handed to the TTL state factory. */
    protected final TtlTimeProvider ttlTimeProvider;

    /** Configuration handed to the latency tracking state factory. */
    protected final LatencyTrackingStateConfig latencyTrackingStateConfig;

    /** Compression decorator for key group streams. */
    protected final StreamCompressionDecorator keyGroupCompressionDecorator;

    /** Delegate that stores the current key and the current key group index. */
    protected final InternalKeyContext<K> keyContext;

    /**
     * Creates a backend whose stream compression is derived from the execution config via {@link
     * #determineStreamCompression(ExecutionConfig)}.
     */
    public AbstractKeyedStateBackend(
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            ClassLoader userCodeClassLoader,
            ExecutionConfig executionConfig,
            TtlTimeProvider ttlTimeProvider,
            LatencyTrackingStateConfig latencyTrackingStateConfig,
            CloseableRegistry cancelStreamRegistry,
            InternalKeyContext<K> keyContext) {
        this(
                kvStateRegistry,
                keySerializer,
                userCodeClassLoader,
                executionConfig,
                ttlTimeProvider,
                latencyTrackingStateConfig,
                cancelStreamRegistry,
                AbstractKeyedStateBackend.determineStreamCompression(executionConfig),
                keyContext);
    }

    /**
     * Creates a backend with an explicit compression decorator. The number of key groups and the
     * key group range are read from {@code keyContext}; the state cache and the listener list
     * start out empty.
     */
    public AbstractKeyedStateBackend(
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            ClassLoader userCodeClassLoader,
            ExecutionConfig executionConfig,
            TtlTimeProvider ttlTimeProvider,
            LatencyTrackingStateConfig latencyTrackingStateConfig,
            CloseableRegistry cancelStreamRegistry,
            StreamCompressionDecorator keyGroupCompressionDecorator,
            InternalKeyContext<K> keyContext) {
        // Arguments are evaluated left to right, so the null check on keyContext runs before
        // keyContext is dereferenced for the key group information.
        this(
                kvStateRegistry,
                keySerializer,
                userCodeClassLoader,
                executionConfig,
                ttlTimeProvider,
                latencyTrackingStateConfig,
                cancelStreamRegistry,
                keyGroupCompressionDecorator,
                Preconditions.checkNotNull(keyContext),
                keyContext.getNumberOfKeyGroups(),
                keyContext.getKeyGroupRange(),
                new HashMap<>(),
                new ArrayList<>(1),
                null,
                null);
    }

    /**
     * Copy-style constructor. The new instance shares (does not copy) the state cache, the listener
     * list and every other collaborator of the given backend.
     */
    protected AbstractKeyedStateBackend(AbstractKeyedStateBackend<K> abstractKeyedStateBackend) {
        this(
                abstractKeyedStateBackend.kvStateRegistry,
                abstractKeyedStateBackend.keySerializer,
                abstractKeyedStateBackend.userCodeClassLoader,
                abstractKeyedStateBackend.executionConfig,
                abstractKeyedStateBackend.ttlTimeProvider,
                abstractKeyedStateBackend.latencyTrackingStateConfig,
                abstractKeyedStateBackend.cancelStreamRegistry,
                abstractKeyedStateBackend.keyGroupCompressionDecorator,
                abstractKeyedStateBackend.keyContext,
                abstractKeyedStateBackend.numberOfKeyGroups,
                abstractKeyedStateBackend.keyGroupRange,
                abstractKeyedStateBackend.keyValueStatesByName,
                abstractKeyedStateBackend.keySelectionListeners,
                abstractKeyedStateBackend.lastState,
                abstractKeyedStateBackend.lastName);
    }

    /**
     * Canonical constructor that every other constructor funnels into. Validates the arguments
     * that must not be {@code null} and the key group counts, then stores everything as given.
     *
     * @throws NullPointerException if {@code keyContext}, {@code keyGroupRange}, {@code
     *     userCodeClassLoader}, {@code ttlTimeProvider} or {@code latencyTrackingStateConfig} is
     *     {@code null}
     * @throws IllegalArgumentException if {@code numberOfKeyGroups} is below 1 or smaller than the
     *     number of key groups in {@code keyGroupRange}
     */
    @SuppressWarnings("rawtypes")
    private AbstractKeyedStateBackend(
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            ClassLoader userCodeClassLoader,
            ExecutionConfig executionConfig,
            TtlTimeProvider ttlTimeProvider,
            LatencyTrackingStateConfig latencyTrackingStateConfig,
            CloseableRegistry cancelStreamRegistry,
            StreamCompressionDecorator keyGroupCompressionDecorator,
            InternalKeyContext<K> keyContext,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName,
            ArrayList<KeyedStateBackend.KeySelectionListener<K>> keySelectionListeners,
            InternalKvState lastState,
            String lastName) {
        this.keyContext = Preconditions.checkNotNull(keyContext);
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
        Preconditions.checkArgument(
                numberOfKeyGroups >= 1, "NumberOfKeyGroups must be a positive number");
        Preconditions.checkArgument(
                numberOfKeyGroups >= keyGroupRange.getNumberOfKeyGroups(),
                "The total number of key groups must be at least the number in the key group range assigned to this backend. The total number of key groups: %s, the number in key groups in range: %s",
                numberOfKeyGroups,
                keyGroupRange.getNumberOfKeyGroups());
        this.kvStateRegistry = kvStateRegistry;
        this.keySerializer = keySerializer;
        this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
        this.cancelStreamRegistry = cancelStreamRegistry;
        this.keyValueStatesByName = keyValueStatesByName;
        this.executionConfig = executionConfig;
        this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
        this.ttlTimeProvider = Preconditions.checkNotNull(ttlTimeProvider);
        this.latencyTrackingStateConfig = Preconditions.checkNotNull(latencyTrackingStateConfig);
        this.keySelectionListeners = keySelectionListeners;
        this.lastState = lastState;
        this.lastName = lastName;
    }

    /**
     * Picks the stream compression for key groups.
     *
     * @param executionConfig the execution config, may be {@code null}
     * @return the Snappy decorator if the config is present and has snapshot compression enabled,
     *     otherwise the uncompressed decorator
     */
    private static StreamCompressionDecorator determineStreamCompression(
            ExecutionConfig executionConfig) {
        if (executionConfig != null && executionConfig.isUseSnapshotCompression()) {
            return SnappyStreamCompressionDecorator.INSTANCE;
        }
        return UncompressedStreamCompressionDecorator.INSTANCE;
    }

    /** Does nothing in this base class. */
    @Override
    public void notifyCheckpointSubsumed(long checkpointId) throws Exception {
    }

    /**
     * Releases what this base class holds: quietly closes the cancel-stream registry, unregisters
     * everything from the KvState registry (if one was supplied) and drops all cached states.
     */
    @Override
    public void dispose() {
        IOUtils.closeQuietly(this.cancelStreamRegistry);
        if (this.kvStateRegistry != null) {
            this.kvStateRegistry.unregisterAll();
        }
        this.lastName = null;
        this.lastState = null;
        this.keyValueStatesByName.clear();
    }

    /**
     * Sets the current key. Listeners are notified first; then the key and the key group index
     * computed from it are stored in the key context.
     */
    @Override
    public void setCurrentKey(K newKey) {
        this.notifyKeySelected(newKey);
        this.keyContext.setCurrentKey(newKey);
        this.keyContext.setCurrentKeyGroupIndex(
                KeyGroupRangeAssignment.assignToKeyGroup(newKey, this.numberOfKeyGroups));
    }

    /**
     * Sets the current key together with a caller-supplied key group index. Listeners are notified
     * first; the given index is stored as-is, without being recomputed from the key.
     */
    @Override
    public void setCurrentKeyAndKeyGroup(K newKey, int newKeyGroupIndex) {
        this.notifyKeySelected(newKey);
        this.keyContext.setCurrentKey(newKey);
        this.keyContext.setCurrentKeyGroupIndex(newKeyGroupIndex);
    }

    /** Invokes every registered key selection listener with the given key. */
    private void notifyKeySelected(K newKey) {
        // Indexed loop kept from the original; presumably chosen to avoid allocating an
        // iterator on every key switch.
        for (int i = 0; i < this.keySelectionListeners.size(); ++i) {
            this.keySelectionListeners.get(i).keySelected(newKey);
        }
    }

    /** Adds a listener that is notified on every subsequent key selection. */
    @Override
    public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> listener) {
        this.keySelectionListeners.add(listener);
    }

    /**
     * Removes a previously registered listener.
     *
     * @return {@code true} if the listener was registered and has been removed
     */
    @Override
    public boolean deregisterKeySelectionListener(
            KeyedStateBackend.KeySelectionListener<K> listener) {
        return this.keySelectionListeners.remove(listener);
    }

    /** Returns the key serializer given at construction time. */
    @Override
    public TypeSerializer<K> getKeySerializer() {
        return this.keySerializer;
    }

    /** Returns the current key as stored in the key context. */
    @Override
    public K getCurrentKey() {
        return this.keyContext.getCurrentKey();
    }

    /** Returns the current key group index as stored in the key context. */
    @Override
    public int getCurrentKeyGroupIndex() {
        return this.keyContext.getCurrentKeyGroupIndex();
    }

    /** Returns the total number of key groups. */
    @Override
    public int getNumberOfKeyGroups() {
        return this.numberOfKeyGroups;
    }

    /** Returns the key group range assigned to this backend. */
    @Override
    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    /**
     * Applies {@code function} to the state of every key in the given namespace, obtaining the
     * state through {@link #getPartitionedState}.
     */
    @Override
    public <N, S extends State, T> void applyToAllKeys(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, T> stateDescriptor,
            KeyedStateFunction<K, S> function)
            throws Exception {
        this.applyToAllKeys(
                namespace, namespaceSerializer, stateDescriptor, function, this::getPartitionedState);
    }

    /**
     * Applies {@code function} to the state of every key in the given namespace.
     *
     * <p>The state object is obtained once from {@code partitionStateFactory}. For each key, that
     * key is made the current key of this backend before {@code function} is invoked, so the
     * backend's current key is changed by this call.
     *
     * @param partitionStateFactory factory used to look up the state for the descriptor
     * @throws RuntimeException wrapping any {@link Throwable} thrown by {@code function}
     */
    public <N, S extends State, T> void applyToAllKeys(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, T> stateDescriptor,
            KeyedStateFunction<K, S> function,
            PartitionStateFactory partitionStateFactory)
            throws Exception {
        try (Stream<K> keyStream = this.getKeys(stateDescriptor.getName(), namespace)) {
            final S state =
                    partitionStateFactory.get(namespace, namespaceSerializer, stateDescriptor);
            keyStream.forEach(
                    (K key) -> {
                        this.setCurrentKey(key);
                        try {
                            function.process(key, state);
                        } catch (Throwable e) {
                            // forEach cannot propagate checked exceptions, so wrap them.
                            throw new RuntimeException(e);
                        }
                    });
        }
    }

    /**
     * Returns the state registered under the descriptor's name, creating it on first access.
     *
     * <p>On creation the descriptor's serializer is initialized if necessary, the state is created
     * through the TTL factory and then passed through the latency tracking factory, cached by
     * name, and published as queryable state if the descriptor asks for it.
     *
     * @throws NullPointerException if {@code namespaceSerializer} is {@code null} or no key
     *     serializer was configured
     */
    @SuppressWarnings("unchecked")
    @Override
    public <N, S extends State, V> S getOrCreateKeyedState(
            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
            throws Exception {
        Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");
        Preconditions.checkNotNull(
                this.keySerializer,
                "State key serializer has not been configured in the config. This operation cannot use partitioned state.");

        InternalKvState<K, ?, ?> kvState =
                this.keyValueStatesByName.get(stateDescriptor.getName());
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            }
            // The TTL factory's return type is not statically an InternalKvState here, hence the
            // (unchecked) cast before handing it to the latency tracking factory.
            final InternalKvState<K, N, ?> ttlState =
                    (InternalKvState<K, N, ?>)
                            TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                                    namespaceSerializer, stateDescriptor, this, this.ttlTimeProvider);
            kvState =
                    LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled(
                            ttlState, stateDescriptor, this.latencyTrackingStateConfig);
            this.keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            this.publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }
        return (S) kvState;
    }

    /**
     * Registers {@code kvState} with the KvState registry under the descriptor's queryable state
     * name, but only if the descriptor is queryable.
     *
     * @throws IllegalStateException if the descriptor is queryable but no registry was supplied
     */
    public void publishQueryableStateIfEnabled(
            StateDescriptor<?, ?> stateDescriptor, InternalKvState<?, ?, ?> kvState) {
        if (stateDescriptor.isQueryable()) {
            if (this.kvStateRegistry == null) {
                throw new IllegalStateException("State backend has not been initialized for job.");
            }
            String name = stateDescriptor.getQueryableStateName();
            this.kvStateRegistry.registerKvState(
                    this.keyGroupRange, name, kvState, this.userCodeClassLoader);
        }
    }

    /**
     * Returns the state for the descriptor with its current namespace set to {@code namespace}.
     *
     * <p>Lookup order: the most recently returned state (matched by name only), then the by-name
     * cache, then {@link #getOrCreateKeyedState}. Whichever state is returned becomes the new
     * "most recently returned" entry.
     *
     * @throws NullPointerException if {@code namespace} is {@code null}
     */
    @SuppressWarnings("unchecked")
    @Override
    public <N, S extends State> S getPartitionedState(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, ?> stateDescriptor)
            throws Exception {
        Preconditions.checkNotNull(namespace, "Namespace");

        if (this.lastName != null && this.lastName.equals(stateDescriptor.getName())) {
            this.lastState.setCurrentNamespace(namespace);
            return (S) this.lastState;
        }

        InternalKvState<K, ?, ?> previous =
                this.keyValueStatesByName.get(stateDescriptor.getName());
        if (previous != null) {
            this.lastState = previous;
            this.lastState.setCurrentNamespace(namespace);
            this.lastName = stateDescriptor.getName();
            return (S) previous;
        }

        final S state = this.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
        final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;

        this.lastName = stateDescriptor.getName();
        this.lastState = kvState;
        kvState.setCurrentNamespace(namespace);

        return state;
    }

    /** Closes the cancel-stream registry. */
    @Override
    public void close() throws IOException {
        this.cancelStreamRegistry.close();
    }

    /** Returns the latency tracking configuration given at construction time. */
    public LatencyTrackingStateConfig getLatencyTrackingStateConfig() {
        return this.latencyTrackingStateConfig;
    }

    /** Returns the compression decorator used for key group streams. */
    @VisibleForTesting
    public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
        return this.keyGroupCompressionDecorator;
    }

    /** Returns the number of states currently held in the by-name cache. */
    @VisibleForTesting
    public int numKeyValueStatesByName() {
        return this.keyValueStatesByName.size();
    }

    /** Always returns {@code false} in this base class. */
    public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointType) {
        return false;
    }

    /** Returns the key context this backend delegates to. */
    public InternalKeyContext<K> getKeyContext() {
        return this.keyContext;
    }

    /** Stores the given key group index in the key context. */
    @Override
    public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
        this.keyContext.setCurrentKeyGroupIndex(currentKeyGroupIndex);
    }

    /**
     * Strategy for obtaining a partitioned state; used by {@link #applyToAllKeys(Object,
     * TypeSerializer, StateDescriptor, KeyedStateFunction, PartitionStateFactory)}.
     */
    @FunctionalInterface
    public static interface PartitionStateFactory {
        /**
         * Returns the state described by {@code stateDescriptor} for the given namespace.
         *
         * @param namespace the namespace to scope the state to
         * @param namespaceSerializer serializer for the namespace type
         * @param stateDescriptor descriptor identifying the state
         */
        public <N, S extends State> S get(
                N namespace,
                TypeSerializer<N> namespaceSerializer,
                StateDescriptor<S, ?> stateDescriptor)
                throws Exception;
    }
}

