/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.BufferWithSubpartition;
import org.apache.flink.runtime.io.network.partition.DataBufferTest;
import org.apache.flink.runtime.io.network.partition.PartitionedFile;
import org.apache.flink.runtime.io.network.partition.PartitionedFileReader;
import org.apache.flink.runtime.io.network.partition.PartitionedFileWriter;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

class PartitionedFileWriteReadTest {
    @TempDir
    private Path tempPath;
    private PartitionedFile partitionedFile;

    PartitionedFileWriteReadTest() {
    }

    @AfterEach
    void tearDown() {
        if (this.partitionedFile != null) {
            this.partitionedFile.deleteQuietly();
        }
    }

    @Test
    void testWriteAndReadPartitionedFile() throws Exception {
        int subpartition;
        int numSubpartitions = 10;
        int bufferSize = 1024;
        int numBuffers = 1000;
        int numRegions = 10;
        Random random = new Random(1111L);
        List[] buffersWritten = new List[numSubpartitions];
        List[] buffersRead = new List[numSubpartitions];
        List[] regionStat = new List[numSubpartitions];
        for (int subpartition2 = 0; subpartition2 < numSubpartitions; ++subpartition2) {
            buffersWritten[subpartition2] = new ArrayList();
            buffersRead[subpartition2] = new ArrayList();
            regionStat[subpartition2] = new ArrayList();
        }
        int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
        this.createPartitionedFile(numSubpartitions, bufferSize, numBuffers, numRegions, buffersWritten, regionStat, this.createPartitionedFileWriter(numSubpartitions, writeOrder), subpartitionIndex -> subpartitionIndex, random.nextBoolean(), writeOrder);
        FileChannel dataFileChannel = this.openFileChannel(this.partitionedFile.getDataFilePath());
        FileChannel indexFileChannel = this.openFileChannel(this.partitionedFile.getIndexFilePath());
        for (subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            PartitionedFileReader fileReader = new PartitionedFileReader(this.partitionedFile, new ResultSubpartitionIndexSet(subpartition), dataFileChannel, indexFileChannel, BufferReaderWriterUtil.allocatedHeaderBuffer(), PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer(), writeOrder[0]);
            while (fileReader.hasRemaining()) {
                int subIndex = subpartition;
                fileReader.readCurrentRegion(PartitionedFileWriteReadTest.allocateBuffers(bufferSize), FreeingBufferRecycler.INSTANCE, buffer -> this.addReadBuffer((Buffer)buffer, buffersRead[subIndex]));
            }
        }
        IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{dataFileChannel, indexFileChannel});
        for (subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            Assertions.assertThat((List)buffersWritten[subpartition]).hasSameSizeAs((Iterable)buffersRead[subpartition]);
            for (int i = 0; i < buffersWritten[subpartition].size(); ++i) {
                this.assertBufferEquals((Buffer)buffersWritten[subpartition].get(i), (Buffer)buffersRead[subpartition].get(i));
            }
        }
    }

    @ParameterizedTest
    @CsvSource(value={"true, true", "true, false", "false, true", "false, false"})
    void testComputeReadablePosition(boolean randomSubpartitionOrder, boolean broadcastRegion) throws IOException {
        int[] nArray;
        int numSubpartitions = 10;
        int bufferSize = 1024;
        int numBuffers = 1000;
        int numRegions = 1;
        List[] buffersWritten = new List[numSubpartitions];
        List[] buffersRead = new List[numSubpartitions];
        List[] regionStat = new List[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            buffersWritten[subpartition] = new ArrayList();
            buffersRead[subpartition] = new ArrayList();
            regionStat[subpartition] = new ArrayList();
        }
        if (randomSubpartitionOrder) {
            nArray = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
        } else {
            int[] nArray2 = new int[10];
            nArray2[0] = 0;
            nArray2[1] = 1;
            nArray2[2] = 2;
            nArray2[3] = 3;
            nArray2[4] = 4;
            nArray2[5] = 5;
            nArray2[6] = 6;
            nArray2[7] = 7;
            nArray2[8] = 8;
            nArray = nArray2;
            nArray2[9] = 9;
        }
        int[] writeOrder = nArray;
        this.createPartitionedFile(numSubpartitions, bufferSize, numBuffers, numRegions, buffersWritten, regionStat, this.createPartitionedFileWriter(numSubpartitions, writeOrder), subpartitionIndex -> subpartitionIndex, broadcastRegion, writeOrder);
        FileChannel dataFileChannel = this.openFileChannel(this.partitionedFile.getDataFilePath());
        FileChannel indexFileChannel = this.openFileChannel(this.partitionedFile.getIndexFilePath());
        this.verifyReadablePosition(0, numSubpartitions - 1, writeOrder[0], dataFileChannel, indexFileChannel, this.partitionedFile, regionStat, broadcastRegion);
        this.verifyReadablePosition(0, writeOrder[0], writeOrder[0], dataFileChannel, indexFileChannel, this.partitionedFile, regionStat, broadcastRegion);
        this.verifyReadablePosition(writeOrder[0], numSubpartitions - 1, writeOrder[0], dataFileChannel, indexFileChannel, this.partitionedFile, regionStat, broadcastRegion);
    }

    private void verifyReadablePosition(int start, int end, int subpartitionOrderRotationIndex, FileChannel dataFileChannel, FileChannel indexFileChannel, PartitionedFile partitionedFile, List<Tuple2<Long, Long>>[] regionStat, boolean isBroadcastRegion) throws IOException {
        PartitionedFileReader fileReader = new PartitionedFileReader(partitionedFile, new ResultSubpartitionIndexSet(start, end), dataFileChannel, indexFileChannel, BufferReaderWriterUtil.allocatedHeaderBuffer(), PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer(), subpartitionOrderRotationIndex);
        ArrayDeque offsetAndSizesToRead = new ArrayDeque();
        fileReader.updateReadableOffsetAndSize(PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer(), offsetAndSizesToRead);
        if (isBroadcastRegion) {
            Assertions.assertThat((Integer)offsetAndSizesToRead.stream().map(PartitionedFileReader.BufferPositionDescriptor::getRepeatCount).reduce(Integer::sum).get()).isEqualTo(end - start + 1);
            for (PartitionedFileReader.BufferPositionDescriptor descriptor : offsetAndSizesToRead) {
                Assertions.assertThat((long)descriptor.getOffset()).isEqualTo(regionStat[start].get((int)0).f0);
                Assertions.assertThat((long)descriptor.getSize()).isEqualTo(regionStat[start].get((int)0).f1);
            }
            return;
        }
        if (start >= subpartitionOrderRotationIndex || end <= subpartitionOrderRotationIndex - 1) {
            Assertions.assertThat(offsetAndSizesToRead).hasSize(1);
            PartitionedFileReader.BufferPositionDescriptor descriptor = (PartitionedFileReader.BufferPositionDescriptor)offsetAndSizesToRead.poll();
            Assertions.assertThat((long)descriptor.getOffset()).isEqualTo(regionStat[start].get((int)0).f0);
            long expectedSize = 0L;
            for (int i = start; i <= end; ++i) {
                expectedSize += ((Long)regionStat[i].get((int)0).f1).longValue();
            }
            Assertions.assertThat((long)descriptor.getSize()).isEqualTo(expectedSize);
        } else {
            int i;
            Assertions.assertThat(offsetAndSizesToRead).hasSize(2);
            PartitionedFileReader.BufferPositionDescriptor descriptor1 = (PartitionedFileReader.BufferPositionDescriptor)offsetAndSizesToRead.poll();
            PartitionedFileReader.BufferPositionDescriptor descriptor2 = (PartitionedFileReader.BufferPositionDescriptor)offsetAndSizesToRead.poll();
            Assertions.assertThat((long)descriptor1.getOffset()).isEqualTo(regionStat[subpartitionOrderRotationIndex].get((int)0).f0);
            Assertions.assertThat((long)descriptor2.getOffset()).isEqualTo(regionStat[start].get((int)0).f0);
            long expectedSize = 0L;
            for (i = subpartitionOrderRotationIndex; i <= end; ++i) {
                expectedSize += ((Long)regionStat[i].get((int)0).f1).longValue();
            }
            Assertions.assertThat((long)descriptor1.getSize()).isEqualTo(expectedSize);
            expectedSize = 0L;
            for (i = start; i < subpartitionOrderRotationIndex; ++i) {
                expectedSize += ((Long)regionStat[i].get((int)0).f1).longValue();
            }
            Assertions.assertThat((long)descriptor2.getSize()).isEqualTo(expectedSize);
        }
    }

    @Test
    void testWriteAndReadPartitionedFileForSubpartitionRange() throws Exception {
        int subpartition;
        int numSubpartitions = 10;
        int bufferSize = 1024;
        int numBuffers = 1000;
        int numRegions = 10;
        List[] buffersWritten = new List[numSubpartitions];
        List[] buffersRead = new List[numSubpartitions / 2];
        List[] regionStat = new List[numSubpartitions];
        for (int subpartition2 = 0; subpartition2 < numSubpartitions; ++subpartition2) {
            if (subpartition2 % 2 == 0) {
                buffersWritten[subpartition2 / 2] = new ArrayList();
                buffersRead[subpartition2 / 2] = new ArrayList();
            }
            regionStat[subpartition2] = new ArrayList();
        }
        int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
        this.createPartitionedFile(numSubpartitions, bufferSize, numBuffers, numRegions, buffersWritten, regionStat, this.createPartitionedFileWriter(numSubpartitions, writeOrder), subpartitionIndex -> subpartitionIndex / 2, false, writeOrder);
        FileChannel dataFileChannel = this.openFileChannel(this.partitionedFile.getDataFilePath());
        FileChannel indexFileChannel = this.openFileChannel(this.partitionedFile.getIndexFilePath());
        for (subpartition = 0; subpartition < numSubpartitions; subpartition += 2) {
            PartitionedFileReader fileReader = new PartitionedFileReader(this.partitionedFile, new ResultSubpartitionIndexSet(subpartition, subpartition + 1), dataFileChannel, indexFileChannel, BufferReaderWriterUtil.allocatedHeaderBuffer(), PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer(), writeOrder[0]);
            while (fileReader.hasRemaining()) {
                int subIndex = subpartition;
                fileReader.readCurrentRegion(PartitionedFileWriteReadTest.allocateBuffers(bufferSize), FreeingBufferRecycler.INSTANCE, buffer -> this.addReadBuffer((Buffer)buffer, buffersRead[subIndex / 2]));
            }
        }
        IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{dataFileChannel, indexFileChannel});
        for (subpartition = 0; subpartition < numSubpartitions; subpartition += 2) {
            Assertions.assertThat((List)buffersWritten[subpartition / 2]).hasSameSizeAs((Iterable)buffersRead[subpartition / 2]);
            for (int i = 0; i < buffersRead[subpartition / 2].size(); ++i) {
                this.assertBufferEquals((Buffer)buffersWritten[subpartition / 2].get(i), (Buffer)buffersRead[subpartition / 2].get(i));
            }
        }
    }

    private void createPartitionedFile(int numSubpartitions, int bufferSize, int numBuffers, int numRegions, List<Buffer>[] buffersWritten, List<Tuple2<Long, Long>>[] regionStat, PartitionedFileWriter fileWriter, Function<Integer, Integer> writtenIndexRetriever, boolean isBroadcastRegion, int[] writeOrder) throws IOException {
        Random random = new Random(1111L);
        long currentOffset = 0L;
        block0: for (int region = 0; region < numRegions; ++region) {
            int i;
            fileWriter.startNewRegion(isBroadcastRegion);
            List[] bufferWithSubpartitions = new List[numSubpartitions];
            for (i = 0; i < numSubpartitions; ++i) {
                bufferWithSubpartitions[i] = new ArrayList();
            }
            for (i = 0; i < numBuffers; ++i) {
                int subpartition;
                Buffer buffer = this.createBuffer(random, bufferSize);
                if (isBroadcastRegion) {
                    for (subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
                        bufferWithSubpartitions[subpartition].add(new BufferWithSubpartition(buffer, subpartition));
                    }
                    continue;
                }
                subpartition = random.nextInt(numSubpartitions);
                bufferWithSubpartitions[subpartition].add(new BufferWithSubpartition(buffer, subpartition));
            }
            for (int index = 0; index < numSubpartitions; ++index) {
                int subpartition = writeOrder[index];
                fileWriter.writeBuffers(bufferWithSubpartitions[subpartition]);
                List writtenBuffer = bufferWithSubpartitions[subpartition].stream().map(BufferWithSubpartition::getBuffer).collect(Collectors.toList());
                int writtenIndex = writtenIndexRetriever.apply(subpartition);
                buffersWritten[writtenIndex].addAll(writtenBuffer);
                long totalBytes = PartitionedFileWriteReadTest.getTotalBytes(bufferWithSubpartitions[subpartition]);
                if (isBroadcastRegion) {
                    for (int j = 0; j < numSubpartitions; ++j) {
                        regionStat[j].add((Tuple2<Long, Long>)Tuple2.of((Object)currentOffset, (Object)totalBytes));
                        if (j == writtenIndex) continue;
                        buffersWritten[j].addAll(writtenBuffer);
                    }
                    currentOffset += totalBytes;
                    continue block0;
                }
                regionStat[subpartition].add((Tuple2<Long, Long>)Tuple2.of((Object)currentOffset, (Object)totalBytes));
                currentOffset += totalBytes;
            }
        }
        this.partitionedFile = fileWriter.finish();
    }

    private static long getTotalBytes(List<BufferWithSubpartition> bufferWithSubpartitions) {
        long totalBytes = 0L;
        for (BufferWithSubpartition bufferWithSubpartition : bufferWithSubpartitions) {
            totalBytes += (long)(bufferWithSubpartition.getBuffer().readableBytes() + 8);
        }
        return totalBytes;
    }

    private void addReadBuffer(Buffer buffer, List<Buffer> buffersRead) {
        int numBytes = buffer.readableBytes();
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)numBytes);
        Buffer fullBuffer = ((CompositeBuffer)buffer).getFullBufferData(MemorySegmentFactory.allocateUnpooledSegment((int)numBytes));
        segment.put(0, fullBuffer.getNioBufferReadable(), fullBuffer.readableBytes());
        buffersRead.add((Buffer)new NetworkBuffer(segment, ignore -> {}, fullBuffer.getDataType(), fullBuffer.isCompressed(), fullBuffer.readableBytes()));
        fullBuffer.recycleBuffer();
    }

    private static Queue<MemorySegment> allocateBuffers(int bufferSize) {
        return PartitionedFileWriteReadTest.allocateBuffers(bufferSize, 2);
    }

    private static Queue<MemorySegment> allocateBuffers(int bufferSize, int numBuffers) {
        LinkedList<MemorySegment> readBuffers = new LinkedList<MemorySegment>();
        while (numBuffers-- > 0) {
            readBuffers.add(MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize));
        }
        return readBuffers;
    }

    @Test
    void testWriteAndReadWithEmptySubpartition() throws Exception {
        int numRegions = 10;
        int numSubpartitions = 5;
        int bufferSize = 1024;
        Random random = new Random(1111L);
        ArrayDeque[] subpartitionBuffers = new ArrayDeque[numSubpartitions];
        List[] buffersRead = new List[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            subpartitionBuffers[subpartition] = new ArrayDeque();
            buffersRead[subpartition] = new ArrayList();
        }
        PartitionedFileWriter fileWriter = this.createPartitionedFileWriter(numSubpartitions, new int[]{0, 1, 2, 3, 4});
        for (int region = 0; region < numRegions; ++region) {
            fileWriter.startNewRegion(false);
            for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
                if (!random.nextBoolean()) continue;
                Buffer buffer2 = this.createBuffer(random, bufferSize);
                subpartitionBuffers[subpartition].add(buffer2);
                fileWriter.writeBuffers(this.getBufferWithSubpartitions(buffer2, subpartition));
            }
        }
        this.partitionedFile = fileWriter.finish();
        FileChannel dataFileChannel = this.openFileChannel(this.partitionedFile.getDataFilePath());
        FileChannel indexFileChannel = this.openFileChannel(this.partitionedFile.getIndexFilePath());
        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            PartitionedFileReader fileReader = new PartitionedFileReader(this.partitionedFile, new ResultSubpartitionIndexSet(subpartition), dataFileChannel, indexFileChannel, BufferReaderWriterUtil.allocatedHeaderBuffer(), PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer(), 0);
            int bufferIndex = 0;
            while (fileReader.hasRemaining()) {
                int subIndex = subpartition;
                fileReader.readCurrentRegion(PartitionedFileWriteReadTest.allocateBuffers(bufferSize), FreeingBufferRecycler.INSTANCE, buffer -> this.addReadBuffer((Buffer)buffer, buffersRead[subIndex]));
                Buffer buffer3 = (Buffer)buffersRead[subIndex].get(bufferIndex++);
                this.assertBufferEquals((Buffer)Preconditions.checkNotNull((Object)((Buffer)subpartitionBuffers[subpartition].poll())), buffer3);
            }
            Assertions.assertThat((Collection)subpartitionBuffers[subpartition]).isEmpty();
        }
        IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{dataFileChannel, indexFileChannel});
    }

    @Test
    void testWriteAndReadWithEmptySubpartitionForMultipleSubpartitions() throws Exception {
        int numRegions = 10;
        int numSubpartitions = 5;
        int bufferSize = 1024;
        Random random = new Random();
        ArrayDeque[] subpartitionBuffers = new ArrayDeque[numRegions];
        List[] buffersRead = new List[numRegions];
        for (int region = 0; region < numRegions; ++region) {
            subpartitionBuffers[region] = new ArrayDeque();
            buffersRead[region] = new ArrayList();
        }
        int[] writeOrder = new int[]{0, 1, 2, 3, 4};
        PartitionedFileWriter fileWriter = this.createPartitionedFileWriter(numSubpartitions, writeOrder);
        for (int region = 0; region < numRegions; ++region) {
            fileWriter.startNewRegion(false);
            for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
                if (!random.nextBoolean()) continue;
                Buffer buffer2 = this.createBuffer(random, bufferSize);
                subpartitionBuffers[region].add(buffer2);
                fileWriter.writeBuffers(this.getBufferWithSubpartitions(buffer2, subpartition));
            }
        }
        this.partitionedFile = fileWriter.finish();
        FileChannel dataFileChannel = this.openFileChannel(this.partitionedFile.getDataFilePath());
        FileChannel indexFileChannel = this.openFileChannel(this.partitionedFile.getIndexFilePath());
        PartitionedFileReader fileReader = new PartitionedFileReader(this.partitionedFile, new ResultSubpartitionIndexSet(0, numSubpartitions - 1), dataFileChannel, indexFileChannel, BufferReaderWriterUtil.allocatedHeaderBuffer(), PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer(), writeOrder[0]);
        int regionIndex = 0;
        while (fileReader.hasRemaining()) {
            if (subpartitionBuffers[regionIndex].isEmpty()) {
                ++regionIndex;
                continue;
            }
            int finalRegionIndex = regionIndex;
            fileReader.readCurrentRegion(PartitionedFileWriteReadTest.allocateBuffers(bufferSize, 10), FreeingBufferRecycler.INSTANCE, buffer -> this.addReadBuffer((Buffer)buffer, buffersRead[finalRegionIndex]));
            for (Buffer buffer3 : buffersRead[finalRegionIndex]) {
                this.assertBufferEquals((Buffer)Preconditions.checkNotNull((Object)((Buffer)subpartitionBuffers[finalRegionIndex].poll())), buffer3);
            }
            Assertions.assertThat((Collection)subpartitionBuffers[finalRegionIndex]).isEmpty();
            ++regionIndex;
        }
        IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{dataFileChannel, indexFileChannel});
    }

    private void assertBufferEquals(Buffer expected, Buffer actual) {
        Assertions.assertThat((Comparable)expected.getDataType()).isEqualTo((Object)actual.getDataType());
        Assertions.assertThat((Comparable)expected.getNioBufferReadable()).isEqualTo((Object)actual.getNioBufferReadable());
    }

    private Buffer createBuffer(Random random, int bufferSize) {
        boolean isBuffer = random.nextBoolean();
        Buffer.DataType dataType = isBuffer ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.EVENT_BUFFER;
        int dataSize = random.nextInt(bufferSize) + 1;
        byte[] data = new byte[dataSize];
        return new NetworkBuffer(MemorySegmentFactory.wrap((byte[])data), buf -> {}, dataType, dataSize);
    }

    @Test
    void testNotWriteDataOfTheSameSubpartitionTogether() throws Exception {
        PartitionedFileWriter partitionedFileWriter = this.createPartitionedFileWriter(2, new int[]{1, 0});
        try {
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)1024);
            Assertions.assertThatThrownBy(() -> {
                NetworkBuffer buffer1 = new NetworkBuffer(segment, buf -> {});
                partitionedFileWriter.writeBuffers(this.getBufferWithSubpartitions((Buffer)buffer1, 1));
                NetworkBuffer buffer2 = new NetworkBuffer(segment, buf -> {});
                partitionedFileWriter.writeBuffers(this.getBufferWithSubpartitions((Buffer)buffer2, 0));
                NetworkBuffer buffer3 = new NetworkBuffer(segment, buf -> {});
                partitionedFileWriter.writeBuffers(this.getBufferWithSubpartitions((Buffer)buffer3, 1));
            }).isInstanceOf(IllegalStateException.class);
        }
        finally {
            this.partitionedFile = partitionedFileWriter.finish();
        }
    }

    @Test
    void testWriteFinishedPartitionedFile() throws Exception {
        PartitionedFileWriter partitionedFileWriter = this.createAndFinishPartitionedFileWriter();
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)1024);
        NetworkBuffer buffer = new NetworkBuffer(segment, buf -> {});
        Assertions.assertThatThrownBy(() -> partitionedFileWriter.writeBuffers(this.getBufferWithSubpartitions((Buffer)buffer, 0))).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testFinishPartitionedFileWriterTwice() throws Exception {
        PartitionedFileWriter partitionedFileWriter = this.createAndFinishPartitionedFileWriter();
        Assertions.assertThatThrownBy(() -> partitionedFileWriter.finish()).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testReadEmptyPartitionedFile() throws Exception {
        int bufferSize = 1024;
        int numSubpartitions = 2;
        int targetSubpartition = 1;
        this.createEmptyPartitionedFile();
        List[] buffersRead = new List[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            buffersRead[subpartition] = new ArrayList();
        }
        FileChannel dataFileChannel = this.openFileChannel(this.partitionedFile.getDataFilePath());
        FileChannel indexFileChannel = this.openFileChannel(this.partitionedFile.getIndexFilePath());
        PartitionedFileReader partitionedFileReader = new PartitionedFileReader(this.partitionedFile, new ResultSubpartitionIndexSet(1), dataFileChannel, indexFileChannel, BufferReaderWriterUtil.allocatedHeaderBuffer(), PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer(), 0);
        partitionedFileReader.readCurrentRegion(PartitionedFileWriteReadTest.allocateBuffers(bufferSize), FreeingBufferRecycler.INSTANCE, buffer -> this.addReadBuffer((Buffer)buffer, buffersRead[targetSubpartition]));
        Assertions.assertThat((List)buffersRead[targetSubpartition]).isEmpty();
        IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{dataFileChannel, indexFileChannel});
    }

    @Test
    void testMultipleThreadGetIndexEntry() throws Exception {
        int numSubpartitions = 5;
        int bufferSize = 1024;
        int numBuffers = 100;
        int numRegions = 10;
        Random random = new Random(1111L);
        List[] buffersWritten = new List[5];
        List[] buffersRead = new List[5];
        final List[] regionStat = new List[5];
        for (int subpartition = 0; subpartition < 5; ++subpartition) {
            buffersWritten[subpartition] = new ArrayList();
            buffersRead[subpartition] = new ArrayList();
            regionStat[subpartition] = new ArrayList();
        }
        int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(5);
        this.createPartitionedFile(5, 1024, 100, 10, buffersWritten, regionStat, this.createPartitionedFileWriter(5, 80, 80, writeOrder), subpartitionIndex -> subpartitionIndex, random.nextBoolean(), writeOrder);
        FileChannel dataFileChannel = this.openFileChannel(this.partitionedFile.getDataFilePath());
        final FileChannel indexFileChannel = this.openFileChannel(this.partitionedFile.getIndexFilePath());
        CheckedThread[] readers = new CheckedThread[5];
        for (int i = 0; i < 5; ++i) {
            final int subpartition = i;
            readers[i] = new CheckedThread(){

                public void go() throws Exception {
                    ByteBuffer indexEntryBuffer = PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer();
                    for (int region = 0; region < 10; ++region) {
                        PartitionedFileWriteReadTest.this.partitionedFile.getIndexEntry(indexFileChannel, indexEntryBuffer, region, subpartition);
                        long offset = indexEntryBuffer.getLong();
                        long regionBytes = indexEntryBuffer.getLong();
                        Assertions.assertThat((long)offset).isEqualTo(((Tuple2)regionStat[subpartition].get((int)region)).f0);
                        Assertions.assertThat((long)regionBytes).isEqualTo(((Tuple2)regionStat[subpartition].get((int)region)).f1);
                    }
                }
            };
        }
        for (CheckedThread reader : readers) {
            reader.start();
        }
        for (CheckedThread reader : readers) {
            reader.sync();
        }
        IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{dataFileChannel, indexFileChannel});
    }

    private FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ);
    }

    private List<BufferWithSubpartition> getBufferWithSubpartitions(Buffer buffer, int subpartitionIndex) {
        return Collections.singletonList(new BufferWithSubpartition(buffer, subpartitionIndex));
    }

    private void createEmptyPartitionedFile() throws IOException {
        PartitionedFileWriter partitionedFileWriter = this.createPartitionedFileWriter(2, new int[0]);
        this.partitionedFile = partitionedFileWriter.finish();
    }

    private PartitionedFileWriter createPartitionedFileWriter(int numSubpartitions, int[] writeOrder) throws IOException {
        return this.createPartitionedFileWriter(numSubpartitions, 640, writeOrder);
    }

    private PartitionedFileWriter createPartitionedFileWriter(int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize, int[] writeOrder) throws IOException {
        return new PartitionedFileWriter(numSubpartitions, minIndexBufferSize, maxIndexBufferSize, this.tempPath.toString(), writeOrder);
    }

    private PartitionedFileWriter createPartitionedFileWriter(int numSubpartitions, int maxIndexBufferSize, int[] writeOrder) throws IOException {
        return new PartitionedFileWriter(numSubpartitions, maxIndexBufferSize, this.tempPath.toString(), writeOrder);
    }

    private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException {
        PartitionedFileWriter partitionedFileWriter = this.createPartitionedFileWriter(1, new int[0]);
        this.partitionedFile = partitionedFileWriter.finish();
        return partitionedFileWriter;
    }

    public static ByteBuffer createAndConfigIndexEntryBuffer() {
        ByteBuffer indexEntryBuffer = ByteBuffer.allocateDirect(16);
        BufferReaderWriterUtil.configureByteBuffer((ByteBuffer)indexEntryBuffer);
        return indexEntryBuffer;
    }
}

