/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.v2;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.MapStateDescriptor;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.v2.internal.InternalReducingState;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link AsyncKeyedStateBackendAdaptor}.
 *
 * <p>Each test wraps a heap keyed state backend (see {@link #createBackend()}) in an adaptor,
 * obtains one kind of state through {@code getOrCreateKeyedState} and exercises first the
 * synchronous methods and then the {@code async*} counterparts of that state.
 *
 * <p>NOTE(review): the future produced by each async chain is discarded rather than awaited, so
 * the assertions inside the callbacks are only effective if the adaptor completes its futures
 * before returning them - confirm against the adaptor implementation.
 */
public class AsyncKeyedStateBackendAdaptorTest {

    /** Exercises clear / update / value on a value state, synchronously and asynchronously. */
    @Test
    public void testValueStateAdaptor() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedStateBackend = AsyncKeyedStateBackendAdaptorTest.createBackend();
        AsyncKeyedStateBackendAdaptor adaptor = new AsyncKeyedStateBackendAdaptor(keyedStateBackend);
        try {
            keyedStateBackend.setCurrentKey("test");
            ValueStateDescriptor descriptor = new ValueStateDescriptor("testState", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
            ValueState valueState = (ValueState)adaptor.getOrCreateKeyedState(VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)descriptor);

            // Synchronous interface.
            valueState.clear();
            Assertions.assertThat((Integer)valueState.value()).isNull();
            valueState.update(10);
            Assertions.assertThat((Integer)valueState.value()).isEqualTo(10);

            // Asynchronous interface; each step is cross-checked through the synchronous getter.
            valueState.asyncClear().thenCompose(clear -> {
                Assertions.assertThat((Integer)valueState.value()).isNull();
                return valueState.asyncUpdate(20);
            }).thenCompose(empty -> {
                Assertions.assertThat((Integer)valueState.value()).isEqualTo(20);
                return valueState.asyncValue();
            }).thenAccept(value -> Assertions.assertThat((Integer)value).isEqualTo(20));
        } finally {
            // Release the adaptor even when an assertion above fails.
            adaptor.close();
        }
    }

    /** Exercises clear / add / addAll / update / get on a list state, synchronously and asynchronously. */
    @Test
    public void testListStateAdaptor() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedStateBackend = AsyncKeyedStateBackendAdaptorTest.createBackend();
        AsyncKeyedStateBackendAdaptor adaptor = new AsyncKeyedStateBackendAdaptor(keyedStateBackend);
        try {
            keyedStateBackend.setCurrentKey("test");
            ListStateDescriptor descriptor = new ListStateDescriptor("testState", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
            ListState listState = (ListState)adaptor.getOrCreateKeyedState(VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)descriptor);

            // Synchronous interface.
            listState.clear();
            Assertions.assertThat((Iterable)listState.get()).isNull();
            listState.add(10);
            Assertions.assertThat((Iterable)listState.get()).containsExactlyInAnyOrderElementsOf(Arrays.asList(10));
            listState.addAll(Arrays.asList(20, 30));
            Assertions.assertThat((Iterable)listState.get()).containsExactlyInAnyOrderElementsOf(Arrays.asList(10, 20, 30));
            listState.update(Arrays.asList(40, 50));
            Assertions.assertThat((Iterable)listState.get()).containsExactlyInAnyOrderElementsOf(Arrays.asList(40, 50));

            // Asynchronous interface; each step is cross-checked through the synchronous getter.
            listState.asyncClear().thenCompose(clear -> {
                Assertions.assertThat((Iterable)listState.get()).isNull();
                return listState.asyncAdd(10);
            }).thenCompose(empty -> {
                Assertions.assertThat((Iterable)listState.get()).containsExactlyInAnyOrderElementsOf(Arrays.asList(10));
                return listState.asyncAddAll(Arrays.asList(20, 30));
            }).thenCompose(empty -> {
                Assertions.assertThat((Iterable)listState.get()).containsExactlyInAnyOrderElementsOf(Arrays.asList(10, 20, 30));
                return listState.asyncUpdate(Arrays.asList(40, 50));
            }).thenAccept(empty -> Assertions.assertThat((Iterable)listState.get()).containsExactlyInAnyOrderElementsOf(Arrays.asList(40, 50)));
        } finally {
            // Release the adaptor even when an assertion above fails.
            adaptor.close();
        }
    }

    /**
     * Exercises put / putAll / get / contains and the entries, keys, values and iterator views of
     * a map state, synchronously and asynchronously.
     */
    @Test
    public void testMapStateAdaptor() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedStateBackend = AsyncKeyedStateBackendAdaptorTest.createBackend();
        AsyncKeyedStateBackendAdaptor adaptor = new AsyncKeyedStateBackendAdaptor(keyedStateBackend);
        try {
            keyedStateBackend.setCurrentKey("test");
            MapStateDescriptor descriptor = new MapStateDescriptor("testState", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
            MapState mapState = (MapState)adaptor.getOrCreateKeyedState(VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)descriptor);

            // Expected final content of the state; built without an anonymous HashMap subclass.
            Map<Integer, Integer> groundTruth = new HashMap<>();
            groundTruth.put(10, 100);
            groundTruth.put(20, 200);

            // Synchronous interface.
            mapState.clear();
            Assertions.assertThat(mapState.isEmpty()).isTrue();
            mapState.put(10, 100);
            Assertions.assertThat((Integer)mapState.get(10)).isEqualTo(100);
            Map<Integer, Integer> secondEntry = new HashMap<>();
            secondEntry.put(20, 200);
            mapState.putAll(secondEntry);
            Assertions.assertThat((Integer)mapState.get(20)).isEqualTo(200);
            Assertions.assertThat(mapState.contains(20)).isTrue();
            Assertions.assertThat((Iterable)mapState.entries()).containsExactlyInAnyOrderElementsOf(groundTruth.entrySet());
            Assertions.assertThat((Iterable)mapState.keys()).containsExactlyInAnyOrderElementsOf(groundTruth.keySet());
            Assertions.assertThat((Iterable)mapState.values()).containsExactlyInAnyOrderElementsOf(groundTruth.values());
            Assertions.assertThat((Iterator)mapState.iterator()).toIterable().containsExactlyInAnyOrderElementsOf(groundTruth.entrySet());

            // Asynchronous interface; the async iterators are drained into local collections
            // before being compared with the expected content.
            mapState.asyncClear().thenCompose(clear -> {
                Assertions.assertThat(mapState.isEmpty()).isTrue();
                return mapState.asyncPut(10, 100);
            }).thenCompose(empty -> {
                Assertions.assertThat((Integer)mapState.get(10)).isEqualTo(100);
                return mapState.asyncPutAll(groundTruth);
            }).thenCompose(empty -> {
                Assertions.assertThat((Integer)mapState.get(20)).isEqualTo(200);
                return mapState.asyncContains(20);
            }).thenCompose(contains -> {
                Assertions.assertThat((Boolean)contains).isTrue();
                return mapState.asyncEntries();
            }).thenCompose(iterator -> {
                HashMap iterated = new HashMap();
                iterator.onNext(entry -> iterated.put((Integer)entry.getKey(), (Integer)entry.getValue()));
                Assertions.assertThat(iterated.entrySet()).containsExactlyInAnyOrderElementsOf(groundTruth.entrySet());
                return mapState.asyncKeys();
            }).thenCompose(iterator -> {
                HashSet iterated = new HashSet();
                iterator.onNext(iterated::add);
                Assertions.assertThat(iterated).containsExactlyInAnyOrderElementsOf(groundTruth.keySet());
                return mapState.asyncValues();
            }).thenAccept(iterator -> {
                HashSet iterated = new HashSet();
                iterator.onNext(iterated::add);
                Assertions.assertThat(iterated).containsExactlyInAnyOrderElementsOf(groundTruth.values());
            });
        } finally {
            // Release the adaptor even when an assertion above fails.
            adaptor.close();
        }
    }

    /**
     * Exercises add / get / namespace switching / namespace merging on a reducing state that sums
     * integers, synchronously and asynchronously. Namespaces are the longs {@code 0L} and
     * {@code 1L}.
     */
    @Test
    public void testReducingStateAdaptor() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedStateBackend = AsyncKeyedStateBackendAdaptorTest.createBackend();
        AsyncKeyedStateBackendAdaptor adaptor = new AsyncKeyedStateBackendAdaptor(keyedStateBackend);
        try {
            keyedStateBackend.setCurrentKey("test");
            ReducingStateDescriptor descriptor = new ReducingStateDescriptor("testState", Integer::sum, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
            InternalReducingState reducingState = (InternalReducingState)adaptor.getOrCreateKeyedState(0L, (TypeSerializer)LongSerializer.INSTANCE, (StateDescriptor)descriptor);

            // Synchronous interface: 10 + 20 in namespace 0, 30 in namespace 1, then merge 1 into 0.
            reducingState.clear();
            Assertions.assertThat((Integer)reducingState.get()).isNull();
            reducingState.add(10);
            Assertions.assertThat((Integer)reducingState.get()).isEqualTo(10);
            reducingState.add(20);
            Assertions.assertThat((Integer)reducingState.get()).isEqualTo(30);
            reducingState.setCurrentNamespace(1L);
            Assertions.assertThat((Integer)reducingState.get()).isNull();
            reducingState.add(30);
            reducingState.mergeNamespaces(0L, Arrays.asList(0L, 1L));
            reducingState.setCurrentNamespace(0L);
            Assertions.assertThat((Integer)reducingState.get()).isEqualTo(60);
            // The test expects the merged source namespace to read as empty afterwards.
            reducingState.setCurrentNamespace(1L);
            Assertions.assertThat((Integer)reducingState.get()).isNull();

            // Asynchronous interface: same scenario, starting again from namespace 0.
            reducingState.setCurrentNamespace(0L);
            reducingState.asyncClear().thenCompose(clear -> {
                Assertions.assertThat((Integer)reducingState.get()).isNull();
                return reducingState.asyncAdd(10);
            }).thenCompose(empty -> reducingState.asyncGet()).thenCompose(value -> {
                Assertions.assertThat((Integer)value).isEqualTo(10);
                return reducingState.asyncAdd(20);
            }).thenCompose(empty -> reducingState.asyncGet()).thenCompose(value -> {
                Assertions.assertThat((Integer)value).isEqualTo(30);
                reducingState.setCurrentNamespace(1L);
                return reducingState.asyncGet();
            }).thenCompose(value -> {
                Assertions.assertThat((Integer)value).isNull();
                return reducingState.asyncAdd(30);
            }).thenCompose(empty -> reducingState.asyncMergeNamespaces(0L, Arrays.asList(0L, 1L))).thenCompose(empty -> {
                reducingState.setCurrentNamespace(0L);
                return reducingState.asyncGet();
            }).thenCompose(value -> {
                Assertions.assertThat((Integer)value).isEqualTo(60);
                reducingState.setCurrentNamespace(1L);
                return reducingState.asyncGet();
            }).thenAccept(value -> Assertions.assertThat((Integer)value).isNull());
        } finally {
            // Release the adaptor even when an assertion above fails.
            adaptor.close();
        }
    }

    /**
     * Exercises add / get / namespace switching / namespace merging on an aggregating state,
     * synchronously and asynchronously. The aggregate function sums integers in an
     * {@code Integer} accumulator and reports the result as its decimal string.
     */
    @Test
    public void testAggregatingStateAdaptor() throws Exception {
        CheckpointableKeyedStateBackend<String> keyedStateBackend = AsyncKeyedStateBackendAdaptorTest.createBackend();
        AsyncKeyedStateBackendAdaptor adaptor = new AsyncKeyedStateBackendAdaptor(keyedStateBackend);
        try {
            keyedStateBackend.setCurrentKey("test");
            AggregatingStateDescriptor descriptor = new AggregatingStateDescriptor("testState", (AggregateFunction)new AggregateFunction<Integer, Integer, String>(){

                @Override
                public Integer createAccumulator() {
                    return 0;
                }

                @Override
                public Integer add(Integer value, Integer accumulator) {
                    return accumulator + value;
                }

                @Override
                public String getResult(Integer accumulator) {
                    return accumulator.toString();
                }

                @Override
                public Integer merge(Integer a, Integer b) {
                    return a + b;
                }
            }, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
            InternalAggregatingState aggState = (InternalAggregatingState)adaptor.getOrCreateKeyedState(0L, (TypeSerializer)LongSerializer.INSTANCE, (StateDescriptor)descriptor);

            // Synchronous interface: 10 + 20 in namespace 0, 30 in namespace 1, then merge 1 into 0.
            aggState.clear();
            Assertions.assertThat((String)aggState.get()).isNull();
            aggState.add(10);
            Assertions.assertThat((String)aggState.get()).isEqualTo("10");
            aggState.add(20);
            Assertions.assertThat((String)aggState.get()).isEqualTo("30");
            aggState.setCurrentNamespace(1L);
            Assertions.assertThat((String)aggState.get()).isNull();
            aggState.add(30);
            aggState.mergeNamespaces(0L, Arrays.asList(0L, 1L));
            aggState.setCurrentNamespace(0L);
            Assertions.assertThat((String)aggState.get()).isEqualTo("60");
            // The test expects the merged source namespace to read as empty afterwards.
            aggState.setCurrentNamespace(1L);
            Assertions.assertThat((String)aggState.get()).isNull();

            // Asynchronous interface: same scenario, starting again from namespace 0.
            aggState.setCurrentNamespace(0L);
            aggState.asyncClear().thenCompose(clear -> {
                Assertions.assertThat((String)aggState.get()).isNull();
                return aggState.asyncAdd(10);
            }).thenCompose(empty -> aggState.asyncGet()).thenCompose(value -> {
                Assertions.assertThat((String)value).isEqualTo("10");
                return aggState.asyncAdd(20);
            }).thenCompose(empty -> aggState.asyncGet()).thenCompose(value -> {
                Assertions.assertThat((String)value).isEqualTo("30");
                aggState.setCurrentNamespace(1L);
                return aggState.asyncGet();
            }).thenCompose(value -> {
                Assertions.assertThat((String)value).isNull();
                return aggState.asyncAdd(30);
            }).thenCompose(empty -> aggState.asyncMergeNamespaces(0L, Arrays.asList(0L, 1L))).thenCompose(empty -> {
                aggState.setCurrentNamespace(0L);
                return aggState.asyncGet();
            }).thenCompose(value -> {
                Assertions.assertThat((String)value).isEqualTo("60");
                aggState.setCurrentNamespace(1L);
                return aggState.asyncGet();
            }).thenAccept(value -> Assertions.assertThat((String)value).isNull());
        } finally {
            // Release the adaptor even when an assertion above fails.
            adaptor.close();
        }
    }

    /**
     * Builds a fresh heap keyed state backend keyed by {@code String}, with 128 key groups
     * covering the range 0..127, default TTL time provider, latency tracking and local recovery
     * disabled, and no restored state handles.
     *
     * @return the newly built backend
     * @throws Exception if the builder fails to create the backend
     */
    private static CheckpointableKeyedStateBackend<String> createBackend() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        return new HeapKeyedStateBackendBuilder(
                new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
                (TypeSerializer)StringSerializer.INSTANCE,
                ClassLoader.getSystemClassLoader(),
                128,
                new KeyGroupRange(0, 127),
                executionConfig,
                TtlTimeProvider.DEFAULT,
                LatencyTrackingStateConfig.disabled(),
                (Collection)Collections.EMPTY_LIST,
                AbstractStateBackend.getCompressionDecorator(executionConfig),
                TestLocalRecoveryConfig.disabled(),
                new HeapPriorityQueueSetFactory(new KeyGroupRange(0, 127), 128, 128),
                true,
                new CloseableRegistry()).build();
    }
}

