package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.class */
class PartitionedFileWriteReadTest {

    @TempDir
    private Path tempPath;

    /** Package-private no-argument constructor; it performs no initialization of its own. */
    PartitionedFileWriteReadTest() {
    }

    /**
     * Writes ten regions of random buffers for ten subpartitions into a partitioned file, reads
     * every subpartition back through a {@link PartitionedFileReader}, and verifies that the
     * buffers read equal the buffers written, in the same order.
     *
     * <p>The file channels are closed in a {@code finally} block so they are released even when
     * reading fails.
     */
    @Test
    void testWriteAndReadPartitionedFile() throws Exception {
        int numSubpartitions = 10;
        int numRegions = 10;
        int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;

        @SuppressWarnings("unchecked")
        List<Buffer>[] buffersWritten = new List[numSubpartitions];
        @SuppressWarnings("unchecked")
        List<Buffer>[] buffersRead = new List[numSubpartitions];
        @SuppressWarnings("unchecked")
        List<Tuple2<Long, Long>>[] regionStat = new List[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            buffersWritten[subpartition] = new ArrayList<>();
            buffersRead[subpartition] = new ArrayList<>();
            regionStat[subpartition] = new ArrayList<>();
        }

        PartitionedFile partitionedFile =
                createPartitionedFile(
                        numSubpartitions,
                        bufferSize,
                        HashBufferAccumulatorTest.NUM_TOTAL_BUFFERS,
                        numRegions,
                        buffersWritten,
                        regionStat,
                        createPartitionedFileWriter(numSubpartitions));

        FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
        FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
        try {
            for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
                PartitionedFileReader fileReader =
                        new PartitionedFileReader(
                                partitionedFile,
                                subpartition,
                                dataFileChannel,
                                indexFileChannel,
                                BufferReaderWriterUtil.allocatedHeaderBuffer(),
                                createAndConfigIndexEntryBuffer());
                // Captured by the lambda below, so it must be effectively final.
                List<Buffer> target = buffersRead[subpartition];
                while (fileReader.hasRemaining()) {
                    fileReader.readCurrentRegion(
                            allocateBuffers(bufferSize),
                            FreeingBufferRecycler.INSTANCE,
                            buffer -> addReadBuffer(buffer, target));
                }
            }
        } finally {
            IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
        }

        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            List<Buffer> expected = buffersWritten[subpartition];
            List<Buffer> actual = buffersRead[subpartition];
            Assertions.assertThat(expected).hasSameSizeAs(actual);
            for (int index = 0; index < expected.size(); index++) {
                assertBufferEquals(expected.get(index), actual.get(index));
            }
        }
    }

    /**
     * Fills a partitioned file with {@code numRegions} regions of randomly sized buffers and
     * records what was written so that callers can verify it later.
     *
     * <p>Each region is randomly chosen to be a broadcast region or not. In a broadcast region
     * every generated buffer is recorded for every subpartition; otherwise each buffer is
     * assigned to one random subpartition. The random generator uses a fixed seed, so the
     * generated data is reproducible.
     *
     * @param numSubpartitions number of subpartitions of the file
     * @param bufferSize upper bound (inclusive) for the size in bytes of each generated buffer
     * @param numBuffers number of buffers generated per region
     * @param numRegions number of regions to write
     * @param buffersWritten per-subpartition lists that receive the buffers written
     * @param regionStat per-subpartition lists that receive one {@code (offset, numBytes)} entry
     *     per recorded region write
     * @param fileWriter writer to write through; it is finished by this method
     * @return the finished partitioned file
     * @throws IOException if writing or finishing the file fails
     */
    private PartitionedFile createPartitionedFile(
            int numSubpartitions,
            int bufferSize,
            int numBuffers,
            int numRegions,
            List<Buffer>[] buffersWritten,
            List<Tuple2<Long, Long>>[] regionStat,
            PartitionedFileWriter fileWriter)
            throws IOException {
        Random random = new Random(1111L);
        long currentOffset = 0L;
        for (int region = 0; region < numRegions; region++) {
            boolean isBroadcastRegion = random.nextBoolean();
            fileWriter.startNewRegion(isBroadcastRegion);

            @SuppressWarnings("unchecked")
            List<BufferWithSubpartition>[] regionBuffers = new List[numSubpartitions];
            for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
                regionBuffers[subpartition] = new ArrayList<>();
            }

            for (int bufferIndex = 0; bufferIndex < numBuffers; bufferIndex++) {
                Buffer buffer = createBuffer(random, bufferSize);
                if (isBroadcastRegion) {
                    for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
                        buffersWritten[subpartition].add(buffer);
                        regionBuffers[subpartition].add(
                                new BufferWithSubpartition(buffer, subpartition));
                    }
                } else {
                    int subpartition = random.nextInt(numSubpartitions);
                    buffersWritten[subpartition].add(buffer);
                    regionBuffers[subpartition].add(
                            new BufferWithSubpartition(buffer, subpartition));
                }
            }

            int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
            for (int index = 0; index < numSubpartitions; index++) {
                int subpartition = writeOrder[index];
                fileWriter.writeBuffers(regionBuffers[subpartition]);
                long totalBytes = getTotalBytes(regionBuffers[subpartition]);
                if (isBroadcastRegion) {
                    // Every subpartition gets the same (offset, size) entry, so the broadcast
                    // data is written exactly once and the remaining subpartitions are skipped.
                    for (int target = 0; target < numSubpartitions; target++) {
                        regionStat[target].add(Tuple2.of(currentOffset, totalBytes));
                    }
                    currentOffset += totalBytes;
                    break;
                }
                regionStat[subpartition].add(Tuple2.of(currentOffset, totalBytes));
                currentOffset += totalBytes;
            }
        }
        return fileWriter.finish();
    }

    /**
     * Sums the size of the given buffers, counting each buffer's readable bytes plus a fixed 8
     * bytes per buffer.
     *
     * @param bufferWithSubpartitions buffers to measure; an empty list yields {@code 0}
     * @return the accumulated size in bytes
     */
    private static long getTotalBytes(List<BufferWithSubpartition> bufferWithSubpartitions) {
        long totalBytes = 0L;
        for (BufferWithSubpartition bufferWithSubpartition : bufferWithSubpartitions) {
            // NOTE(review): 8 is presumably the per-buffer header length written in front of
            // each buffer (BufferReaderWriterUtil.HEADER_LENGTH) - confirm.
            totalBytes += bufferWithSubpartition.getBuffer().readableBytes() + 8;
        }
        return totalBytes;
    }

    /**
     * Copies the content of a buffer handed out by the reader into a freshly allocated unpooled
     * segment, appends the copy to {@code list}, and recycles the intermediate full buffer.
     *
     * @param buffer buffer delivered by the reader; it is cast to {@link CompositeBuffer}
     * @param list destination list for the copied buffer
     */
    private void addReadBuffer(Buffer buffer, List<Buffer> list) {
        int numBytes = buffer.readableBytes();
        MemorySegment copyTarget = MemorySegmentFactory.allocateUnpooledSegment(numBytes);
        MemorySegment assemblyTarget = MemorySegmentFactory.allocateUnpooledSegment(numBytes);

        CompositeBuffer compositeBuffer = (CompositeBuffer) buffer;
        Buffer fullBuffer = compositeBuffer.getFullBufferData(assemblyTarget);
        copyTarget.put(0, fullBuffer.getNioBufferReadable(), fullBuffer.readableBytes());

        // The copy lives in an unpooled segment, so its recycler has nothing to do.
        NetworkBuffer copy =
                new NetworkBuffer(
                        copyTarget,
                        ignoredSegment -> {
                        },
                        fullBuffer.getDataType(),
                        fullBuffer.isCompressed(),
                        fullBuffer.readableBytes());
        list.add(copy);
        fullBuffer.recycleBuffer();
    }

    /**
     * Allocates two unpooled memory segments of the requested size.
     *
     * @param i size in bytes of each segment
     * @return a queue holding the two newly allocated segments
     */
    private static Queue<MemorySegment> allocateBuffers(int i) {
        Queue<MemorySegment> segments = new LinkedList<>();
        for (int allocated = 0; allocated < 2; allocated++) {
            segments.add(MemorySegmentFactory.allocateUnpooledSegment(i));
        }
        return segments;
    }

    /**
     * Writes ten non-broadcast regions in which each of the five subpartitions randomly
     * contributes either one buffer or nothing, then reads every subpartition back region by
     * region and checks each buffer against the one written.
     *
     * <p>The file channels are closed in a {@code finally} block so they are released even when
     * an assertion or a read fails.
     */
    @Test
    void testWriteAndReadWithEmptySubpartition() throws Exception {
        int numRegions = 10;
        int numSubpartitions = 5;
        int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;
        Random random = new Random(1111L);

        @SuppressWarnings("unchecked")
        Queue<Buffer>[] buffersWritten = new ArrayDeque[numSubpartitions];
        @SuppressWarnings("unchecked")
        List<Buffer>[] buffersRead = new List[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            buffersWritten[subpartition] = new ArrayDeque<>();
            buffersRead[subpartition] = new ArrayList<>();
        }

        PartitionedFileWriter fileWriter = createPartitionedFileWriter(numSubpartitions);
        for (int region = 0; region < numRegions; region++) {
            fileWriter.startNewRegion(false);
            for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
                if (random.nextBoolean()) {
                    Buffer buffer = createBuffer(random, bufferSize);
                    buffersWritten[subpartition].add(buffer);
                    fileWriter.writeBuffers(getBufferWithSubpartitions(buffer, subpartition));
                }
            }
        }
        PartitionedFile partitionedFile = fileWriter.finish();

        FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
        FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
        try {
            for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
                PartitionedFileReader fileReader =
                        new PartitionedFileReader(
                                partitionedFile,
                                subpartition,
                                dataFileChannel,
                                indexFileChannel,
                                BufferReaderWriterUtil.allocatedHeaderBuffer(),
                                createAndConfigIndexEntryBuffer());
                Queue<Buffer> expected = buffersWritten[subpartition];
                List<Buffer> actual = buffersRead[subpartition];
                int bufferIndex = 0;
                while (fileReader.hasRemaining()) {
                    fileReader.readCurrentRegion(
                            allocateBuffers(bufferSize),
                            FreeingBufferRecycler.INSTANCE,
                            buffer -> addReadBuffer(buffer, actual));
                    // A subpartition holds at most one buffer per region, so each region read
                    // is matched against the next written buffer.
                    Buffer expectedBuffer = Preconditions.checkNotNull(expected.poll());
                    assertBufferEquals(expectedBuffer, actual.get(bufferIndex));
                    bufferIndex++;
                }
                Assertions.assertThat(expected).isEmpty();
            }
        } finally {
            IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
        }
    }

    /**
     * Asserts that two buffers have the same data type and the same readable content.
     *
     * @param buffer first buffer to compare
     * @param buffer2 second buffer to compare
     */
    private void assertBufferEquals(Buffer buffer, Buffer buffer2) {
        Buffer.DataType firstType = buffer.getDataType();
        Buffer.DataType secondType = buffer2.getDataType();
        Assertions.assertThat(firstType).isEqualTo(secondType);

        ByteBuffer firstContent = buffer.getNioBufferReadable();
        ByteBuffer secondContent = buffer2.getNioBufferReadable();
        Assertions.assertThat(firstContent).isEqualTo(secondContent);
    }

    /**
     * Creates a zero-filled buffer with a random data type and a random size.
     *
     * @param random source of randomness; consulted first for the data type, then for the size
     * @param i upper bound (inclusive) of the buffer size in bytes; the lower bound is 1
     * @return a new buffer whose recycler does nothing
     */
    private Buffer createBuffer(Random random, int i) {
        Buffer.DataType dataType;
        if (random.nextBoolean()) {
            dataType = Buffer.DataType.DATA_BUFFER;
        } else {
            dataType = Buffer.DataType.EVENT_BUFFER;
        }
        int size = 1 + random.nextInt(i);
        byte[] payload = new byte[size];
        return new NetworkBuffer(
                MemorySegmentFactory.wrap(payload),
                ignoredSegment -> {
                },
                dataType,
                size);
    }

    /**
     * Verifies that writing buffers for subpartition 1, then 0, then 1 again raises an {@link
     * IllegalStateException}. The writer is finished afterwards in any case.
     */
    @Test
    void testNotWriteDataOfTheSameSubpartitionTogether() throws Exception {
        PartitionedFileWriter fileWriter = createPartitionedFileWriter(2);
        try {
            MemorySegment segment =
                    MemorySegmentFactory.allocateUnpooledSegment(
                            HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE);
            Assertions.assertThatThrownBy(
                            () -> {
                                // Subpartition 1 shows up twice with subpartition 0 in between.
                                for (int subpartition : new int[] {1, 0, 1}) {
                                    NetworkBuffer buffer =
                                            new NetworkBuffer(
                                                    segment,
                                                    ignoredSegment -> {
                                                    });
                                    fileWriter.writeBuffers(
                                            getBufferWithSubpartitions(buffer, subpartition));
                                }
                            })
                    .isInstanceOf(IllegalStateException.class);
        } finally {
            fileWriter.finish();
        }
    }

    /**
     * Verifies that writing to an already finished writer raises an {@link
     * IllegalStateException}.
     */
    @Test
    void testWriteFinishedPartitionedFile() throws Exception {
        PartitionedFileWriter finishedWriter = createAndFinishPartitionedFileWriter();
        MemorySegment segment =
                MemorySegmentFactory.allocateUnpooledSegment(
                        HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE);
        NetworkBuffer buffer =
                new NetworkBuffer(
                        segment,
                        ignoredSegment -> {
                        });

        Assertions.assertThatThrownBy(
                        () -> finishedWriter.writeBuffers(getBufferWithSubpartitions(buffer, 0)))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Verifies that finishing an already finished writer raises an {@link
     * IllegalStateException}.
     */
    @Test
    void testFinishPartitionedFileWriterTwice() throws Exception {
        PartitionedFileWriter finishedWriter = createAndFinishPartitionedFileWriter();

        Assertions.assertThatThrownBy(finishedWriter::finish)
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Reads subpartition 1 of a partitioned file that was finished without any data and checks
     * that no buffer is delivered.
     *
     * <p>The file channels are closed in a {@code finally} block so they are released even when
     * the read or the assertion fails.
     */
    @Test
    void testReadEmptyPartitionedFile() throws Exception {
        int numSubpartitions = 2;
        int targetSubpartition = 1;
        PartitionedFile partitionedFile = createEmptyPartitionedFile();

        @SuppressWarnings("unchecked")
        List<Buffer>[] buffersRead = new List[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            buffersRead[subpartition] = new ArrayList<>();
        }

        FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
        FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
        try {
            PartitionedFileReader fileReader =
                    new PartitionedFileReader(
                            partitionedFile,
                            targetSubpartition,
                            dataFileChannel,
                            indexFileChannel,
                            BufferReaderWriterUtil.allocatedHeaderBuffer(),
                            createAndConfigIndexEntryBuffer());
            fileReader.readCurrentRegion(
                    allocateBuffers(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE),
                    FreeingBufferRecycler.INSTANCE,
                    buffer -> addReadBuffer(buffer, buffersRead[targetSubpartition]));
            Assertions.assertThat(buffersRead[targetSubpartition]).isEmpty();
        } finally {
            IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
        }
    }

    /**
     * Starts one thread per subpartition; each thread fetches the index entry of every region
     * through {@link PartitionedFile#getIndexEntry} on a shared index file channel and compares
     * the two longs read with the {@code (offset, numBytes)} pair recorded while writing.
     *
     * <p>The file channels are closed in a {@code finally} block so they are released even when
     * one of the threads reports a failure.
     */
    @Test
    void testMultipleThreadGetIndexEntry() throws Exception {
        final int numSubpartitions = 5;
        final int numRegions = 10;

        @SuppressWarnings("unchecked")
        List<Buffer>[] buffersWritten = new List[numSubpartitions];
        @SuppressWarnings("unchecked")
        final List<Tuple2<Long, Long>>[] regionStat = new List[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            buffersWritten[subpartition] = new ArrayList<>();
            regionStat[subpartition] = new ArrayList<>();
        }

        final PartitionedFile partitionedFile =
                createPartitionedFile(
                        numSubpartitions,
                        HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE,
                        100,
                        numRegions,
                        buffersWritten,
                        regionStat,
                        createPartitionedFileWriter(numSubpartitions, 80, 80));

        FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
        final FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
        try {
            CheckedThread[] readers = new CheckedThread[numSubpartitions];
            for (int index = 0; index < numSubpartitions; index++) {
                final int subpartition = index;
                readers[index] =
                        new CheckedThread() {
                            @Override
                            public void go() throws Exception {
                                // Each thread reads into its own index entry buffer.
                                ByteBuffer indexEntryBuffer = createAndConfigIndexEntryBuffer();
                                for (int region = 0; region < numRegions; region++) {
                                    partitionedFile.getIndexEntry(
                                            indexFileChannel,
                                            indexEntryBuffer,
                                            region,
                                            subpartition);
                                    long offset = indexEntryBuffer.getLong();
                                    long numBytes = indexEntryBuffer.getLong();
                                    Tuple2<Long, Long> expected =
                                            regionStat[subpartition].get(region);
                                    Assertions.assertThat(offset).isEqualTo(expected.f0);
                                    Assertions.assertThat(numBytes).isEqualTo(expected.f1);
                                }
                            }
                        };
            }
            for (CheckedThread reader : readers) {
                reader.start();
            }
            for (CheckedThread reader : readers) {
                reader.sync();
            }
        } finally {
            IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
        }
    }

    /**
     * Opens the given file for reading.
     *
     * @param path file to open
     * @return a channel opened with {@link StandardOpenOption#READ}
     * @throws IOException if the file cannot be opened
     */
    private FileChannel openFileChannel(Path path) throws IOException {
        StandardOpenOption readOnly = StandardOpenOption.READ;
        return FileChannel.open(path, readOnly);
    }

    /**
     * Wraps a single buffer and its subpartition index into an immutable one-element list.
     *
     * @param buffer buffer to wrap
     * @param i subpartition index the buffer belongs to
     * @return a singleton list containing the wrapped buffer
     */
    private List<BufferWithSubpartition> getBufferWithSubpartitions(Buffer buffer, int i) {
        BufferWithSubpartition wrapped = new BufferWithSubpartition(buffer, i);
        return Collections.singletonList(wrapped);
    }

    /**
     * Creates a partitioned file for two subpartitions and finishes it without writing any data.
     *
     * @return the finished, empty partitioned file
     * @throws IOException if creating or finishing the writer fails
     */
    private PartitionedFile createEmptyPartitionedFile() throws IOException {
        PartitionedFileWriter emptyWriter = createPartitionedFileWriter(2);
        return emptyWriter.finish();
    }

    /**
     * Creates a writer for the given number of subpartitions, delegating to the two-argument
     * overload with a second argument of 640.
     *
     * @param i number of subpartitions
     * @return a new writer
     * @throws IOException if the writer cannot be created
     */
    private PartitionedFileWriter createPartitionedFileWriter(int i) throws IOException {
        // NOTE(review): 640 is presumably an index buffer size in bytes - confirm against
        // PartitionedFileWriter.
        final int defaultSecondArgument = 640;
        return createPartitionedFileWriter(i, defaultSecondArgument);
    }

    /**
     * Creates a writer that stores its files under the test's temporary directory, forwarding
     * the three integer arguments unchanged to the four-argument {@link PartitionedFileWriter}
     * constructor.
     *
     * @param i first constructor argument (the callers in this class pass the number of
     *     subpartitions)
     * @param i2 second constructor argument; presumably a lower index buffer size bound - TODO
     *     confirm
     * @param i3 third constructor argument; presumably an upper index buffer size bound - TODO
     *     confirm
     * @return a new writer
     * @throws IOException if the writer cannot be created
     */
    private PartitionedFileWriter createPartitionedFileWriter(int i, int i2, int i3) throws IOException {
        String basePath = this.tempPath.toString();
        return new PartitionedFileWriter(i, i2, i3, basePath);
    }

    /**
     * Creates a writer that stores its files under the test's temporary directory, forwarding
     * both integer arguments unchanged to the three-argument {@link PartitionedFileWriter}
     * constructor.
     *
     * @param i first constructor argument (the callers in this class pass the number of
     *     subpartitions)
     * @param i2 second constructor argument; presumably an index buffer size bound - TODO confirm
     * @return a new writer
     * @throws IOException if the writer cannot be created
     */
    private PartitionedFileWriter createPartitionedFileWriter(int i, int i2) throws IOException {
        String basePath = this.tempPath.toString();
        return new PartitionedFileWriter(i, i2, basePath);
    }

    /**
     * Creates a writer for a single subpartition and finishes it immediately.
     *
     * @return the already finished writer
     * @throws IOException if creating or finishing the writer fails
     */
    private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException {
        PartitionedFileWriter finishedWriter = createPartitionedFileWriter(1);
        // The resulting PartitionedFile is not needed; only the writer's finished state is.
        finishedWriter.finish();
        return finishedWriter;
    }

    /**
     * Allocates a 16-byte direct buffer and passes it through {@link
     * BufferReaderWriterUtil#configureByteBuffer}.
     *
     * @return the configured buffer
     */
    public static ByteBuffer createAndConfigIndexEntryBuffer() {
        final int indexEntrySize = 16;
        ByteBuffer indexEntryBuffer = ByteBuffer.allocateDirect(indexEntrySize);
        BufferReaderWriterUtil.configureByteBuffer(indexEntryBuffer);
        return indexEntryBuffer;
    }
}
