package org.apache.flink.runtime.state.filesystem;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.Random;
import java.util.UUID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link FileMergingCheckpointStateOutputStream}.
 *
 * <p>Loaded from: input_file:org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.class
 */
public class FileMergingCheckpointStateOutputStreamTest {

    // JUnit rule supplying temporary folders; getNewStream(false) creates each new physical file
    // inside a fresh folder obtained from it.
    @Rule
    public final TemporaryFolder tempDir = new TemporaryFolder();
    // Message of the IOException thrown by the test proxy when failWhenClosePhysicalFile is set.
    private static final String CLOSE_FILE_FAILURE_MESSAGE = "Cannot close physical file.";
    // First constructor argument of every stream created by getNewStream(boolean).
    private static final int WRITE_BUFFER_SIZE = 256;
    // When true, the proxy's closeStreamAndCreateStateHandle() does not close the physical file.
    private static boolean physicalFileCanBeReused;
    // Physical file most recently handed out by the proxy's providePhysicalFile(); it is the file
    // used by getNewStream(true).
    private static PhysicalFile lastPhysicalFile;
    // When true, the proxy's close methods throw an IOException once a physical file was provided.
    private static boolean failWhenClosePhysicalFile = false;
    // Set to true by the proxy's providePhysicalFile(); reset to false by getNewStream(boolean).
    private static boolean isPhysicalFileProvided = false;

    /**
     * Resets every static test switch before each test method so that state left behind by a
     * previously executed test cannot influence the next one.
     */
    @Before
    public void setEnv() {
        failWhenClosePhysicalFile = false;
        physicalFileCanBeReused = false;
        // The bookkeeping written by the test proxy is static as well; clear it so that a test
        // can never observe a physical file (or a "provided" flag) from an earlier test method.
        isPhysicalFileProvided = false;
        lastPhysicalFile = null;
    }

    /**
     * Creates a stream under test that does not reuse {@code lastPhysicalFile}.
     *
     * @return the stream produced by {@link #getNewStream(boolean)} with argument {@code false}
     * @throws IOException if creating the stream fails
     */
    private FileMergingCheckpointStateOutputStream getNewStream() throws IOException {
        final boolean reuseLastPhysicalFile = false;
        return getNewStream(reuseLastPhysicalFile);
    }

    /**
     * Creates a stream under test whose snapshot-manager proxy is a test double.
     *
     * <p>The proxy records in {@code isPhysicalFileProvided} and {@code lastPhysicalFile} when the
     * stream requests its physical file, and consults {@code failWhenClosePhysicalFile} and
     * {@code physicalFileCanBeReused} when the stream is closed.
     *
     * @param z {@code true} to hand out {@code lastPhysicalFile} (asserted to be non-null),
     *     {@code false} to create a new physical file inside a new temporary folder
     * @return a new stream constructed with {@code WRITE_BUFFER_SIZE} and the test proxy
     * @throws IOException if the temporary folder or the physical file cannot be created
     */
    private FileMergingCheckpointStateOutputStream getNewStream(boolean z) throws IOException {
        final PhysicalFile physicalFile;
        if (!z) {
            final Path directory = Path.fromLocalFile(this.tempDir.newFolder());
            final Path filePath = new Path(directory, UUID.randomUUID().toString());
            final FSDataOutputStream fileOutput =
                    EntropyInjector.createEntropyAware(
                                    directory.getFileSystem(),
                                    filePath,
                                    FileSystem.WriteMode.NO_OVERWRITE)
                            .stream();
            // The callback passed as third argument intentionally does nothing.
            physicalFile =
                    new PhysicalFile(
                            fileOutput, filePath, ignored -> {}, CheckpointedStateScope.EXCLUSIVE);
        } else {
            Assertions.assertThat(lastPhysicalFile).isNotNull();
            physicalFile = lastPhysicalFile;
        }

        // A fresh stream has not asked for its physical file yet.
        isPhysicalFileProvided = false;

        return new FileMergingCheckpointStateOutputStream(
                WRITE_BUFFER_SIZE,
                new FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy() {
                    @Override
                    public Tuple2<FSDataOutputStream, Path> providePhysicalFile() {
                        isPhysicalFileProvided = true;
                        lastPhysicalFile = physicalFile;
                        // Handing out an already closed file is rejected with an
                        // IllegalArgumentException.
                        Preconditions.checkArgument(physicalFile.isOpen());
                        return new Tuple2<>(
                                physicalFile.getOutputStream(), physicalFile.getFilePath());
                    }

                    @Override
                    public SegmentFileStateHandle closeStreamAndCreateStateHandle(
                            Path path, long startPos, long stateSize) throws IOException {
                        if (isPhysicalFileProvided && failWhenClosePhysicalFile) {
                            throw new IOException(CLOSE_FILE_FAILURE_MESSAGE);
                        }
                        if (isPhysicalFileProvided && !physicalFileCanBeReused) {
                            physicalFile.close();
                        }
                        return new SegmentFileStateHandle(
                                path,
                                startPos,
                                stateSize,
                                CheckpointedStateScope.EXCLUSIVE,
                                LogicalFile.LogicalFileId.generateRandomId());
                    }

                    @Override
                    public void closeStreamExceptionally() throws IOException {
                        if (!isPhysicalFileProvided) {
                            return;
                        }
                        if (failWhenClosePhysicalFile) {
                            throw new IOException(CLOSE_FILE_FAILURE_MESSAGE);
                        }
                        physicalFile.close();
                    }
                });
    }

    /**
     * Checks the handle returned by {@code closeAndGetHandle()} for six usage patterns: an
     * untouched stream, a stream that was only {@code flush()}ed, a stream flushed to file with no
     * data, with an empty write, with a 10-byte write, and a 10-byte write without explicit flush.
     */
    @Test
    public void testGetHandleFromStream() throws IOException {
        // Untouched stream: no physical file is requested and the handle is null.
        FileMergingCheckpointStateOutputStream untouched = getNewStream();
        Assertions.assertThat(isPhysicalFileProvided).isFalse();
        Assertions.assertThat(untouched.closeAndGetHandle()).isNull();

        // flush() alone: still no physical file and still a null handle.
        FileMergingCheckpointStateOutputStream flushedOnly = getNewStream();
        flushedOnly.flush();
        Assertions.assertThat(isPhysicalFileProvided).isFalse();
        Assertions.assertThat(flushedOnly.closeAndGetHandle()).isNull();

        // flushToFile() without data: a physical file is requested, the handle has size 0.
        FileMergingCheckpointStateOutputStream flushedToFile = getNewStream();
        flushedToFile.flushToFile();
        Assertions.assertThat(isPhysicalFileProvided).isTrue();
        SegmentFileStateHandle emptyHandle = flushedToFile.closeAndGetHandle();
        Assertions.assertThat(emptyHandle).isNotNull();
        Assertions.assertThat(emptyHandle.getStateSize()).isEqualTo(0L);

        // Empty write followed by flushToFile(): handle of size 0.
        FileMergingCheckpointStateOutputStream emptyWrite = getNewStream();
        emptyWrite.write(new byte[0]);
        emptyWrite.flushToFile();
        SegmentFileStateHandle emptyWriteHandle = emptyWrite.closeAndGetHandle();
        Assertions.assertThat(emptyWriteHandle).isNotNull();
        Assertions.assertThat(emptyWriteHandle.getStateSize()).isEqualTo(0L);

        // 10 bytes followed by flushToFile(): handle of size 10.
        FileMergingCheckpointStateOutputStream flushedData = getNewStream();
        flushedData.write(new byte[10]);
        flushedData.flushToFile();
        SegmentFileStateHandle flushedDataHandle = flushedData.closeAndGetHandle();
        Assertions.assertThat(flushedDataHandle).isNotNull();
        Assertions.assertThat(flushedDataHandle.getStateSize()).isEqualTo(10L);

        // 10 bytes without an explicit flush: handle of size 10 as well.
        FileMergingCheckpointStateOutputStream unflushedData = getNewStream();
        unflushedData.write(new byte[10]);
        SegmentFileStateHandle unflushedDataHandle = unflushedData.closeAndGetHandle();
        Assertions.assertThat(unflushedDataHandle).isNotNull();
        Assertions.assertThat(unflushedDataHandle.getStateSize()).isEqualTo(10L);
    }

    /**
     * Calls {@code closeAndGetHandle()} on a stream that was already closed.
     *
     * <p>NOTE(review): the test tolerates both outcomes - it passes whether the call throws or
     * returns normally - so it only guards against errors that are not {@link Exception}s.
     */
    @Test
    public void testGetHandleFromClosedStream() throws IOException {
        final FileMergingCheckpointStateOutputStream closedStream = getNewStream();
        closedStream.close();
        try {
            closedStream.closeAndGetHandle();
        } catch (Exception ignored) {
            // Deliberately ignored: an exception on a closed stream is acceptable here.
        }
    }

    /**
     * Checks at which point a stream requests its physical file: not while fewer than
     * {@code WRITE_BUFFER_SIZE} bytes are buffered, but once the buffer size is exceeded or
     * reached by a single write, and on {@code closeAndGetHandle()} with buffered data - whereas a
     * plain {@code close()} with buffered data requests no file.
     */
    @Test
    public void testWhetherFileIsCreatedWhenWritingStream() throws IOException {
        final int oneBelowBufferSize = WRITE_BUFFER_SIZE - 1;

        // Staying below the buffer size requests no file; two more bytes do.
        FileMergingCheckpointStateOutputStream growing = getNewStream();
        growing.write(new byte[oneBelowBufferSize]);
        Assertions.assertThat(isPhysicalFileProvided).isFalse();
        growing.write(new byte[2]);
        Assertions.assertThat(isPhysicalFileProvided).isTrue();

        // A single write of exactly the buffer size requests the file.
        FileMergingCheckpointStateOutputStream exactlyFull = getNewStream();
        exactlyFull.write(new byte[WRITE_BUFFER_SIZE]);
        Assertions.assertThat(isPhysicalFileProvided).isTrue();

        // close() with only buffered data requests no file.
        FileMergingCheckpointStateOutputStream closedWithBufferedData = getNewStream();
        closedWithBufferedData.write(new byte[oneBelowBufferSize]);
        closedWithBufferedData.close();
        Assertions.assertThat(isPhysicalFileProvided).isFalse();

        // closeAndGetHandle() with buffered data requests the file.
        FileMergingCheckpointStateOutputStream handleWithBufferedData = getNewStream();
        handleWithBufferedData.write(new byte[oneBelowBufferSize]);
        handleWithBufferedData.closeAndGetHandle();
        Assertions.assertThat(isPhysicalFileProvided).isTrue();
    }

    /**
     * Exercises closing behaviour: writes to a closed stream, {@code close()} while the proxy
     * fails to close the physical file, and {@code closeAndGetHandle()} under the same failure.
     */
    @Test
    public void testCloseStream() throws IOException {
        final FileMergingCheckpointStateOutputStream closedStream = getNewStream();
        closedStream.flushToFile();
        closedStream.close();
        // An empty write on the closed stream must not throw (an exception would fail the test).
        closedStream.write(new byte[0]);
        try {
            closedStream.write(new byte[1]);
        } catch (IOException writeFailure) {
            // If the write is rejected, it has to be rejected with exactly this message.
            Assertions.assertThat(writeFailure.getMessage())
                    .isEqualTo("Cannot call flushToFile() to a closed stream.");
        }

        // From here on the proxy throws whenever it is asked to close a provided physical file.
        failWhenClosePhysicalFile = true;

        // close() must not propagate the proxy failure; it is called outside any try block.
        final FileMergingCheckpointStateOutputStream quietlyClosed = getNewStream();
        quietlyClosed.flushToFile();
        Assertions.assertThat(isPhysicalFileProvided).isTrue();
        quietlyClosed.close();

        // closeAndGetHandle() may fail, but only with the injected close failure.
        final FileMergingCheckpointStateOutputStream handleStream = getNewStream();
        handleStream.flushToFile();
        Assertions.assertThat(isPhysicalFileProvided).isTrue();
        try {
            handleStream.closeAndGetHandle();
        } catch (IOException closeFailure) {
            final boolean isInjectedFailure =
                    closeFailure.getMessage().equals(CLOSE_FILE_FAILURE_MESSAGE);
            if (!isInjectedFailure) {
                throw closeFailure;
            }
        }
    }

    /** Runs the write/read round trip with a state larger than {@code WRITE_BUFFER_SIZE}. */
    @Test
    public void testStateAboveBufferSize() throws Exception {
        final int numBytes = 576446;
        runTest(numBytes);
    }

    /** Runs the write/read round trip with a state smaller than {@code WRITE_BUFFER_SIZE}. */
    @Test
    public void testStateUnderBufferSize() throws Exception {
        final int numBytes = 100;
        runTest(numBytes);
    }

    /**
     * Verifies {@code getPos()} after single-byte writes, after randomly sized array writes, and
     * for a second stream that reuses the physical file of the previous one (where the asserted
     * position starts again at zero).
     */
    @Test
    public void testGetPos() throws Exception {
        // Single-byte writes: the position equals the number of bytes written so far.
        FileMergingCheckpointStateOutputStream byteWise = getNewStream();
        for (int written = 0; written < 64; written++) {
            Assertions.assertThat(byteWise.getPos()).isEqualTo(written);
            byteWise.write(0x42);
        }
        byteWise.closeAndGetHandle();

        // Randomly sized array writes: the position follows the running total.
        FileMergingCheckpointStateOutputStream chunked = getNewStream();
        Random rnd = new Random();
        long expectedPos = 0;
        for (int round = 0; round < 7; round++) {
            int chunkSize = rnd.nextInt(16);
            expectedPos += chunkSize;
            chunked.write(new byte[chunkSize]);
            Assertions.assertThat(chunked.getPos()).isEqualTo(expectedPos);
        }

        // Keep the physical file open on close so that the next stream can reuse it.
        physicalFileCanBeReused = true;
        Assertions.assertThat(chunked.closeAndGetHandle()).isNotNull();

        // Stream on the reused file: positions are asserted relative to this stream's start.
        long expectedPosInReusedFile = 0;
        FileMergingCheckpointStateOutputStream reusing = getNewStream(true);
        reusing.flushToFile();
        for (int round = 0; round < 7; round++) {
            int chunkSize = rnd.nextInt(16);
            expectedPosInReusedFile += chunkSize;
            reusing.write(new byte[chunkSize]);
            Assertions.assertThat(reusing.getPos()).isEqualTo(expectedPosInReusedFile);
        }
        reusing.closeAndGetHandle();
    }

    /**
     * Verifies that a stream backed by the physical file of an already closed stream fails with
     * an {@link IllegalArgumentException} when it tries to flush to that file.
     */
    @Test
    public void testCannotReuseClosedFile() throws IOException {
        final FileMergingCheckpointStateOutputStream firstStream = getNewStream();
        firstStream.flushToFile();
        Assertions.assertThat(isPhysicalFileProvided).isTrue();
        firstStream.close();

        try {
            final FileMergingCheckpointStateOutputStream reusingStream = getNewStream(true);
            reusingStream.flushToFile();
            Assertions.fail("Cannot reuse a closed physical file.");
        } catch (IllegalArgumentException expected) {
            // Expected: the test proxy's precondition check refuses a file that is not open.
        }
    }

    /**
     * Verifies that writing a single byte to a stream that was flushed to its physical file and
     * then closed fails with an {@link IOException}.
     */
    @Test
    public void testWriteFailsFastWhenClosed() throws Exception {
        FileMergingCheckpointStateOutputStream stream = getNewStream();
        stream.flushToFile();
        Assertions.assertThat(isPhysicalFileProvided).isTrue();
        stream.close();
        try {
            stream.write(1);
            // The failure message describes this scenario (a write on a closed stream) rather
            // than physical-file reuse, so a failing run points at the right cause.
            Assertions.fail("Expected an IOException when writing to a closed stream.");
        } catch (IOException expected) {
            // Expected: the closed stream rejects the write.
        }
    }

    /**
     * Writes {@code i} random bytes to a new stream using a random mix of single-byte and array
     * writes, then reads the resulting state handle back and checks that the content matches.
     *
     * @param i number of bytes to write; callers pass positive values (the returned handle is
     *     asserted to be non-null)
     * @throws Exception if writing, reading back, or discarding the state fails
     */
    private void runTest(int i) throws Exception {
        final FileMergingCheckpointStateOutputStream stream = getNewStream();
        final Random random = new Random();

        final byte[] original = new byte[i];
        random.nextBytes(original);
        // Write from a copy so that accidental modification of the written array is detected.
        final byte[] bytes = original.clone();

        int pos = 0;
        while (pos < bytes.length) {
            if (random.nextBoolean()) {
                stream.write(bytes[pos++]);
            } else {
                // Either write everything that is left or a random (possibly empty) prefix of it.
                final int remaining = bytes.length - pos;
                final int len = random.nextBoolean() ? remaining : random.nextInt(remaining);
                stream.write(bytes, pos, len);
                pos += len;
            }
        }

        final StreamStateHandle handle = stream.closeAndGetHandle();
        Assertions.assertThat(handle).isNotNull();

        // The stream must not have altered the array it was given.
        Assertions.assertThat(bytes).containsExactly(original);

        // try-with-resources closes the input stream exactly once and keeps the primary
        // exception if closing fails as well.
        try (FSDataInputStream in = handle.openInputStream()) {
            final byte[] validation = new byte[bytes.length];
            new DataInputStream(in).readFully(validation);
            Assertions.assertThat(validation).containsExactly(bytes);
        }

        handle.discardState();
    }
}
