/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.File;
import java.io.IOException;
import org.apache.flink.core.fs.EntropyInjectingFileSystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.filesystem.FsStateBackendEntropyTest;
import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link FsCheckpointStreamFactory}: when data written to a checkpoint stream ends up
 * in a file, and which kind of state handle is returned for exclusive and shared state on file
 * systems with and without entropy injection.
 */
public class FsCheckpointStreamFactoryTest {
    /** Per-test scratch directory hosting the exclusive and shared state folders. */
    @Rule
    public final TemporaryFolder TMP = new TemporaryFolder();

    /** Directory handed to the factory for {@link CheckpointedStateScope#EXCLUSIVE} state. */
    private Path exclusiveStateDir;

    /** Directory handed to the factory for {@link CheckpointedStateScope#SHARED} state. */
    private Path sharedStateDir;

    /**
     * Creates fresh "exclusive" and "shared" folders below the temporary folder before each test.
     *
     * @throws IOException if one of the folders cannot be created
     */
    @Before
    public void createStateDirectories() throws IOException {
        this.exclusiveStateDir = Path.fromLocalFile(this.TMP.newFolder("exclusive"));
        this.sharedStateDir = Path.fromLocalFile(this.TMP.newFolder("shared"));
    }

    /**
     * With threshold and buffer size both set to 100, writing 100 bytes must produce exactly one
     * file of 100 bytes, and a further 99 + 1 bytes must not change that file's length.
     *
     * @throws IOException if writing to the stream fails
     */
    @Test
    public void testWriteFlushesIfAboveThreshold() throws IOException {
        int fileSizeThreshold = 100;
        FsCheckpointStreamFactory factory =
                this.createFactory(FileSystem.getLocalFileSystem(), fileSizeThreshold, fileSizeThreshold);
        // try-with-resources: the stream is never turned into a handle here, so it has to be
        // closed explicitly or it would outlive the test.
        try (FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) {
            stream.write(new byte[fileSizeThreshold]);
            File[] files = this.listExclusiveStateFiles();
            Assert.assertEquals(1, files.length);
            File file = files[0];
            Assert.assertEquals(fileSizeThreshold, file.length());
            // Another full buffer's worth of data, split over two writes; the file on disk is
            // expected to stay at its previous length.
            stream.write(new byte[fileSizeThreshold - 1]);
            stream.write(127);
            Assert.assertEquals(fileSizeThreshold, file.length());
        }
    }

    /**
     * Exclusive state on the plain local file system must yield a {@link RelativeFileStateHandle}
     * whose file lives directly in the exclusive state directory.
     *
     * @throws IOException if writing to or closing the stream fails
     */
    @Test
    public void testExclusiveStateHasRelativePathHandles() throws IOException {
        FsCheckpointStreamFactory factory = this.createFactory(FileSystem.getLocalFileSystem(), 0);
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        stream.write(1657);
        StreamStateHandle handle = stream.closeAndGetHandle();
        Assert.assertThat(handle, Matchers.instanceOf(RelativeFileStateHandle.class));
        FsCheckpointStreamFactoryTest.assertPathsEqual(
                this.exclusiveStateDir, ((RelativeFileStateHandle) handle).getFilePath().getParent());
    }

    /**
     * Shared state must yield a {@link FileStateHandle} that is not a
     * {@link RelativeFileStateHandle}, with its file directly in the shared state directory.
     *
     * @throws IOException if writing to or closing the stream fails
     */
    @Test
    public void testSharedStateHasAbsolutePathHandles() throws IOException {
        FsCheckpointStreamFactory factory = this.createFactory(FileSystem.getLocalFileSystem(), 0);
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                factory.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
        stream.write(0);
        StreamStateHandle handle = stream.closeAndGetHandle();
        Assert.assertThat(handle, Matchers.instanceOf(FileStateHandle.class));
        Assert.assertThat(handle, Matchers.not(Matchers.instanceOf(RelativeFileStateHandle.class)));
        FsCheckpointStreamFactoryTest.assertPathsEqual(
                this.sharedStateDir, ((FileStateHandle) handle).getFilePath().getParent());
    }

    /**
     * Exclusive state on the entropy-aware test file system must yield a {@link FileStateHandle}
     * that is not a {@link RelativeFileStateHandle}.
     *
     * @throws IOException if writing to or closing the stream fails
     */
    @Test
    public void testEntropyMakesExclusiveStateAbsolutePaths() throws IOException {
        FsCheckpointStreamFactory factory =
                this.createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs(), 0);
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        stream.write(0);
        StreamStateHandle handle = stream.closeAndGetHandle();
        Assert.assertThat(handle, Matchers.instanceOf(FileStateHandle.class));
        Assert.assertThat(handle, Matchers.not(Matchers.instanceOf(RelativeFileStateHandle.class)));
        FsCheckpointStreamFactoryTest.assertPathsEqual(
                this.exclusiveStateDir, ((FileStateHandle) handle).getFilePath().getParent());
    }

    /**
     * A file system that implements {@link EntropyInjectingFileSystem} but returns {@code null}
     * as its injection key must still yield a {@link RelativeFileStateHandle} for exclusive state.
     *
     * @throws IOException if writing to or closing the stream fails
     */
    @Test
    public void testFSWithDisabledEntropyHasRelativePaths() throws IOException {
        FsCheckpointStreamFactory factory = this.createFactory(new DisabledEntropyFS(), 0);
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        stream.write(0);
        StreamStateHandle handle = stream.closeAndGetHandle();
        Assert.assertThat(handle, Matchers.instanceOf(RelativeFileStateHandle.class));
        FsCheckpointStreamFactoryTest.assertPathsEqual(
                this.exclusiveStateDir, ((RelativeFileStateHandle) handle).getFilePath().getParent());
    }

    /**
     * Flushing exactly as many bytes as the threshold (10) must not create a file.
     *
     * @throws IOException if writing to or flushing the stream fails
     */
    @Test
    public void testFlushUnderThreshold() throws IOException {
        this.flushAndVerify(10, 10, true);
    }

    /**
     * Flushing one byte more than the threshold (11 vs. 10) must create a file.
     *
     * @throws IOException if writing to or flushing the stream fails
     */
    @Test
    public void testFlushAboveThreshold() throws IOException {
        this.flushAndVerify(10, 11, false);
    }

    /**
     * Writes {@code bytesToFlush} zero bytes to an exclusive-scope stream, flushes it, and checks
     * whether the exclusive state directory contains a file afterwards.
     *
     * @param minFileSize file size threshold passed to the factory
     * @param bytesToFlush number of bytes written before the flush
     * @param expectEmpty {@code true} if the directory must be empty after the flush,
     *     {@code false} if it must contain exactly one file
     * @throws IOException if writing to or flushing the stream fails
     */
    private void flushAndVerify(int minFileSize, int bytesToFlush, boolean expectEmpty) throws IOException {
        FsCheckpointStreamFactory factory =
                this.createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs(), minFileSize);
        // The directory is inspected while the stream is still open; try-with-resources only
        // makes sure the stream is closed once the check is done.
        try (FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) {
            stream.write(new byte[bytesToFlush], 0, bytesToFlush);
            stream.flush();
            Assert.assertEquals(expectEmpty ? 0 : 1, this.listExclusiveStateFiles().length);
        }
    }

    /**
     * Lists the entries of the exclusive state directory.
     *
     * @return the directory entries, never {@code null}
     */
    private File[] listExclusiveStateFiles() {
        File[] files = new File(this.exclusiveStateDir.toUri()).listFiles();
        // File.listFiles() signals "not a directory" or an I/O error with null; surface that as
        // a readable assertion failure rather than a NullPointerException.
        Assert.assertNotNull("Could not list the exclusive state directory", files);
        return files;
    }

    /**
     * Asserts that two paths are equal after rebuilding both from their string form, so that
     * both sides go through the same {@link Path} string constructor before being compared.
     *
     * @param expected the expected path
     * @param actual the path under test
     */
    private static void assertPathsEqual(Path expected, Path actual) {
        Path reNormalizedExpected = new Path(expected.toString());
        Path reNormalizedActual = new Path(actual.toString());
        Assert.assertEquals(reNormalizedExpected, reNormalizedActual);
    }

    /**
     * Creates a factory over the test's state directories with a write buffer size of 4096.
     *
     * @param fs file system the factory writes to
     * @param fileSizeThreshold file size threshold passed to the factory
     * @return the new factory
     */
    private FsCheckpointStreamFactory createFactory(FileSystem fs, int fileSizeThreshold) {
        return this.createFactory(fs, fileSizeThreshold, 4096);
    }

    /**
     * Creates a factory over the test's state directories.
     *
     * @param fs file system the factory writes to
     * @param fileSizeThreshold file size threshold passed to the factory
     * @param bufferSize write buffer size passed to the factory
     * @return the new factory
     */
    private FsCheckpointStreamFactory createFactory(FileSystem fs, int fileSizeThreshold, int bufferSize) {
        return new FsCheckpointStreamFactory(
                fs, this.exclusiveStateDir, this.sharedStateDir, fileSizeThreshold, bufferSize);
    }

    /**
     * Local file system that implements {@link EntropyInjectingFileSystem} but returns
     * {@code null} for both the injection key and the generated entropy.
     */
    private static final class DisabledEntropyFS
            extends LocalFileSystem
            implements EntropyInjectingFileSystem {
        private DisabledEntropyFS() {
        }

        @Override
        public String getEntropyInjectionKey() {
            return null;
        }

        @Override
        public String generateEntropy() {
            return null;
        }
    }
}

