/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.PartitionableListState;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageLocation;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.flink.runtime.util.TestingUserCodeClassLoader;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;

class OperatorStateBackendTest {
    @RegisterExtension
    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension(Executors::newCachedThreadPool);
    private final ClassLoader classLoader = this.getClass().getClassLoader();
    private final Collection<OperatorStateHandle> emptyStateHandles = Collections.emptyList();

    OperatorStateBackendTest() {
    }

    @Test
    void testCreateOnAbstractStateBackend() throws Exception {
        HashMapStateBackend abstractStateBackend = new HashMapStateBackend();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        Environment env = OperatorStateBackendTest.createMockEnvironment();
        OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend((StateBackend.OperatorStateBackendParameters)new OperatorStateBackendParametersImpl(env, "test-operator", this.emptyStateHandles, cancelStreamRegistry));
        Assertions.assertThat((Object)operatorStateBackend).isNotNull();
        Assertions.assertThat((Collection)operatorStateBackend.getRegisteredStateNames()).isEmpty();
        Assertions.assertThat((Collection)operatorStateBackend.getRegisteredBroadcastStateNames()).isEmpty();
    }

    @Test
    void testRegisterStatesWithoutTypeSerializer() throws Exception {
        Class<FutureTask> registeredType = FutureTask.class;
        Assertions.assertThat((boolean)(new KryoSerializer(File.class, (SerializerConfig)new SerializerConfigImpl()).getKryo().getDefaultSerializer(registeredType) instanceof com.esotericsoftware.kryo.serializers.JavaSerializer)).isFalse();
        ExecutionConfig cfg = new ExecutionConfig();
        ((SerializerConfigImpl)cfg.getSerializerConfig()).registerTypeWithKryoSerializer(registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class);
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackendBuilder(this.classLoader, cfg, false, this.emptyStateHandles, new CloseableRegistry()).build();
        ListStateDescriptor stateDescriptor = new ListStateDescriptor("test", File.class);
        ListStateDescriptor stateDescriptor2 = new ListStateDescriptor("test2", String.class);
        ListState listState = operatorStateBackend.getListState(stateDescriptor);
        Assertions.assertThat((Object)listState).isNotNull();
        ListState listState2 = operatorStateBackend.getListState(stateDescriptor2);
        Assertions.assertThat((Object)listState2).isNotNull();
        Assertions.assertThat((Collection)operatorStateBackend.getRegisteredStateNames()).hasSize(2);
        TypeSerializer serializer = ((PartitionableListState)listState).getStateMetaInfo().getPartitionStateSerializer();
        Assertions.assertThat((Object)serializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat((Object)((KryoSerializer)serializer).getKryo().getSerializer(registeredType)).isInstanceOf(com.esotericsoftware.kryo.serializers.JavaSerializer.class);
        Iterator it = ((Iterable)listState2.get()).iterator();
        Assertions.assertThat(it).isExhausted();
        listState2.add((Object)"kevin");
        listState2.add((Object)"sunny");
        it = ((Iterable)listState2.get()).iterator();
        Assertions.assertThat((String)((String)it.next())).isEqualTo("kevin");
        Assertions.assertThat((String)((String)it.next())).isEqualTo("sunny");
        Assertions.assertThat(it).isExhausted();
    }

    @Test
    void testRegisterStates() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackendBuilder(this.classLoader, new ExecutionConfig(), false, this.emptyStateHandles, new CloseableRegistry()).build();
        ListStateDescriptor stateDescriptor1 = new ListStateDescriptor("test1", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor2 = new ListStateDescriptor("test2", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor3 = new ListStateDescriptor("test3", (TypeSerializer)new JavaSerializer());
        ListState listState1 = operatorStateBackend.getListState(stateDescriptor1);
        Assertions.assertThat((Object)listState1).isNotNull();
        Assertions.assertThat((Collection)operatorStateBackend.getRegisteredStateNames()).hasSize(1);
        Iterator it = ((Iterable)listState1.get()).iterator();
        Assertions.assertThat(it).isExhausted();
        listState1.add((Object)42);
        listState1.add((Object)4711);
        it = ((Iterable)listState1.get()).iterator();
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)42);
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)4711);
        Assertions.assertThat(it).isExhausted();
        ListState listState2 = operatorStateBackend.getListState(stateDescriptor2);
        Assertions.assertThat((Object)listState2).isNotNull();
        Assertions.assertThat((Collection)operatorStateBackend.getRegisteredStateNames()).hasSize(2);
        Assertions.assertThat(it).isExhausted();
        listState2.add((Object)7);
        listState2.add((Object)13);
        listState2.add((Object)23);
        it = ((Iterable)listState2.get()).iterator();
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)7);
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)13);
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)23);
        Assertions.assertThat(it).isExhausted();
        ListState listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
        Assertions.assertThat((Object)listState3).isNotNull();
        Assertions.assertThat((Collection)operatorStateBackend.getRegisteredStateNames()).hasSize(3);
        Assertions.assertThat(it).isExhausted();
        listState3.add((Object)17);
        listState3.add((Object)3);
        listState3.add((Object)123);
        it = ((Iterable)listState3.get()).iterator();
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)17);
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)3);
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)123);
        Assertions.assertThat(it).isExhausted();
        ListState listState1b = operatorStateBackend.getListState(stateDescriptor1);
        Assertions.assertThat((Object)listState1b).isNotNull();
        listState1b.add((Object)123);
        it = ((Iterable)listState1b.get()).iterator();
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)42);
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)4711);
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)123);
        Assertions.assertThat(it).isExhausted();
        it = ((Iterable)listState1.get()).iterator();
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)42);
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)4711);
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)123);
        Assertions.assertThat(it).isExhausted();
        it = ((Iterable)listState1b.get()).iterator();
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)42);
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)4711);
        Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)123);
        Assertions.assertThat(it).isExhausted();
        Assertions.assertThatThrownBy(() -> OperatorStateBackendTest.lambda$testRegisterStates$0((OperatorStateBackend)operatorStateBackend, stateDescriptor2)).isInstanceOf(IllegalStateException.class);
        Assertions.assertThatThrownBy(() -> OperatorStateBackendTest.lambda$testRegisterStates$1((OperatorStateBackend)operatorStateBackend, stateDescriptor3)).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testCorrectClassLoaderUsedOnSnapshot() throws Exception {
        HashMapStateBackend abstractStateBackend = new HashMapStateBackend();
        Environment env = OperatorStateBackendTest.createMockEnvironment();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend((StateBackend.OperatorStateBackendParameters)new OperatorStateBackendParametersImpl(env, "test-op-name", this.emptyStateHandles, cancelStreamRegistry));
        AtomicInteger copyCounter = new AtomicInteger(0);
        VerifyingIntSerializer serializer = new VerifyingIntSerializer(env.getUserCodeClassLoader().asClassLoader(), copyCounter);
        ListStateDescriptor stateDescriptor = new ListStateDescriptor("test", (TypeSerializer)serializer);
        ListState listState = operatorStateBackend.getListState(stateDescriptor);
        listState.add((Object)42);
        AtomicInteger keyCopyCounter = new AtomicInteger(0);
        AtomicInteger valueCopyCounter = new AtomicInteger(0);
        VerifyingIntSerializer keySerializer = new VerifyingIntSerializer(env.getUserCodeClassLoader().asClassLoader(), keyCopyCounter);
        VerifyingIntSerializer valueSerializer = new VerifyingIntSerializer(env.getUserCodeClassLoader().asClassLoader(), valueCopyCounter);
        MapStateDescriptor broadcastStateDesc = new MapStateDescriptor("test-broadcast", (TypeSerializer)keySerializer, (TypeSerializer)valueSerializer);
        BroadcastState broadcastState = operatorStateBackend.getBroadcastState(broadcastStateDesc);
        broadcastState.put((Object)1, (Object)2);
        broadcastState.put((Object)3, (Object)4);
        broadcastState.put((Object)5, (Object)6);
        MemCheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
        RunnableFuture runnableFuture = operatorStateBackend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        FutureUtils.runIfNotDoneAndGet((RunnableFuture)runnableFuture);
        Assertions.assertThat((int)copyCounter.get()).isGreaterThan(0);
        Assertions.assertThat((int)keyCopyCounter.get()).isGreaterThan(0);
        Assertions.assertThat((int)valueCopyCounter.get()).isGreaterThan(0);
    }

    @Test
    void testSnapshotEmpty() throws Exception {
        HashMapStateBackend abstractStateBackend = new HashMapStateBackend();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        Environment env = OperatorStateBackendTest.createMockEnvironment();
        OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend((StateBackend.OperatorStateBackendParameters)new OperatorStateBackendParametersImpl(env, "testOperator", this.emptyStateHandles, cancelStreamRegistry));
        MemCheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
        RunnableFuture snapshot = operatorStateBackend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        SnapshotResult snapshotResult = (SnapshotResult)FutureUtils.runIfNotDoneAndGet((RunnableFuture)snapshot);
        OperatorStateHandle stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
        Assertions.assertThat((Object)stateHandle).isNull();
    }

    @Test
    void testFileMergingSnapshotEmpty(@TempDir File tmpFolder) throws Exception {
        HashMapStateBackend abstractStateBackend = new HashMapStateBackend();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        Environment env = OperatorStateBackendTest.createMockEnvironment();
        OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend((StateBackend.OperatorStateBackendParameters)new OperatorStateBackendParametersImpl(env, "testOperator", this.emptyStateHandles, cancelStreamRegistry));
        Path checkpointBaseDir = new Path(tmpFolder.toString());
        Path sharedStateDir = new Path(checkpointBaseDir, "shared");
        Path taskOwnedStateDir = new Path(checkpointBaseDir, "taskowned");
        JobID jobId = JobID.generate();
        FileMergingSnapshotManager.SubtaskKey subtaskKey = new FileMergingSnapshotManager.SubtaskKey(jobId.toHexString(), "opId", 1, 1);
        LocalFileSystem fs = LocalFileSystem.getSharedInstance();
        CheckpointStorageLocationReference cslReference = AbstractFsCheckpointStorageAccess.encodePathAsReference((Path)Path.fromLocalFile((File)fs.pathToFile(checkpointBaseDir)));
        FileMergingSnapshotManager snapshotManager = this.createFileMergingSnapshotManager(checkpointBaseDir, sharedStateDir, taskOwnedStateDir, subtaskKey);
        FsMergingCheckpointStorageLocation streamFactory = new FsMergingCheckpointStorageLocation(subtaskKey, (FileSystem)fs, checkpointBaseDir, sharedStateDir, taskOwnedStateDir, cslReference, 1024, 1024, snapshotManager, 0L);
        RunnableFuture snapshot = operatorStateBackend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        SnapshotResult snapshotResult = (SnapshotResult)FutureUtils.runIfNotDoneAndGet((RunnableFuture)snapshot);
        OperatorStateHandle stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
        Assertions.assertThat((Object)stateHandle).isInstanceOf(FileMergingOperatorStreamStateHandle.class);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testSnapshotBroadcastStateWithEmptyOperatorState() throws Exception {
        HashMapStateBackend abstractStateBackend = new HashMapStateBackend();
        Environment env = OperatorStateBackendTest.createMockEnvironment();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend((StateBackend.OperatorStateBackendParameters)new OperatorStateBackendParametersImpl(env, "testOperator", this.emptyStateHandles, cancelStreamRegistry));
        MapStateDescriptor broadcastStateDesc = new MapStateDescriptor("test-broadcast", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(3);
        expected.put(1, 2);
        expected.put(3, 4);
        expected.put(5, 6);
        BroadcastState broadcastState = operatorStateBackend.getBroadcastState(broadcastStateDesc);
        broadcastState.putAll(expected);
        MemCheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
        OperatorStateHandle stateHandle = null;
        try {
            RunnableFuture snapshot = operatorStateBackend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            SnapshotResult snapshotResult = (SnapshotResult)FutureUtils.runIfNotDoneAndGet((RunnableFuture)snapshot);
            stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
            Assertions.assertThat((Object)stateHandle).isNotNull();
            HashMap<Integer, Integer> retrieved = new HashMap<Integer, Integer>();
            operatorStateBackend = OperatorStateBackendTest.recreateOperatorStateBackend(operatorStateBackend, (AbstractStateBackend)abstractStateBackend, (Collection<OperatorStateHandle>)StateObjectCollection.singleton((StateObject)stateHandle));
            BroadcastState retrievedState = operatorStateBackend.getBroadcastState(broadcastStateDesc);
            for (Map.Entry e : retrievedState.entries()) {
                retrieved.put((Integer)e.getKey(), (Integer)e.getValue());
            }
            Assertions.assertThat(retrieved).isEqualTo(expected);
            retrievedState.remove((Object)1);
            expected.remove(1);
            snapshot = operatorStateBackend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            snapshotResult = (SnapshotResult)FutureUtils.runIfNotDoneAndGet((RunnableFuture)snapshot);
            stateHandle.discardState();
            stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
            retrieved.clear();
            operatorStateBackend = OperatorStateBackendTest.recreateOperatorStateBackend(operatorStateBackend, (AbstractStateBackend)abstractStateBackend, (Collection<OperatorStateHandle>)StateObjectCollection.singleton((StateObject)stateHandle));
            retrievedState = operatorStateBackend.getBroadcastState(broadcastStateDesc);
            for (Map.Entry e : retrievedState.immutableEntries()) {
                retrieved.put((Integer)e.getKey(), (Integer)e.getValue());
            }
            Assertions.assertThat(retrieved).isEqualTo(expected);
            retrievedState.clear();
            expected.clear();
            snapshot = operatorStateBackend.snapshot(2L, 2L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            snapshotResult = (SnapshotResult)FutureUtils.runIfNotDoneAndGet((RunnableFuture)snapshot);
            if (stateHandle != null) {
                stateHandle.discardState();
            }
            stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
            retrieved.clear();
            operatorStateBackend = OperatorStateBackendTest.recreateOperatorStateBackend(operatorStateBackend, (AbstractStateBackend)abstractStateBackend, (Collection<OperatorStateHandle>)StateObjectCollection.singleton((StateObject)stateHandle));
            retrievedState = operatorStateBackend.getBroadcastState(broadcastStateDesc);
            for (Map.Entry e : retrievedState.immutableEntries()) {
                retrieved.put((Integer)e.getKey(), (Integer)e.getValue());
            }
            Assertions.assertThat(expected).isEmpty();
            Assertions.assertThat(retrieved).isEqualTo(expected);
            if (stateHandle != null) {
                stateHandle.discardState();
                stateHandle = null;
            }
        }
        finally {
            operatorStateBackend.close();
            operatorStateBackend.dispose();
            if (stateHandle != null) {
                stateHandle.discardState();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testSnapshotRestoreSync() throws Exception {
        HashMapStateBackend abstractStateBackend = new HashMapStateBackend();
        Environment env1 = OperatorStateBackendTest.createMockEnvironment();
        CloseableRegistry cancelStreamRegistry1 = new CloseableRegistry();
        OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend((StateBackend.OperatorStateBackendParameters)new OperatorStateBackendParametersImpl(env1, "test-op-name", this.emptyStateHandles, cancelStreamRegistry1));
        ListStateDescriptor stateDescriptor1 = new ListStateDescriptor("test1", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor2 = new ListStateDescriptor("test2", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor3 = new ListStateDescriptor("test3", (TypeSerializer)new JavaSerializer());
        MapStateDescriptor broadcastStateDescriptor1 = new MapStateDescriptor("test4", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        MapStateDescriptor broadcastStateDescriptor2 = new MapStateDescriptor("test5", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        MapStateDescriptor broadcastStateDescriptor3 = new MapStateDescriptor("test6", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        ListState listState1 = operatorStateBackend.getListState(stateDescriptor1);
        ListState listState2 = operatorStateBackend.getListState(stateDescriptor2);
        ListState listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
        BroadcastState broadcastState1 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor1);
        BroadcastState broadcastState2 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor2);
        BroadcastState broadcastState3 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor3);
        listState1.add((Object)42);
        listState1.add((Object)4711);
        listState2.add((Object)7);
        listState2.add((Object)13);
        listState2.add((Object)23);
        listState3.add((Object)17);
        listState3.add((Object)18);
        listState3.add((Object)19);
        listState3.add((Object)20);
        broadcastState1.put((Object)1, (Object)2);
        broadcastState1.put((Object)2, (Object)5);
        broadcastState2.put((Object)2, (Object)5);
        MemCheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(8192);
        RunnableFuture snapshot = operatorStateBackend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        SnapshotResult snapshotResult = (SnapshotResult)FutureUtils.runIfNotDoneAndGet((RunnableFuture)snapshot);
        OperatorStateHandle stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
        try {
            operatorStateBackend.close();
            operatorStateBackend.dispose();
            Environment env = OperatorStateBackendTest.createMockEnvironment();
            StateObjectCollection stateHandles = StateObjectCollection.singleton((StateObject)stateHandle);
            CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
            operatorStateBackend = abstractStateBackend.createOperatorStateBackend((StateBackend.OperatorStateBackendParameters)new OperatorStateBackendParametersImpl(env, "testOperator", (Collection)stateHandles, cancelStreamRegistry));
            Assertions.assertThat((Collection)operatorStateBackend.getRegisteredStateNames()).hasSize(3);
            Assertions.assertThat((Collection)operatorStateBackend.getRegisteredBroadcastStateNames()).hasSize(3);
            listState1 = operatorStateBackend.getListState(stateDescriptor1);
            listState2 = operatorStateBackend.getListState(stateDescriptor2);
            listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
            broadcastState1 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor1);
            broadcastState2 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor2);
            broadcastState3 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor3);
            Assertions.assertThat((Collection)operatorStateBackend.getRegisteredStateNames()).hasSize(3);
            Assertions.assertThat((Collection)operatorStateBackend.getRegisteredBroadcastStateNames()).hasSize(3);
            Iterator it = ((Iterable)listState1.get()).iterator();
            Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)42);
            Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)4711);
            Assertions.assertThat(it).isExhausted();
            it = ((Iterable)listState2.get()).iterator();
            Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)7);
            Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)13);
            Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)23);
            Assertions.assertThat(it).isExhausted();
            it = ((Iterable)listState3.get()).iterator();
            Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)17);
            Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)18);
            Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)19);
            Assertions.assertThat((Object)((Serializable)it.next())).isEqualTo((Object)20);
            Assertions.assertThat(it).isExhausted();
            Iterator bIt = broadcastState1.iterator();
            Assertions.assertThat((Iterator)bIt).hasNext();
            Map.Entry entry = (Map.Entry)bIt.next();
            Assertions.assertThat((Object)((Serializable)entry.getKey())).isEqualTo((Object)1);
            Assertions.assertThat((Object)((Serializable)entry.getValue())).isEqualTo((Object)2);
            Assertions.assertThat((Iterator)bIt).hasNext();
            entry = (Map.Entry)bIt.next();
            Assertions.assertThat((Object)((Serializable)entry.getKey())).isEqualTo((Object)2);
            Assertions.assertThat((Object)((Serializable)entry.getValue())).isEqualTo((Object)5);
            Assertions.assertThat((Iterator)bIt).isExhausted();
            bIt = broadcastState2.iterator();
            Assertions.assertThat((Iterator)bIt).hasNext();
            entry = (Map.Entry)bIt.next();
            Assertions.assertThat((Object)((Serializable)entry.getKey())).isEqualTo((Object)2);
            Assertions.assertThat((Object)((Serializable)entry.getValue())).isEqualTo((Object)5);
            Assertions.assertThat((Iterator)bIt).isExhausted();
            bIt = broadcastState3.iterator();
            Assertions.assertThat((Iterator)bIt).isExhausted();
            operatorStateBackend.close();
            operatorStateBackend.dispose();
        }
        finally {
            stateHandle.discardState();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testSnapshotRestoreAsync() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackendBuilder(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true, this.emptyStateHandles, new CloseableRegistry()).build();
        ListStateDescriptor stateDescriptor1 = new ListStateDescriptor("test1", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor2 = new ListStateDescriptor("test2", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor3 = new ListStateDescriptor("test3", (TypeSerializer)new JavaSerializer());
        MapStateDescriptor broadcastStateDescriptor1 = new MapStateDescriptor("test4", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        MapStateDescriptor broadcastStateDescriptor2 = new MapStateDescriptor("test5", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        MapStateDescriptor broadcastStateDescriptor3 = new MapStateDescriptor("test6", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        ListState listState1 = operatorStateBackend.getListState(stateDescriptor1);
        ListState listState2 = operatorStateBackend.getListState(stateDescriptor2);
        ListState listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
        BroadcastState broadcastState1 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor1);
        BroadcastState broadcastState2 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor2);
        BroadcastState broadcastState3 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor3);
        listState1.add((Object)MutableType.of(42));
        listState1.add((Object)MutableType.of(4711));
        listState2.add((Object)MutableType.of(7));
        listState2.add((Object)MutableType.of(13));
        listState2.add((Object)MutableType.of(23));
        listState3.add((Object)MutableType.of(17));
        listState3.add((Object)MutableType.of(18));
        listState3.add((Object)MutableType.of(19));
        listState3.add((Object)MutableType.of(20));
        broadcastState1.put((Object)MutableType.of(1), (Object)MutableType.of(2));
        broadcastState1.put((Object)MutableType.of(2), (Object)MutableType.of(5));
        broadcastState2.put((Object)MutableType.of(2), (Object)MutableType.of(5));
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        streamFactory.setWaiterLatch(waiterLatch);
        streamFactory.setBlockerLatch(blockerLatch);
        RunnableFuture runnableFuture = operatorStateBackend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        ExecutorService executorService = EXECUTOR_EXTENSION.getExecutor();
        executorService.submit(runnableFuture);
        waiterLatch.await();
        listState1.add((Object)MutableType.of(77));
        broadcastState1.put((Object)MutableType.of(32), (Object)MutableType.of(97));
        int n = 0;
        for (MutableType mutableType : (Iterable)listState2.get()) {
            if (++n == 2) {
                blockerLatch.trigger();
            }
            mutableType.setValue(mutableType.getValue() + 10);
        }
        listState3.clear();
        broadcastState2.clear();
        operatorStateBackend.getListState(new ListStateDescriptor("test4", (TypeSerializer)new JavaSerializer()));
        SnapshotResult snapshotResult = (SnapshotResult)runnableFuture.get();
        OperatorStateHandle stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
        try {
            operatorStateBackend.close();
            operatorStateBackend.dispose();
            HashMapStateBackend abstractStateBackend = new HashMapStateBackend();
            CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
            Environment env = OperatorStateBackendTest.createMockEnvironment();
            StateObjectCollection stateHandles = StateObjectCollection.singleton((StateObject)stateHandle);
            operatorStateBackend = abstractStateBackend.createOperatorStateBackend((StateBackend.OperatorStateBackendParameters)new OperatorStateBackendParametersImpl(env, "testOperator", (Collection)stateHandles, cancelStreamRegistry));
            Assertions.assertThat((Collection)operatorStateBackend.getRegisteredStateNames()).hasSize(3);
            Assertions.assertThat((Collection)operatorStateBackend.getRegisteredBroadcastStateNames()).hasSize(3);
            listState1 = operatorStateBackend.getListState(stateDescriptor1);
            listState2 = operatorStateBackend.getListState(stateDescriptor2);
            listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
            broadcastState1 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor1);
            broadcastState2 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor2);
            broadcastState3 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor3);
            Assertions.assertThat((Collection)operatorStateBackend.getRegisteredStateNames()).hasSize(3);
            Assertions.assertThat((Collection)operatorStateBackend.getRegisteredBroadcastStateNames()).hasSize(3);
            Iterator it = ((Iterable)listState1.get()).iterator();
            Assertions.assertThat((int)((MutableType)it.next()).value).isEqualTo(42);
            Assertions.assertThat((int)((MutableType)it.next()).value).isEqualTo(4711);
            Assertions.assertThat(it).isExhausted();
            it = ((Iterable)listState2.get()).iterator();
            Assertions.assertThat((int)((MutableType)it.next()).value).isEqualTo(7);
            Assertions.assertThat((int)((MutableType)it.next()).value).isEqualTo(13);
            Assertions.assertThat((int)((MutableType)it.next()).value).isEqualTo(23);
            Assertions.assertThat(it).isExhausted();
            it = ((Iterable)listState3.get()).iterator();
            Assertions.assertThat((int)((MutableType)it.next()).value).isEqualTo(17);
            Assertions.assertThat((int)((MutableType)it.next()).value).isEqualTo(18);
            Assertions.assertThat((int)((MutableType)it.next()).value).isEqualTo(19);
            Assertions.assertThat((int)((MutableType)it.next()).value).isEqualTo(20);
            Assertions.assertThat(it).isExhausted();
            Iterator bIt = broadcastState1.iterator();
            Assertions.assertThat((Iterator)bIt).hasNext();
            Map.Entry entry = (Map.Entry)bIt.next();
            Assertions.assertThat((int)((MutableType)entry.getKey()).value).isOne();
            Assertions.assertThat((int)((MutableType)entry.getValue()).value).isEqualTo(2);
            Assertions.assertThat((Iterator)bIt).hasNext();
            entry = (Map.Entry)bIt.next();
            Assertions.assertThat((int)((MutableType)entry.getKey()).value).isEqualTo(2);
            Assertions.assertThat((int)((MutableType)entry.getValue()).value).isEqualTo(5);
            Assertions.assertThat((Iterator)bIt).isExhausted();
            bIt = broadcastState2.iterator();
            Assertions.assertThat((Iterator)bIt).hasNext();
            entry = (Map.Entry)bIt.next();
            Assertions.assertThat((int)((MutableType)entry.getKey()).value).isEqualTo(2);
            Assertions.assertThat((int)((MutableType)entry.getValue()).value).isEqualTo(5);
            Assertions.assertThat((Iterator)bIt).isExhausted();
            bIt = broadcastState3.iterator();
            Assertions.assertThat((Iterator)bIt).isExhausted();
            operatorStateBackend.close();
            operatorStateBackend.dispose();
        }
        finally {
            stateHandle.discardState();
        }
        executorService.shutdown();
    }

    @Test
    void testSnapshotAsyncClose() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackendBuilder(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true, this.emptyStateHandles, new CloseableRegistry()).build();
        ListStateDescriptor stateDescriptor1 = new ListStateDescriptor("test1", (TypeSerializer)new JavaSerializer());
        ListState listState1 = operatorStateBackend.getListState(stateDescriptor1);
        listState1.add((Object)MutableType.of(42));
        listState1.add((Object)MutableType.of(4711));
        MapStateDescriptor broadcastStateDescriptor1 = new MapStateDescriptor("test4", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        BroadcastState broadcastState1 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor1);
        broadcastState1.put((Object)MutableType.of(1), (Object)MutableType.of(2));
        broadcastState1.put((Object)MutableType.of(2), (Object)MutableType.of(5));
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        streamFactory.setWaiterLatch(waiterLatch);
        streamFactory.setBlockerLatch(blockerLatch);
        RunnableFuture runnableFuture = operatorStateBackend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        EXECUTOR_EXTENSION.getExecutor().submit(runnableFuture);
        waiterLatch.await();
        operatorStateBackend.close();
        blockerLatch.trigger();
        Assertions.assertThatThrownBy(() -> runnableFuture.get(60L, TimeUnit.SECONDS)).isInstanceOf(CancellationException.class);
    }

    @Test
    void testSnapshotAsyncCancel() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackendBuilder(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true, this.emptyStateHandles, new CloseableRegistry()).build();
        ListStateDescriptor stateDescriptor1 = new ListStateDescriptor("test1", (TypeSerializer)new JavaSerializer());
        ListState listState1 = operatorStateBackend.getListState(stateDescriptor1);
        listState1.add((Object)MutableType.of(42));
        listState1.add((Object)MutableType.of(4711));
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        streamFactory.setWaiterLatch(waiterLatch);
        streamFactory.setBlockerLatch(blockerLatch);
        RunnableFuture runnableFuture = operatorStateBackend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        EXECUTOR_EXTENSION.getExecutor().submit(runnableFuture);
        waiterLatch.await();
        runnableFuture.cancel(true);
        for (BlockingCheckpointOutputStream stream : streamFactory.getAllCreatedStreams()) {
            Assertions.assertThat((boolean)stream.isClosed()).isTrue();
        }
        blockerLatch.trigger();
        Assertions.assertThatThrownBy(() -> runnableFuture.get(60L, TimeUnit.SECONDS)).isInstanceOf(CancellationException.class);
    }

    private static Environment createMockEnvironment() {
        Environment env = (Environment)Mockito.mock(Environment.class);
        Mockito.when((Object)env.getExecutionConfig()).thenReturn((Object)new ExecutionConfig());
        Mockito.when((Object)env.getUserCodeClassLoader()).thenReturn((Object)TestingUserCodeClassLoader.newBuilder().build());
        return env;
    }

    private static OperatorStateBackend recreateOperatorStateBackend(OperatorStateBackend oldOperatorStateBackend, AbstractStateBackend abstractStateBackend, Collection<OperatorStateHandle> toRestore) throws Exception {
        oldOperatorStateBackend.close();
        oldOperatorStateBackend.dispose();
        Environment env = OperatorStateBackendTest.createMockEnvironment();
        return abstractStateBackend.createOperatorStateBackend((StateBackend.OperatorStateBackendParameters)new OperatorStateBackendParametersImpl(env, "testOperator", toRestore, new CloseableRegistry()));
    }

    private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir, Path sharedStateDir, Path taskOwnedStateDir, FileMergingSnapshotManager.SubtaskKey subtaskKey) {
        FileMergingSnapshotManager mgr = new FileMergingSnapshotManagerBuilder(JobID.fromHexString((String)subtaskKey.getJobIDString()), new ResourceID("test-1"), FileMergingType.MERGE_WITHIN_CHECKPOINT).build();
        mgr.initFileSystem((FileSystem)LocalFileSystem.getSharedInstance(), checkpointBaseDir, sharedStateDir, taskOwnedStateDir, 1024);
        mgr.registerSubtaskForSharedStates(subtaskKey);
        return mgr;
    }

    private static /* synthetic */ void lambda$testRegisterStates$1(OperatorStateBackend operatorStateBackend, ListStateDescriptor stateDescriptor3) throws Throwable {
        operatorStateBackend.getListState(stateDescriptor3);
    }

    private static /* synthetic */ void lambda$testRegisterStates$0(OperatorStateBackend operatorStateBackend, ListStateDescriptor stateDescriptor2) throws Throwable {
        operatorStateBackend.getUnionListState(stateDescriptor2);
    }

    static final class MutableType
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private int value;

        public MutableType() {
            this(0);
        }

        public MutableType(int value) {
            this.value = value;
        }

        public int getValue() {
            return this.value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            MutableType that = (MutableType)o;
            return this.value == that.value;
        }

        public int hashCode() {
            return this.value;
        }

        static MutableType of(int value) {
            return new MutableType(value);
        }
    }

    public static class VerifyingIntSerializerSnapshot
    extends SimpleTypeSerializerSnapshot<Integer> {
        public VerifyingIntSerializerSnapshot() {
            super(() -> new VerifyingIntSerializer(Thread.currentThread().getContextClassLoader(), new AtomicInteger()));
        }
    }

    private static final class VerifyingIntSerializer
    extends TypeSerializer<Integer> {
        private static final long serialVersionUID = -5344563614550163898L;
        private transient ClassLoader classLoader;
        private transient AtomicInteger atomicInteger;

        private VerifyingIntSerializer(ClassLoader classLoader, AtomicInteger atomicInteger) {
            this.classLoader = (ClassLoader)Preconditions.checkNotNull((Object)classLoader);
            this.atomicInteger = (AtomicInteger)Preconditions.checkNotNull((Object)atomicInteger);
        }

        public boolean isImmutableType() {
            return false;
        }

        public TypeSerializer<Integer> duplicate() {
            return this;
        }

        public Integer createInstance() {
            return 0;
        }

        public Integer copy(Integer from) {
            Assertions.assertThat((Object)this.classLoader).isEqualTo((Object)Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
            return IntSerializer.INSTANCE.copy(from);
        }

        public Integer copy(Integer from, Integer reuse) {
            Assertions.assertThat((Object)this.classLoader).isEqualTo((Object)Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
            return IntSerializer.INSTANCE.copy(from, reuse);
        }

        public int getLength() {
            return IntSerializer.INSTANCE.getLength();
        }

        public void serialize(Integer record, DataOutputView target) throws IOException {
            IntSerializer.INSTANCE.serialize(record, target);
        }

        public Integer deserialize(DataInputView source) throws IOException {
            return IntSerializer.INSTANCE.deserialize(source);
        }

        public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
            return IntSerializer.INSTANCE.deserialize(reuse, source);
        }

        public void copy(DataInputView source, DataOutputView target) throws IOException {
            Assertions.assertThat((Object)this.classLoader).isEqualTo((Object)Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
            IntSerializer.INSTANCE.copy(source, target);
        }

        public boolean equals(Object obj) {
            return obj instanceof VerifyingIntSerializer;
        }

        public int hashCode() {
            return ((Object)((Object)this)).getClass().hashCode();
        }

        public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
            return new VerifyingIntSerializerSnapshot();
        }
    }
}

