package org.apache.flink.runtime.asyncprocessing;

import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.v2.InternalValueState;
import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.class */
class AsyncExecutionControllerTest {
    /** Controller under test; the tests in this class only ever use {@code String} keys. */
    AsyncExecutionController<String> aec;

    /** Map-backed store that {@link TestStateExecutor} reads from and writes to. */
    TestUnderlyingState underlyingState;

    /** Receives the value produced by the most recently completed run of {@link #userCode}. */
    AtomicInteger output;

    /** State handle through which {@link #userCode} issues its asynchronous requests. */
    TestValueState valueState;
    /**
     * Simulated user function: reads the value stored for the current key, writes back that value
     * plus one (or 1 when nothing is stored yet) and finally publishes the written value to
     * {@code output}.
     */
    final Runnable userCode =
            () ->
                    valueState
                            .asyncValue()
                            .thenCompose(
                                    current -> {
                                        // A missing value counts as zero, so the first write is 1.
                                        int updated = (current != null) ? current + 1 : 1;
                                        return valueState
                                                .asyncUpdate(updated)
                                                .thenCompose(
                                                        ignored ->
                                                                StateFutureUtils.completedFuture(
                                                                        Integer.valueOf(updated)));
                                    })
                            .thenAccept(written -> output.set(written));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest$TestAsyncStateBackend.class */
    public static class TestAsyncStateBackend implements StateBackend {
        TestAsyncStateBackend() {
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> keyedStateBackendParameters) throws Exception {
            throw new UnsupportedOperationException("Don't support createKeyedStateBackend yet");
        }

        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters operatorStateBackendParameters) throws Exception {
            throw new UnsupportedOperationException("Don't support createOperatorStateBackend yet");
        }

        public boolean supportsAsyncKeyedStateBackend() {
            return true;
        }

        public <K> AsyncKeyedStateBackend createAsyncKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> keyedStateBackendParameters) {
            return new AsyncKeyedStateBackend() { // from class: org.apache.flink.runtime.asyncprocessing.AsyncExecutionControllerTest.TestAsyncStateBackend.1
                public StateExecutor createStateExecutor() {
                    return new TestStateExecutor();
                }

                public void dispose() {
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest$TestStateExecutor.class */
    static class TestStateExecutor implements StateExecutor {
        public CompletableFuture<Boolean> executeBatchRequests(Iterable<StateRequest<?, ?, ?>> iterable) {
            CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
            for (StateRequest<?, ?, ?> stateRequest : iterable) {
                if (stateRequest.getRequestType() == StateRequestType.VALUE_GET) {
                    Preconditions.checkState(stateRequest.getState() != null);
                    stateRequest.getFuture().complete(stateRequest.getState().underlyingState.get((String) stateRequest.getRecordContext().getKey()));
                } else {
                    if (stateRequest.getRequestType() != StateRequestType.VALUE_UPDATE) {
                        throw new UnsupportedOperationException("Unsupported request type");
                    }
                    Preconditions.checkState(stateRequest.getState() != null);
                    stateRequest.getState().underlyingState.update((String) stateRequest.getRecordContext().getKey(), (Integer) stateRequest.getPayload());
                    stateRequest.getFuture().complete((Object) null);
                }
            }
            completableFuture.complete(true);
            return completableFuture;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest$TestUnderlyingState.class */
    /** In-memory key/value store standing in for the real state storage in these tests. */
    static class TestUnderlyingState {
        /** Current value per key; a key that was never updated has no entry. */
        private final HashMap<String, Integer> valuesByKey = new HashMap<>();

        /**
         * Looks up the value stored for a key.
         *
         * @param key the key to read
         * @return the stored value, or {@code null} if the key has no entry
         */
        public Integer get(String key) {
            Integer stored = valuesByKey.get(key);
            return stored;
        }

        /**
         * Stores a value for a key, replacing any previous value.
         *
         * @param key the key to write
         * @param value the value to store
         */
        public void update(String key, Integer value) {
            valuesByKey.put(key, value);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest$TestValueState.class */
    /**
     * Value state with {@code String} keys and {@code Integer} values that additionally carries the
     * {@link TestUnderlyingState} which {@link TestStateExecutor} operates on.
     */
    static class TestValueState extends InternalValueState<String, Integer> {
        /** Backing store; read directly by {@link TestStateExecutor}. */
        private final TestUnderlyingState underlyingState;

        /**
         * Creates the state under the descriptor name {@code "test-value-state"}.
         *
         * @param asyncExecutionController controller handed to the superclass constructor
         * @param testUnderlyingState backing store for this state
         */
        public TestValueState(AsyncExecutionController<String> asyncExecutionController, TestUnderlyingState testUnderlyingState) {
            super(asyncExecutionController, new ValueStateDescriptor<>("test-value-state", BasicTypeInfo.INT_TYPE_INFO));
            this.underlyingState = testUnderlyingState;
            // Sanity check: the INT type info must yield the int serializer singleton.
            AssertionsForClassTypes.assertThat(getValueSerializer()).isEqualTo(IntSerializer.INSTANCE);
        }
    }

    /** Package-private no-argument constructor; it performs no initialisation of its own. */
    AsyncExecutionControllerTest() {
    }

    /**
     * Builds a fresh controller, backing store, value state and output holder before each test, so
     * no state leaks from one test into the next.
     */
    @BeforeEach
    void setup() {
        aec = new AsyncExecutionController<>(new SyncMailboxExecutor(), createStateExecutor());
        underlyingState = new TestUnderlyingState();
        valueState = new TestValueState(aec, underlyingState);
        output = new AtomicInteger();
    }

    /**
     * Steps the controller through single records and through two records sharing a key, checking
     * queue sizes, key occupancy, in-flight count, output and context reference counts after every
     * forced trigger.
     */
    @Test
    void testBasicRun() {
        // ---------------- record 1 on key1 ----------------
        RecordContext firstContext = aec.buildContext("key1-r1", "key1");
        aec.setCurrentContext(firstContext);
        userCode.run();
        // Running the user code leaves one request active, one key occupied, one record in flight.
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
        // First forced trigger: the counts stay at 1.
        // NOTE(review): presumably the read was served and the chained update took its place.
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        // Second forced trigger: the record is finished and everything is released.
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(firstContext.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);

        // ---------------- records 2 and 3, both on key1 ----------------
        RecordContext secondContext = aec.buildContext("key1-r2", "key1");
        aec.setCurrentContext(secondContext);
        userCode.run();
        RecordContext thirdContext = aec.buildContext("key1-r3", "key1");
        aec.setCurrentContext(thirdContext);
        userCode.run();
        // Only one request is active; the other sits in the blocking queue behind the same key.
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(2);
        // First trigger: no count changes yet.
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(2);
        // Second trigger: record 2 completes (output 2) and the blocked request becomes active.
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(output.get()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(secondContext.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        // Third trigger: record 3 is still in progress.
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
        // Fourth trigger: record 3 completes (output 3) and nothing remains.
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(output.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat(thirdContext.getReferenceCount()).isEqualTo(0);

        // ---------------- a record on a different key (key3) ----------------
        RecordContext fourthContext = aec.buildContext("key3-r3", "key3");
        aec.setCurrentContext(fourthContext);
        userCode.run();
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);
        // key3 had no stored value before, so the counter starts again at 1.
        AssertionsForClassTypes.assertThat(output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(fourthContext.getReferenceCount()).isEqualTo(0);
    }

    /**
     * Submits key1, key2 and then key1 again, and checks that the second key1 record stays blocked
     * until the first one has finished while key2 proceeds alongside it.
     */
    @Test
    void testRecordsRunInOrder() {
        RecordContext key1First = aec.buildContext("key1-r1", "key1");
        aec.setCurrentContext(key1First);
        userCode.run();
        RecordContext key2First = aec.buildContext("key2-r1", "key2");
        aec.setCurrentContext(key2First);
        userCode.run();
        RecordContext key1Second = aec.buildContext("key1-r2", "key1");
        aec.setCurrentContext(key1Second);
        userCode.run();

        // Two distinct keys are active; the repeated key1 record waits in the blocking queue.
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);

        // First trigger: no count changes yet.
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(3);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);

        // Second trigger: both first records finish; the blocked key1 record becomes active.
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(output.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(key1First.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(key2First.getReferenceCount()).isEqualTo(0);

        // Third trigger: the second key1 record is still in progress.
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);

        // Fourth trigger: it completes, having seen the value written by the first key1 record.
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(output.get()).isEqualTo(2);
        AssertionsForClassTypes.assertThat(key1Second.getReferenceCount()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);
    }

    /**
     * Checks how many records the controller lets accumulate before it drains them, first with
     * distinct keys and then with one shared key.
     *
     * <p>NOTE(review): the names {@code batchSize} and {@code maxInFlight} are inferred from how the
     * assertions below use the two numeric constructor arguments; confirm against the
     * {@code AsyncExecutionController} constructor.
     */
    @Test
    void testInFlightRecordControl() {
        final int batchSize = 5;
        final int maxInFlight = 10;
        final int rounds = 10;

        aec = new AsyncExecutionController<>(new SyncMailboxExecutor(), new TestStateExecutor(), batchSize, maxInFlight);
        valueState = new TestValueState(aec, underlyingState);

        // Same read-increment-write logic as the shared user code, writing to a local holder.
        AtomicInteger localOutput = new AtomicInteger();
        Runnable localUserCode = () -> {
            valueState.asyncValue().thenCompose(current -> {
                int updated = current == null ? 1 : current + 1;
                return valueState.asyncUpdate(updated).thenCompose(ignored -> StateFutureUtils.completedFuture(Integer.valueOf(updated)));
            }).thenAccept(written -> localOutput.set(written));
        };

        // Distinct keys: after each group of batchSize records nothing is left in flight or queued.
        for (int round = 0; round < rounds; round++) {
            for (int i = 0; i < batchSize; i++) {
                int index = round * batchSize + i;
                String record = String.format("key%d-r%d", index, index);
                String key = String.format("key%d", index);
                aec.setCurrentContext(aec.buildContext(record, key));
                localUserCode.run();
            }
            AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(0);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        }

        // One shared key: records pile up behind the single active request.
        for (int i = 0; i < maxInFlight; i++) {
            aec.setCurrentContext(aec.buildContext(String.format("sameKey-r%d", i), "sameKey"));
            localUserCode.run();
        }
        AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(maxInFlight);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(maxInFlight - 1);

        // Every further record on that key leaves the counts at the same level.
        for (int i = maxInFlight; i < 10 * maxInFlight; i++) {
            aec.setCurrentContext(aec.buildContext(String.format("sameKey-r%d", i), "sameKey"));
            localUserCode.run();
            AssertionsForClassTypes.assertThat(aec.inFlightRecordNum.get()).isEqualTo(maxInFlight + 1);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
            AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(maxInFlight);
        }
    }

    /**
     * Verifies sync-point callbacks in two situations: when the key is free the callback runs
     * immediately; when another record occupies the key the callback is deferred until that record
     * has finished.
     */
    @Test
    public void testSyncPoint() {
        AtomicInteger counter = new AtomicInteger(0);

        // ---------------- case 1: the key is free ----------------
        RecordContext recordContext = aec.buildContext("record", "key");
        aec.setCurrentContext(recordContext);
        // Hold one reference ourselves so the context can still be inspected after the callback.
        recordContext.retain();
        aec.syncPointRequestWithCallback(counter::incrementAndGet);
        // The callback has already run, and only our own reference is left.
        AssertionsForClassTypes.assertThat(counter.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(recordContext.getReferenceCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        // The key stays occupied until the last reference is released.
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
        recordContext.release();
        AssertionsForClassTypes.assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0);

        // ---------------- case 2: the key is occupied by another record ----------------
        counter.set(0);
        aec.setCurrentContext(aec.buildContext("record1", "occupied"));
        userCode.run();
        RecordContext blockedContext = aec.buildContext("record2", "occupied");
        aec.setCurrentContext(blockedContext);
        aec.syncPointRequestWithCallback(counter::incrementAndGet);
        blockedContext.retain();
        // The sync point waits in the blocking queue; its callback has not run.
        AssertionsForClassTypes.assertThat(counter.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(blockedContext.getReferenceCount()).isGreaterThan(1);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        // First trigger: record1 is still in progress, so nothing changes for the sync point.
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(counter.get()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(blockedContext.getReferenceCount()).isGreaterThan(1);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
        // Second trigger: record1 finishes and the deferred callback runs.
        aec.triggerIfNeeded(true);
        AssertionsForClassTypes.assertThat(counter.get()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(blockedContext.getReferenceCount()).isEqualTo(1);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0);
        AssertionsForClassTypes.assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
        blockedContext.release();
    }

    /**
     * Obtains a {@link StateExecutor} through {@link TestAsyncStateBackend}, first asserting that
     * the backend reports async keyed state support.
     *
     * @return the executor created by the backend's async keyed state backend
     */
    private StateExecutor createStateExecutor() {
        TestAsyncStateBackend stateBackend = new TestAsyncStateBackend();
        AssertionsForClassTypes.assertThat(stateBackend.supportsAsyncKeyedStateBackend()).isTrue();
        // The test backend ignores its parameters, so null is passed.
        AsyncKeyedStateBackend asyncKeyedStateBackend = stateBackend.createAsyncKeyedStateBackend(null);
        return asyncKeyedStateBackend.createStateExecutor();
    }
}
