/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.api.common.state.v2.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.BackendWritableBroadcastState;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.HeapBroadcastState;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.PartitionableListState;
import org.apache.flink.runtime.state.RegisteredBroadcastStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.v2.StateDescriptorUtils;
import org.apache.flink.runtime.state.v2.adaptor.OperatorListStateAdaptor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link OperatorStateBackend} implementation.
 *
 * <p>List states and broadcast states are kept in the name-keyed maps handed to the constructor.
 * A state is created on first access, or re-used (after a serializer compatibility check) when an
 * entry with the same name is already registered. Snapshotting is delegated to the supplied
 * {@link SnapshotStrategyRunner}.
 */
@Internal
public class DefaultOperatorStateBackend
implements OperatorStateBackend {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);

    /** Name constant for the default operator state. */
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

    /** List states registered with this backend, keyed by state name. */
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;

    /** Broadcast states registered with this backend, keyed by state name. */
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

    /** Registry that is closed by {@link #close()} and {@link #dispose()}. */
    private final CloseableRegistry closeStreamOnCancelRegistry;

    /** Java-serialization based serializer; not referenced by any method of this class. */
    private final JavaSerializer<Serializable> deprecatedDefaultJavaSerializer = new JavaSerializer<>();

    /** Execution config used to initialize serializers of state descriptors. */
    private final ExecutionConfig executionConfig;

    /** List states that were already handed out by this backend, keyed by state name. */
    private final Map<String, PartitionableListState<?>> accessedStatesByName;

    /** Broadcast states that were already handed out by this backend, keyed by state name. */
    private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;

    /** Delegate that performs {@link #snapshot}. */
    private final SnapshotStrategyRunner<OperatorStateHandle, ?> snapshotStrategyRunner;

    /**
     * Creates a backend over the given collaborators. The maps are stored by reference (not
     * copied) and are mutated by the state accessors and by {@link #dispose()}.
     *
     * @param executionConfig config used when initializing descriptor serializers
     * @param closeStreamOnCancelRegistry registry closed on {@link #close()} / {@link #dispose()}
     * @param registeredOperatorStates list states by name
     * @param registeredBroadcastStates broadcast states by name
     * @param accessedStatesByName cache of list states already handed out
     * @param accessedBroadcastStatesByName cache of broadcast states already handed out
     * @param snapshotStrategyRunner delegate used by {@link #snapshot}
     */
    public DefaultOperatorStateBackend(ExecutionConfig executionConfig, CloseableRegistry closeStreamOnCancelRegistry, Map<String, PartitionableListState<?>> registeredOperatorStates, Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates, Map<String, PartitionableListState<?>> accessedStatesByName, Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName, SnapshotStrategyRunner<OperatorStateHandle, ?> snapshotStrategyRunner) {
        this.closeStreamOnCancelRegistry = closeStreamOnCancelRegistry;
        this.executionConfig = executionConfig;
        this.registeredOperatorStates = registeredOperatorStates;
        this.registeredBroadcastStates = registeredBroadcastStates;
        this.accessedStatesByName = accessedStatesByName;
        this.accessedBroadcastStatesByName = accessedBroadcastStatesByName;
        this.snapshotStrategyRunner = snapshotStrategyRunner;
    }

    /** Returns the execution config this backend was created with. */
    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    /** Returns the key set of the registered list states (a view of the backing map). */
    @Override
    public Set<String> getRegisteredStateNames() {
        return this.registeredOperatorStates.keySet();
    }

    /** Returns the key set of the registered broadcast states (a view of the backing map). */
    @Override
    public Set<String> getRegisteredBroadcastStateNames() {
        return this.registeredBroadcastStates.keySet();
    }

    /**
     * Closes the registry passed to the constructor.
     *
     * @throws IOException if closing the registry fails
     */
    @Override
    public void close() throws IOException {
        this.closeStreamOnCancelRegistry.close();
    }

    /** Quietly closes the registry and clears both registered-state maps. */
    @Override
    public void dispose() {
        IOUtils.closeQuietly((Closeable)this.closeStreamOnCancelRegistry);
        this.registeredOperatorStates.clear();
        this.registeredBroadcastStates.clear();
    }

    /**
     * Returns the broadcast state for the given descriptor, creating and registering it when no
     * state with that name is registered yet.
     *
     * @param stateDescriptor descriptor naming the state and supplying key/value serializers
     * @return the broadcast state registered under the descriptor's name
     * @throws StateMigrationException if a state with that name is already registered and the
     *     descriptor's key or value serializer is incompatible with it
     */
    @SuppressWarnings("unchecked")
    @Override
    public <K, V> BroadcastState<K, V> getBroadcastState(org.apache.flink.api.common.state.MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
        Preconditions.checkNotNull(stateDescriptor);
        String name = Preconditions.checkNotNull(stateDescriptor.getName());

        // Fast path: the state was already handed out under this name.
        // Unchecked: the maps are keyed by name only, the type arguments are the caller's claim.
        BackendWritableBroadcastState<K, V> previous = (BackendWritableBroadcastState<K, V>)this.accessedBroadcastStatesByName.get(name);
        if (previous != null) {
            DefaultOperatorStateBackend.checkStateNameAndMode(previous.getStateMetaInfo().getName(), name, previous.getStateMetaInfo().getAssignmentMode(), OperatorStateHandle.Mode.BROADCAST);
            return previous;
        }

        stateDescriptor.initializeSerializerUnlessSet(this.getExecutionConfig());
        TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
        TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());

        BackendWritableBroadcastState<K, V> broadcastState = (BackendWritableBroadcastState<K, V>)this.registeredBroadcastStates.get(name);
        if (broadcastState == null) {
            // No state registered under this name yet: create and register a fresh one.
            broadcastState = new HeapBroadcastState<>(new RegisteredBroadcastStateBackendMetaInfo<>(name, OperatorStateHandle.Mode.BROADCAST, broadcastStateKeySerializer, broadcastStateValueSerializer));
            this.registeredBroadcastStates.put(name, broadcastState);
        } else {
            // A state is already registered: verify name/mode, then update its serializers
            // and reject incompatible ones.
            DefaultOperatorStateBackend.checkStateNameAndMode(broadcastState.getStateMetaInfo().getName(), name, broadcastState.getStateMetaInfo().getAssignmentMode(), OperatorStateHandle.Mode.BROADCAST);
            RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo = broadcastState.getStateMetaInfo();
            TypeSerializerSchemaCompatibility<K> keyCompatibility = restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);
            if (keyCompatibility.isIncompatible()) {
                throw new StateMigrationException("The new key typeSerializer for broadcast state must not be incompatible.");
            }
            TypeSerializerSchemaCompatibility<V> valueCompatibility = restoredBroadcastStateMetaInfo.updateValueSerializer(broadcastStateValueSerializer);
            if (valueCompatibility.isIncompatible()) {
                throw new StateMigrationException("The new value typeSerializer for broadcast state must not be incompatible.");
            }
            broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
        }

        this.accessedBroadcastStatesByName.put(name, broadcastState);
        return broadcastState;
    }

    /**
     * Returns the list state for the given descriptor using assignment mode
     * {@link OperatorStateHandle.Mode#SPLIT_DISTRIBUTE}.
     */
    @Override
    public <S> org.apache.flink.api.common.state.ListState<S> getListState(org.apache.flink.api.common.state.ListStateDescriptor<S> stateDescriptor) throws Exception {
        return this.getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    }

    /**
     * Returns the list state for the given descriptor using assignment mode
     * {@link OperatorStateHandle.Mode#UNION}.
     */
    @Override
    public <S> org.apache.flink.api.common.state.ListState<S> getUnionListState(org.apache.flink.api.common.state.ListStateDescriptor<S> stateDescriptor) throws Exception {
        return this.getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
    }

    /** State-v2 variant: converts the descriptor to v1 and delegates to the v1 accessor. */
    @Override
    public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
        return this.getBroadcastState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor));
    }

    /**
     * State-v2 variant: converts the descriptor to v1, obtains the split-distribute list state
     * and wraps it in an {@link OperatorListStateAdaptor}.
     */
    @Override
    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return new OperatorListStateAdaptor<S>(this.getListState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor)));
    }

    /**
     * State-v2 variant: converts the descriptor to v1, obtains the <em>union</em> list state and
     * wraps it in an {@link OperatorListStateAdaptor}.
     */
    @Override
    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        // Must go through the v1 getUnionListState (Mode.UNION); delegating to getListState
        // would register the state as SPLIT_DISTRIBUTE.
        return new OperatorListStateAdaptor<S>(this.getUnionListState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor)));
    }

    /** Delegates the snapshot to the {@link SnapshotStrategyRunner} given at construction. */
    @Override
    @Nonnull
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        return this.snapshotStrategyRunner.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }

    /**
     * Returns the list state for the given descriptor and assignment mode, creating and
     * registering it when no state with that name is registered yet.
     *
     * @param stateDescriptor descriptor naming the state and supplying the element serializer
     * @param mode assignment mode the state must have (or is created with)
     * @return the list state registered under the descriptor's name
     * @throws StateMigrationException if a state with that name is already registered and the
     *     descriptor's element serializer is incompatible with it
     */
    @SuppressWarnings("unchecked")
    private <S> org.apache.flink.api.common.state.ListState<S> getListState(org.apache.flink.api.common.state.ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode) throws StateMigrationException {
        Preconditions.checkNotNull(stateDescriptor);
        String name = Preconditions.checkNotNull(stateDescriptor.getName());

        // Fast path: the state was already handed out under this name.
        // Unchecked: the maps are keyed by name only, the element type is the caller's claim.
        PartitionableListState<S> previous = (PartitionableListState<S>)this.accessedStatesByName.get(name);
        if (previous != null) {
            DefaultOperatorStateBackend.checkStateNameAndMode(previous.getStateMetaInfo().getName(), name, previous.getStateMetaInfo().getAssignmentMode(), mode);
            return previous;
        }

        stateDescriptor.initializeSerializerUnlessSet(this.getExecutionConfig());
        TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());

        PartitionableListState<S> partitionableListState = (PartitionableListState<S>)this.registeredOperatorStates.get(name);
        if (partitionableListState == null) {
            // No state registered under this name yet: create and register a fresh one.
            partitionableListState = new PartitionableListState<>(new RegisteredOperatorStateBackendMetaInfo<>(name, partitionStateSerializer, mode));
            this.registeredOperatorStates.put(name, partitionableListState);
        } else {
            // A state is already registered: verify name/mode, then update its serializer
            // (with a duplicate of the descriptor's) and reject an incompatible one.
            DefaultOperatorStateBackend.checkStateNameAndMode(partitionableListState.getStateMetaInfo().getName(), name, partitionableListState.getStateMetaInfo().getAssignmentMode(), mode);
            RegisteredOperatorStateBackendMetaInfo<S> restoredPartitionableListStateMetaInfo = partitionableListState.getStateMetaInfo();
            TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
            TypeSerializerSchemaCompatibility<S> stateCompatibility = restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(newPartitionStateSerializer);
            if (stateCompatibility.isIncompatible()) {
                throw new StateMigrationException("The new state typeSerializer for operator state must not be incompatible.");
            }
            partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
        }

        this.accessedStatesByName.put(name, partitionableListState);
        return partitionableListState;
    }

    /**
     * Verifies that an existing state's name and assignment mode equal the requested ones.
     *
     * @throws IllegalStateException (via {@link Preconditions#checkState}) on a mismatch
     */
    private static void checkStateNameAndMode(String actualName, String expectedName, OperatorStateHandle.Mode actualMode, OperatorStateHandle.Mode expectedMode) {
        Preconditions.checkState(actualName.equals(expectedName), "Incompatible state names. Was [" + actualName + "], registered with [" + expectedName + "].");
        Preconditions.checkState(actualMode.equals(expectedMode), "Incompatible state assignment modes. Was [" + actualMode + "], registered with [" + expectedMode + "].");
    }
}

