/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.v2;

import java.io.IOException;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutor;
import org.apache.flink.runtime.asyncprocessing.AsyncRequestContainer;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.apache.flink.runtime.asyncprocessing.MockAsyncRequestContainer;
import org.apache.flink.runtime.asyncprocessing.StateExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

public class AbstractKeyedStateTestBase {
    StateExecutionController aec;
    TestStateExecutor testStateExecutor;
    AtomicReference<Throwable> exception;

    @BeforeEach
    void setup() {
        this.testStateExecutor = (TestStateExecutor)this.createStateExecutor();
        this.aec = new StateExecutionController((MailboxExecutor)new SyncMailboxExecutor(), (a, b) -> this.exception.set(b), (AsyncExecutor)this.testStateExecutor, new DeclarationManager(), EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH, 1, 1, 1000L, 1, null, null);
        this.exception = new AtomicReference<Object>(null);
    }

    @AfterEach
    void after() {
        AssertionsForClassTypes.assertThat((Throwable)this.exception.get()).isNull();
    }

    private StateExecutor createStateExecutor() {
        TestAsyncStateBackend testAsyncStateBackend = new TestAsyncStateBackend();
        AssertionsForClassTypes.assertThat((boolean)testAsyncStateBackend.supportsAsyncKeyedStateBackend()).isTrue();
        return testAsyncStateBackend.createAsyncKeyedStateBackend(null).createStateExecutor();
    }

    <IN> void validateRequestRun(@Nullable State state, StateRequestType type, @Nullable IN payload, int remainingRequests) {
        this.aec.triggerIfNeeded(true);
        this.testStateExecutor.validate(state, type, payload);
        AssertionsForClassTypes.assertThat((int)this.testStateExecutor.receivedRequest.size()).isEqualTo(remainingRequests);
    }

    static class TestStateExecutor
    implements StateExecutor {
        private Deque<StateRequest<?, ?, ?, ?>> receivedRequest = new ConcurrentLinkedDeque();

        TestStateExecutor() {
        }

        <IN> void validate(@Nullable State state, StateRequestType type, @Nullable IN payload) {
            AssertionsForClassTypes.assertThat((boolean)this.receivedRequest.isEmpty()).isFalse();
            StateRequest<?, ?, ?, ?> request = this.receivedRequest.pop();
            AssertionsForClassTypes.assertThat((Object)request.getState()).isEqualTo((Object)state);
            AssertionsForClassTypes.assertThat((Object)request.getRequestType()).isEqualTo((Object)type);
            AssertionsForClassTypes.assertThat((Object)request.getPayload()).isEqualTo(payload);
        }

        public CompletableFuture<Void> executeBatchRequests(AsyncRequestContainer<StateRequest<?, ?, ?, ?>> asyncRequestContainer) {
            for (StateRequest request : ((MockAsyncRequestContainer)asyncRequestContainer).getStateRequestList()) {
                this.executeRequestSync(request);
            }
            CompletableFuture<Void> future = new CompletableFuture<Void>();
            future.complete(null);
            return future;
        }

        public AsyncRequestContainer<StateRequest<?, ?, ?, ?>> createRequestContainer() {
            return new MockAsyncRequestContainer();
        }

        public void executeRequestSync(StateRequest<?, ?, ?, ?> request) {
            this.receivedRequest.add(request);
            if (request.getRequestType() == StateRequestType.MAP_CONTAINS || request.getRequestType() == StateRequestType.MAP_IS_EMPTY) {
                request.getFuture().complete((Object)true);
            } else {
                request.getFuture().complete(null);
            }
        }

        public boolean fullyLoaded() {
            return false;
        }

        public void shutdown() {
        }
    }

    static class TestAsyncStateBackend
    implements StateBackend {
        TestAsyncStateBackend() {
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws Exception {
            throw new UnsupportedOperationException("Don't support createKeyedStateBackend yet");
        }

        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters) throws Exception {
            throw new UnsupportedOperationException("Don't support createOperatorStateBackend yet");
        }

        public boolean supportsAsyncKeyedStateBackend() {
            return true;
        }

        public <K> AsyncKeyedStateBackend<K> createAsyncKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) {
            return new AsyncKeyedStateBackend<K>(){

                @Nonnull
                public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
                    throw new UnsupportedOperationException("Not support for test yet.");
                }

                @Nonnull
                public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
                    throw new UnsupportedOperationException("Not support yet");
                }

                public void notifyCheckpointComplete(long checkpointId) throws Exception {
                    throw new UnsupportedOperationException("Not support yet");
                }

                public void notifyCheckpointSubsumed(long checkpointId) throws Exception {
                    throw new UnsupportedOperationException("Not support yet");
                }

                public void close() throws IOException {
                }

                public void setup(@Nonnull StateRequestHandler stateRequestHandler) {
                }

                public <N, S extends State, SV> S getOrCreateKeyedState(N defaultNamespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<SV> stateDesc) throws Exception {
                    return null;
                }

                @Nonnull
                public <N, S extends InternalKeyedState, SV> S createStateInternal(@Nonnull N defaultNamespace, @Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<SV> stateDesc) throws Exception {
                    return null;
                }

                public StateExecutor createStateExecutor() {
                    return new TestStateExecutor();
                }

                public KeyGroupRange getKeyGroupRange() {
                    return new KeyGroupRange(0, 127);
                }

                public void dispose() {
                }
            };
        }
    }
}

