/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.v2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutor;
import org.apache.flink.runtime.asyncprocessing.AsyncRequestContainer;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.apache.flink.runtime.asyncprocessing.MockAsyncRequestContainer;
import org.apache.flink.runtime.asyncprocessing.StateExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.v2.AbstractKeyedStateTestBase;
import org.apache.flink.runtime.state.v2.AbstractReducingState;
import org.apache.flink.runtime.state.v2.internal.InternalPartitionedState;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.Test;

public class AbstractReducingStateTest
extends AbstractKeyedStateTestBase {
    @Test
    public void testEachOperation() {
        ReduceFunction & Serializable reducer = Integer::sum;
        ReducingStateDescriptor descriptor = new ReducingStateDescriptor("testState", (ReduceFunction)reducer, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        AbstractReducingState reducingState = new AbstractReducingState((StateRequestHandler)this.aec, descriptor.getReduceFunction(), descriptor.getSerializer());
        this.aec.setCurrentContext(this.aec.buildContext((Object)"test", (Object)"test"));
        reducingState.asyncClear();
        this.validateRequestRun((State)reducingState, StateRequestType.CLEAR, null, 0);
        reducingState.asyncGet();
        this.validateRequestRun((State)reducingState, StateRequestType.REDUCING_GET, null, 0);
        reducingState.asyncAdd((Object)1);
        this.validateRequestRun((State)reducingState, StateRequestType.REDUCING_GET, null, 1);
        this.validateRequestRun((State)reducingState, StateRequestType.REDUCING_ADD, 1, 0);
        reducingState.clear();
        this.validateRequestRun((State)reducingState, StateRequestType.CLEAR, null, 0);
        reducingState.get();
        this.validateRequestRun((State)reducingState, StateRequestType.REDUCING_GET, null, 0);
        reducingState.add((Object)1);
        this.validateRequestRun((State)reducingState, StateRequestType.REDUCING_GET, null, 1);
        this.validateRequestRun((State)reducingState, StateRequestType.REDUCING_ADD, 1, 0);
    }

    @Test
    public void testMergeNamespace() throws Exception {
        ReduceFunction & Serializable reducer = Integer::sum;
        ReducingStateDescriptor descriptor = new ReducingStateDescriptor("testState", (ReduceFunction)reducer, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        StateExecutionController aec = new StateExecutionController((MailboxExecutor)new SyncMailboxExecutor(), (a, b) -> {}, (AsyncExecutor)new ReducingStateExecutor(), new DeclarationManager(), EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH, 1, 100, 10000L, 1, null, null);
        AbstractReducingState reducingState = new AbstractReducingState((StateRequestHandler)aec, descriptor.getReduceFunction(), descriptor.getSerializer());
        aec.setCurrentContext(aec.buildContext((Object)"test", (Object)"test"));
        aec.setCurrentNamespaceForState((InternalPartitionedState)reducingState, (Object)"1");
        reducingState.asyncAdd((Object)1);
        aec.drainInflightRecords(0);
        AssertionsForClassTypes.assertThat((int)ReducingStateExecutor.hashMap.size()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"1"))).isEqualTo(1);
        aec.setCurrentNamespaceForState((InternalPartitionedState)reducingState, (Object)"2");
        reducingState.asyncAdd((Object)2);
        aec.drainInflightRecords(0);
        AssertionsForClassTypes.assertThat((int)ReducingStateExecutor.hashMap.size()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"1"))).isEqualTo(1);
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"2"))).isEqualTo(2);
        aec.setCurrentNamespaceForState((InternalPartitionedState)reducingState, (Object)"3");
        reducingState.asyncAdd((Object)3);
        aec.drainInflightRecords(0);
        AssertionsForClassTypes.assertThat((int)ReducingStateExecutor.hashMap.size()).isEqualTo(3);
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"1"))).isEqualTo(1);
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"2"))).isEqualTo(2);
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"3"))).isEqualTo(3);
        ArrayList<String> sources = new ArrayList<String>(Arrays.asList("1", "2", "3"));
        reducingState.asyncMergeNamespaces((Object)"0", sources);
        aec.drainInflightRecords(0);
        AssertionsForClassTypes.assertThat((int)ReducingStateExecutor.hashMap.size()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"0"))).isEqualTo(6);
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"1"))).isNull();
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"2"))).isNull();
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"3"))).isNull();
        aec.setCurrentNamespaceForState((InternalPartitionedState)reducingState, (Object)"4");
        reducingState.asyncAdd((Object)4);
        aec.drainInflightRecords(0);
        AssertionsForClassTypes.assertThat((int)ReducingStateExecutor.hashMap.size()).isEqualTo(2);
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"0"))).isEqualTo(6);
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"4"))).isEqualTo(4);
        ArrayList<String> sources1 = new ArrayList<String>(Arrays.asList("4"));
        reducingState.asyncMergeNamespaces((Object)"0", sources1);
        aec.drainInflightRecords(0);
        AssertionsForClassTypes.assertThat((int)ReducingStateExecutor.hashMap.size()).isEqualTo(1);
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"0"))).isEqualTo(10);
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"1"))).isNull();
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"2"))).isNull();
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"3"))).isNull();
        AssertionsForClassTypes.assertThat((Integer)ReducingStateExecutor.hashMap.get(Tuple2.of((Object)"test", (Object)"4"))).isNull();
    }

    static class ReducingStateExecutor
    implements StateExecutor {
        private static final HashMap<Tuple2<String, String>, Integer> hashMap = new HashMap();

        public ReducingStateExecutor() {
            hashMap.clear();
        }

        public CompletableFuture<Void> executeBatchRequests(AsyncRequestContainer asyncRequestContainer) {
            Preconditions.checkArgument((boolean)(asyncRequestContainer instanceof MockAsyncRequestContainer));
            CompletableFuture<Void> future = new CompletableFuture<Void>();
            for (StateRequest request : ((MockAsyncRequestContainer)asyncRequestContainer).getStateRequestList()) {
                String namespace;
                String key;
                if (request.getRequestType() == StateRequestType.REDUCING_GET) {
                    key = (String)request.getRecordContext().getKey();
                    namespace = (String)request.getNamespace();
                    Integer val = hashMap.get(Tuple2.of((Object)key, (Object)namespace));
                    request.getFuture().complete((Object)val);
                    continue;
                }
                if (request.getRequestType() == StateRequestType.REDUCING_ADD) {
                    key = (String)request.getRecordContext().getKey();
                    namespace = (String)request.getNamespace();
                    if (request.getPayload() == null) {
                        hashMap.remove(Tuple2.of((Object)key, (Object)namespace));
                        request.getFuture().complete(null);
                        continue;
                    }
                    hashMap.put((Tuple2<String, String>)Tuple2.of((Object)key, (Object)namespace), (Integer)request.getPayload());
                    request.getFuture().complete(null);
                    continue;
                }
                throw new UnsupportedOperationException("Unsupported request type");
            }
            future.complete(null);
            return future;
        }

        public AsyncRequestContainer<StateRequest<?, ?, ?, ?>> createRequestContainer() {
            return new MockAsyncRequestContainer();
        }

        public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
            throw new UnsupportedOperationException("Unsupported synchronous execution");
        }

        public boolean fullyLoaded() {
            return false;
        }

        public void shutdown() {
        }
    }
}

