/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.TestMemoryCheckpointOutputStream;
import org.junit.Assert;
import org.junit.Test;

public class OperatorStateOutputCheckpointStreamTest {
    private static final int STREAM_CAPACITY = 128;

    private static OperatorStateCheckpointOutputStream createStream() throws IOException {
        TestMemoryCheckpointOutputStream checkStream = new TestMemoryCheckpointOutputStream(128);
        return new OperatorStateCheckpointOutputStream((CheckpointStreamFactory.CheckpointStateOutputStream)checkStream);
    }

    private OperatorStateHandle writeAllTestKeyGroups(OperatorStateCheckpointOutputStream stream, int numPartitions) throws Exception {
        DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper((OutputStream)stream);
        for (int i = 0; i < numPartitions; ++i) {
            Assert.assertEquals((long)i, (long)stream.getNumberOfPartitions());
            stream.startNewPartition();
            dov.writeInt(i);
        }
        return stream.closeAndGetHandle();
    }

    @Test
    public void testCloseNotPropagated() throws Exception {
        OperatorStateCheckpointOutputStream stream = OperatorStateOutputCheckpointStreamTest.createStream();
        TestMemoryCheckpointOutputStream innerStream = (TestMemoryCheckpointOutputStream)stream.getDelegate();
        stream.close();
        Assert.assertFalse((boolean)innerStream.isClosed());
        innerStream.close();
    }

    @Test
    public void testEmptyOperatorStream() throws Exception {
        OperatorStateCheckpointOutputStream stream = OperatorStateOutputCheckpointStreamTest.createStream();
        TestMemoryCheckpointOutputStream innerStream = (TestMemoryCheckpointOutputStream)stream.getDelegate();
        OperatorStateHandle emptyHandle = stream.closeAndGetHandle();
        Assert.assertTrue((boolean)innerStream.isClosed());
        Assert.assertEquals((long)0L, (long)stream.getNumberOfPartitions());
        Assert.assertEquals(null, (Object)emptyHandle);
    }

    @Test
    public void testWriteReadRoundtrip() throws Exception {
        int numPartitions = 3;
        OperatorStateCheckpointOutputStream stream = OperatorStateOutputCheckpointStreamTest.createStream();
        OperatorStateHandle fullHandle = this.writeAllTestKeyGroups(stream, numPartitions);
        Assert.assertNotNull((Object)fullHandle);
        Map stateNameToPartitionOffsets = fullHandle.getStateNameToPartitionOffsets();
        for (Map.Entry entry : stateNameToPartitionOffsets.entrySet()) {
            Assert.assertEquals((Object)OperatorStateHandle.Mode.SPLIT_DISTRIBUTE, (Object)((OperatorStateHandle.StateMetaInfo)entry.getValue()).getDistributionMode());
        }
        OperatorStateOutputCheckpointStreamTest.verifyRead(fullHandle, numPartitions);
    }

    private static void verifyRead(OperatorStateHandle fullHandle, int numPartitions) throws IOException {
        int count = 0;
        try (FSDataInputStream in = fullHandle.openInputStream();){
            OperatorStateHandle.StateMetaInfo metaInfo = (OperatorStateHandle.StateMetaInfo)fullHandle.getStateNameToPartitionOffsets().get("_default_");
            long[] offsets = metaInfo.getOffsets();
            Assert.assertNotNull((Object)offsets);
            DataInputViewStreamWrapper div = new DataInputViewStreamWrapper((InputStream)in);
            for (int i = 0; i < numPartitions; ++i) {
                in.seek(offsets[i]);
                Assert.assertEquals((long)i, (long)div.readInt());
                ++count;
            }
        }
        Assert.assertEquals((long)numPartitions, (long)count);
    }
}

