package org.apache.flink.runtime.state.filesystem;

import java.io.File;
import java.io.IOException;
import org.apache.flink.core.fs.EntropyInjectingFileSystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.filesystem.FsStateBackendEntropyTest;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.class */
public class FsCheckpointStreamFactoryTest {

    @Rule
    public final TemporaryFolder TMP = new TemporaryFolder();
    private Path exclusiveStateDir;
    private Path sharedStateDir;

    /* loaded from: input_file:org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest$DisabledEntropyFS.class */
    private static final class DisabledEntropyFS extends LocalFileSystem implements EntropyInjectingFileSystem {
        private DisabledEntropyFS() {
        }

        public String getEntropyInjectionKey() {
            return null;
        }

        public String generateEntropy() {
            return null;
        }
    }

    @Before
    public void createStateDirectories() throws IOException {
        this.exclusiveStateDir = Path.fromLocalFile(this.TMP.newFolder("exclusive"));
        this.sharedStateDir = Path.fromLocalFile(this.TMP.newFolder("shared"));
    }

    @Test
    public void testWriteFlushesIfAboveThreshold() throws IOException {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream = createFactory(FileSystem.getLocalFileSystem(), 100, 100).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        createCheckpointStateOutputStream.write(new byte[100]);
        File[] listFiles = new File(this.exclusiveStateDir.toUri()).listFiles();
        Assert.assertEquals(1L, listFiles.length);
        File file = listFiles[0];
        Assert.assertEquals(100, file.length());
        createCheckpointStateOutputStream.write(new byte[100 - 1]);
        createCheckpointStateOutputStream.write(127);
        Assert.assertEquals(100, file.length());
    }

    @Test
    public void testExclusiveStateHasRelativePathHandles() throws IOException {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream = createFactory(FileSystem.getLocalFileSystem(), 0).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        createCheckpointStateOutputStream.write(1657);
        RelativeFileStateHandle closeAndGetHandle = createCheckpointStateOutputStream.closeAndGetHandle();
        Assert.assertThat(closeAndGetHandle, Matchers.instanceOf(RelativeFileStateHandle.class));
        assertPathsEqual(this.exclusiveStateDir, closeAndGetHandle.getFilePath().getParent());
    }

    @Test
    public void testSharedStateHasAbsolutePathHandles() throws IOException {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream = createFactory(FileSystem.getLocalFileSystem(), 0).createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
        createCheckpointStateOutputStream.write(0);
        FileStateHandle closeAndGetHandle = createCheckpointStateOutputStream.closeAndGetHandle();
        Assert.assertThat(closeAndGetHandle, Matchers.instanceOf(FileStateHandle.class));
        Assert.assertThat(closeAndGetHandle, Matchers.not(Matchers.instanceOf(RelativeFileStateHandle.class)));
        assertPathsEqual(this.sharedStateDir, closeAndGetHandle.getFilePath().getParent());
    }

    @Test
    public void testEntropyMakesExclusiveStateAbsolutePaths() throws IOException {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream = createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs(), 0).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        createCheckpointStateOutputStream.write(0);
        FileStateHandle closeAndGetHandle = createCheckpointStateOutputStream.closeAndGetHandle();
        Assert.assertThat(closeAndGetHandle, Matchers.instanceOf(FileStateHandle.class));
        Assert.assertThat(closeAndGetHandle, Matchers.not(Matchers.instanceOf(RelativeFileStateHandle.class)));
        assertPathsEqual(this.exclusiveStateDir, closeAndGetHandle.getFilePath().getParent());
    }

    @Test
    public void testFSWithDisabledEntropyHasRelativePaths() throws IOException {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream = createFactory(new DisabledEntropyFS(), 0).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        createCheckpointStateOutputStream.write(0);
        RelativeFileStateHandle closeAndGetHandle = createCheckpointStateOutputStream.closeAndGetHandle();
        Assert.assertThat(closeAndGetHandle, Matchers.instanceOf(RelativeFileStateHandle.class));
        assertPathsEqual(this.exclusiveStateDir, closeAndGetHandle.getFilePath().getParent());
    }

    @Test
    public void testFlushUnderThreshold() throws IOException {
        flushAndVerify(10, 10, true);
    }

    @Test
    public void testFlushAboveThreshold() throws IOException {
        flushAndVerify(10, 11, false);
    }

    private void flushAndVerify(int i, int i2, boolean z) throws IOException {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream createCheckpointStateOutputStream = createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs(), i).createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        createCheckpointStateOutputStream.write(new byte[i2], 0, i2);
        createCheckpointStateOutputStream.flush();
        Assert.assertEquals(z ? 0L : 1L, new File(this.exclusiveStateDir.toUri()).listFiles().length);
    }

    private static void assertPathsEqual(Path path, Path path2) {
        Assert.assertEquals(new Path(path.toString()), new Path(path2.toString()));
    }

    private FsCheckpointStreamFactory createFactory(FileSystem fileSystem, int i) {
        return createFactory(fileSystem, i, 4096);
    }

    private FsCheckpointStreamFactory createFactory(FileSystem fileSystem, int i, int i2) {
        return new FsCheckpointStreamFactory(fileSystem, this.exclusiveStateDir, this.sharedStateDir, i, i2);
    }
}
