/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.v2.adaptor;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.InternalCheckpointListener;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncRequestContainer;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.v2.StateDescriptorUtils;
import org.apache.flink.runtime.state.v2.adaptor.AggregatingStateAdaptor;
import org.apache.flink.runtime.state.v2.adaptor.ListStateAdaptor;
import org.apache.flink.runtime.state.v2.adaptor.MapStateAdaptor;
import org.apache.flink.runtime.state.v2.adaptor.ReducingStateAdaptor;
import org.apache.flink.runtime.state.v2.adaptor.ValueStateAdaptor;
import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;

public class AsyncKeyedStateBackendAdaptor<K>
implements AsyncKeyedStateBackend<K> {
    private final CheckpointableKeyedStateBackend<K> keyedStateBackend;

    public AsyncKeyedStateBackendAdaptor(CheckpointableKeyedStateBackend<K> keyedStateBackend) {
        this.keyedStateBackend = keyedStateBackend;
    }

    @Override
    public void setup(@Nonnull StateRequestHandler stateRequestHandler) {
    }

    @Override
    public <N, S extends State, SV> S getOrCreateKeyedState(N defaultNamespace, TypeSerializer<N> namespaceSerializer, org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDesc) throws Exception {
        return this.createStateInternal(defaultNamespace, namespaceSerializer, stateDesc);
    }

    @Override
    @Nonnull
    public <N, S extends InternalKeyedState, SV> S createStateInternal(@Nonnull N defaultNamespace, @Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDesc) throws Exception {
        StateDescriptor rawStateDesc = StateDescriptorUtils.transformFromV2ToV1(stateDesc);
        Object rawState = this.keyedStateBackend.getPartitionedState(defaultNamespace, namespaceSerializer, rawStateDesc);
        switch (rawStateDesc.getType()) {
            case VALUE: {
                return (S)new ValueStateAdaptor((InternalValueState)rawState);
            }
            case LIST: {
                return (S)new ListStateAdaptor((InternalListState)rawState);
            }
            case REDUCING: {
                return (S)new ReducingStateAdaptor((InternalReducingState)rawState);
            }
            case AGGREGATING: {
                return (S)new AggregatingStateAdaptor((InternalAggregatingState)rawState);
            }
            case MAP: {
                return (S)new MapStateAdaptor((InternalMapState)rawState);
            }
        }
        throw new UnsupportedOperationException(String.format("Unsupported state type: %s", rawStateDesc.getType()));
    }

    @Override
    @Nonnull
    public StateExecutor createStateExecutor() {
        return new InvalidStateExecutor();
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return this.keyedStateBackend.getKeyGroupRange();
    }

    @Override
    public void switchContext(@Nullable RecordContext<K> context) {
        if (context != null) {
            this.keyedStateBackend.setCurrentKeyAndKeyGroup(context.getKey(), context.getKeyGroup());
        }
    }

    @Override
    public void dispose() {
    }

    @Override
    public String getBackendTypeIdentifier() {
        return this.keyedStateBackend.getBackendTypeIdentifier();
    }

    @Override
    public void close() throws IOException {
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (this.keyedStateBackend instanceof CheckpointListener) {
            ((CheckpointListener)this.keyedStateBackend).notifyCheckpointComplete(checkpointId);
        }
    }

    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        if (this.keyedStateBackend instanceof CheckpointListener) {
            ((CheckpointListener)this.keyedStateBackend).notifyCheckpointAborted(checkpointId);
        }
    }

    public void notifyCheckpointSubsumed(long checkpointId) throws Exception {
        if (this.keyedStateBackend instanceof InternalCheckpointListener) {
            ((InternalCheckpointListener)this.keyedStateBackend).notifyCheckpointSubsumed(checkpointId);
        }
    }

    @Override
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        return this.keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }

    @Override
    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        return this.keyedStateBackend.create(stateName, byteOrderedElementSerializer);
    }

    @Override
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer, boolean allowFutureMetadataUpdates) {
        return this.keyedStateBackend.create(stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates);
    }

    @Override
    public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointType) {
        if (this.keyedStateBackend instanceof AbstractKeyedStateBackend) {
            return ((AbstractKeyedStateBackend)this.keyedStateBackend).requiresLegacySynchronousTimerSnapshots(checkpointType);
        }
        return false;
    }

    @Override
    public boolean isSafeToReuseKVState() {
        return this.keyedStateBackend.isSafeToReuseKVState();
    }

    public CheckpointableKeyedStateBackend<K> getKeyedStateBackend() {
        return this.keyedStateBackend;
    }

    private static class InvalidStateExecutor
    implements StateExecutor {
        private InvalidStateExecutor() {
        }

        @Override
        public CompletableFuture<Void> executeBatchRequests(AsyncRequestContainer<StateRequest<?, ?, ?, ?>> asyncRequestContainer) {
            return null;
        }

        @Override
        public AsyncRequestContainer<StateRequest<?, ?, ?, ?>> createRequestContainer() {
            return null;
        }

        @Override
        public void executeRequestSync(StateRequest<?, ?, ?, ?> asyncRequest) {
        }

        @Override
        public boolean fullyLoaded() {
            return false;
        }

        @Override
        public void shutdown() {
        }
    }
}

