package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReader;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

/** Tests for {@link HsFileDataManager}. */
class HsFileDataManagerTest {
    /** Buffer size used by the fixture; also the length of {@link #dataBytes}. */
    private static final int BUFFER_SIZE = 1024;

    /** Number of subpartitions used by the fixture. */
    private static final int NUM_SUBPARTITIONS = 10;

    /** Number of buffers the read buffer pool is expected to hold in these tests. */
    private static final int BUFFER_POOL_SIZE = 2;

    /** Scratch bytes, refilled with random content before each test. */
    private final byte[] dataBytes = new byte[BUFFER_SIZE];

    /** Executor that only runs queued tasks when a test calls {@code trigger()}. */
    private ManuallyTriggeredScheduledExecutorService ioExecutor;

    /** Read buffer pool handed to the manager under test; destroyed after each test. */
    private BatchShuffleReadBufferPool bufferPool;

    /** Read-only channel on {@link #dataFilePath}; closed after each test. */
    private FileChannel dataFileChannel;

    /** Data file created inside the per-test temporary directory. */
    private Path dataFilePath;

    /** The object under test. */
    private HsFileDataManager fileDataManager;

    /** Consumer operations stub passed when registering consumers. */
    private TestingSubpartitionConsumerInternalOperation subpartitionViewOperation;

    /** Factory that hands out readers the tests have queued in advance. */
    private TestingHsSubpartitionFileReader.Factory factory;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest$TestingHsSubpartitionFileReader.class */
    private static class TestingHsSubpartitionFileReader implements HsSubpartitionFileReader {
        private Runnable prepareForSchedulingRunnable;
        private BiConsumerWithException<Queue<MemorySegment>, Queue<MemorySegment>, IOException> readBuffersConsumer;
        private Consumer<Throwable> failConsumer;
        private Runnable releaseDataViewRunnable;
        private final Queue<MemorySegment> readBuffers;
        private int priority;

        /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest$TestingHsSubpartitionFileReader$Factory.class */
        private static class Factory implements HsSubpartitionFileReader.Factory {
            private final Queue<HsSubpartitionFileReader> allReaders;

            private Factory() {
                this.allReaders = new ArrayDeque();
            }

            public HsSubpartitionFileReader createFileReader(int i, HsConsumerId hsConsumerId, FileChannel fileChannel, HsSubpartitionConsumerInternalOperations hsSubpartitionConsumerInternalOperations, HsFileDataIndex hsFileDataIndex, int i2, Consumer<HsSubpartitionFileReader> consumer, ByteBuffer byteBuffer) {
                return (HsSubpartitionFileReader) Preconditions.checkNotNull(this.allReaders.poll());
            }
        }

        private TestingHsSubpartitionFileReader() {
            this.prepareForSchedulingRunnable = () -> {
            };
            this.readBuffersConsumer = (queue, queue2) -> {
            };
            this.failConsumer = th -> {
            };
            this.releaseDataViewRunnable = () -> {
            };
            this.readBuffers = new ArrayDeque();
        }

        public void prepareForScheduling() {
            this.prepareForSchedulingRunnable.run();
        }

        public void readBuffers(Queue<MemorySegment> queue, BufferRecycler bufferRecycler) throws IOException {
            this.readBuffersConsumer.accept(queue, this.readBuffers);
        }

        public void fail(Throwable th) {
            this.failConsumer.accept(th);
        }

        public int compareTo(HsSubpartitionFileReader hsSubpartitionFileReader) {
            Preconditions.checkArgument(hsSubpartitionFileReader instanceof TestingHsSubpartitionFileReader);
            return Integer.compare(this.priority, ((TestingHsSubpartitionFileReader) hsSubpartitionFileReader).priority);
        }

        public void setPriority(int i) {
            this.priority = i;
        }

        public void setPrepareForSchedulingRunnable(Runnable runnable) {
            this.prepareForSchedulingRunnable = runnable;
        }

        public void setReadBuffersConsumer(BiConsumerWithException<Queue<MemorySegment>, Queue<MemorySegment>, IOException> biConsumerWithException) {
            this.readBuffersConsumer = biConsumerWithException;
        }

        public void setFailConsumer(Consumer<Throwable> consumer) {
            this.failConsumer = consumer;
        }

        public void setReleaseDataViewRunnable(Runnable runnable) {
            this.releaseDataViewRunnable = runnable;
        }

        public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int i, Collection<Buffer> collection) {
            return Optional.empty();
        }

        public Buffer.DataType peekNextToConsumeDataType(int i, Collection<Buffer> collection) {
            return Buffer.DataType.NONE;
        }

        public int getBacklog() {
            return 0;
        }

        public void releaseDataView() {
            this.releaseDataViewRunnable.run();
        }
    }

    /** Package-private no-argument constructor; all fixture state is created in {@code before}. */
    HsFileDataManagerTest() {}

    /**
     * Builds a fresh fixture for every test: random scratch bytes, a read buffer pool, a manually
     * triggered executor, an empty data file with a read-only channel on it, a reader factory, the
     * manager under test and a consumer operations stub.
     *
     * @param path per-test temporary directory that receives the {@code .data} and {@code .index}
     *     files
     * @throws IOException if the data file cannot be created or opened
     */
    @BeforeEach
    void before(@TempDir Path path) throws IOException {
        new Random().nextBytes(this.dataBytes);
        this.bufferPool =
                new BatchShuffleReadBufferPool(BUFFER_POOL_SIZE * BUFFER_SIZE, BUFFER_SIZE);
        this.ioExecutor = new ManuallyTriggeredScheduledExecutorService();
        this.dataFilePath = Files.createFile(path.resolve(".data"));
        this.dataFileChannel = openFileChannel(this.dataFilePath);
        this.factory = new TestingHsSubpartitionFileReader.Factory();
        this.fileDataManager =
                new HsFileDataManager(
                        this.bufferPool,
                        this.ioExecutor,
                        new HsFileDataIndexImpl(
                                NUM_SUBPARTITIONS, path.resolve(".index"), 256, Long.MAX_VALUE),
                        this.dataFilePath,
                        this.factory,
                        HybridShuffleConfiguration.builder(
                                        NUM_SUBPARTITIONS,
                                        this.bufferPool.getNumBuffersPerRequest())
                                .build());
        this.subpartitionViewOperation = new TestingSubpartitionConsumerInternalOperation();
    }

    /**
     * Releases the per-test resources. The data file channel is closed in a {@code finally} block
     * so that it is not leaked when destroying the buffer pool throws. Both resources are
     * null-guarded in case set-up did not get far enough to create them.
     *
     * @throws Exception if destroying the pool or closing the channel fails
     */
    @AfterEach
    void after() throws Exception {
        try {
            if (this.bufferPool != null) {
                this.bufferPool.destroy();
            }
        } finally {
            if (this.dataFileChannel != null) {
                this.dataFileChannel.close();
            }
        }
    }

    /**
     * Registers one consumer, triggers the executor once, and expects the reader to have been
     * offered two buffers.
     */
    @Test
    void testRegisterReaderTriggerRun() throws Exception {
        final TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        // Copy every offered buffer into the reader's own queue so the test can count them.
        reader.setReadBuffersConsumer((offered, kept) -> kept.addAll(offered));
        this.factory.allReaders.add(reader);

        // Nothing may be read before a consumer is registered and the executor runs.
        Assertions.assertThat(reader.readBuffers).isEmpty();

        this.fileDataManager.registerNewConsumer(
                0, HsConsumerId.DEFAULT, this.subpartitionViewOperation);
        this.ioExecutor.trigger();

        Assertions.assertThat(reader.readBuffers).hasSize(2);
    }

    /**
     * Drains the pool through a first read, recycles both buffers back through the manager, and
     * expects a second executor trigger to hand two buffers to the reader again.
     */
    @Test
    void testBufferReleasedTriggerRun() throws Exception {
        final TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        // Move (not copy) every offered buffer into the reader's own queue.
        reader.setReadBuffersConsumer(
                (offered, kept) -> {
                    while (!offered.isEmpty()) {
                        kept.add(offered.poll());
                    }
                });
        this.factory.allReaders.add(reader);

        this.fileDataManager.registerNewConsumer(
                0, HsConsumerId.DEFAULT, this.subpartitionViewOperation);
        this.ioExecutor.trigger();

        final Queue<MemorySegment> held = reader.readBuffers;
        Assertions.assertThat(held).hasSize(2);
        Assertions.assertThat(this.bufferPool.getAvailableBuffers()).isZero();

        // Hand both buffers back to the manager, then run the next queued task.
        for (int recycled = 0; recycled < 2; recycled++) {
            this.fileDataManager.recycle(held.poll());
        }
        this.ioExecutor.trigger();

        Assertions.assertThat(held).hasSize(2);
    }

    /**
     * The reader keeps only one of the offered buffers. After the run, the pool must report one
     * available buffer again.
     */
    @Test
    void testRunReleaseUnusedBuffers() throws Exception {
        final TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        final CompletableFuture<Void> prepareForSchedulingFinished = new CompletableFuture<>();
        reader.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.complete(null));
        reader.setReadBuffersConsumer(
                (offered, kept) -> {
                    // Scheduling preparation must have happened before any read.
                    Assertions.assertThat(prepareForSchedulingFinished).isCompleted();
                    // The whole pool was requested for this run.
                    Assertions.assertThat(offered).hasSize(BUFFER_POOL_SIZE);
                    Assertions.assertThat(this.bufferPool.getAvailableBuffers()).isEqualTo(0);
                    // Take a single buffer and leave the rest untouched.
                    kept.add(offered.poll());
                });
        this.factory.allReaders.add(reader);

        this.fileDataManager.registerNewConsumer(
                0, HsConsumerId.DEFAULT, this.subpartitionViewOperation);
        this.ioExecutor.trigger();

        // Only the buffer the reader did not take is available afterwards.
        Assertions.assertThat(this.bufferPool.getAvailableBuffers())
                .isEqualTo(BUFFER_POOL_SIZE - 1);
    }

    /**
     * Registers two readers with priorities 1 and 2 and checks, from inside their read callbacks,
     * that the priority-1 reader is read before the priority-2 reader within a single run.
     */
    @Test
    void testScheduleReadersOrdered() throws Exception {
        final TestingHsSubpartitionFileReader firstReader = new TestingHsSubpartitionFileReader();
        final TestingHsSubpartitionFileReader secondReader = new TestingHsSubpartitionFileReader();
        final CompletableFuture<Void> firstReaderRead = new CompletableFuture<>();
        final CompletableFuture<Void> secondReaderRead = new CompletableFuture<>();

        firstReader.setReadBuffersConsumer(
                (offered, kept) -> {
                    Assertions.assertThat(secondReaderRead).isNotDone();
                    firstReaderRead.complete(null);
                });
        secondReader.setReadBuffersConsumer(
                (offered, kept) -> {
                    Assertions.assertThat(firstReaderRead).isDone();
                    secondReaderRead.complete(null);
                });
        firstReader.setPriority(1);
        secondReader.setPriority(2);

        // The factory returns readers in insertion order, one per registration.
        this.factory.allReaders.add(firstReader);
        this.factory.allReaders.add(secondReader);
        this.fileDataManager.registerNewConsumer(
                0, HsConsumerId.DEFAULT, this.subpartitionViewOperation);
        this.fileDataManager.registerNewConsumer(
                1, HsConsumerId.DEFAULT, this.subpartitionViewOperation);

        this.ioExecutor.trigger();

        Assertions.assertThat(secondReaderRead).isCompleted();
    }

    /**
     * Rebuilds the manager with a three-second buffer request timeout, lets the reader swallow
     * every pool buffer during the first run, and then invokes {@code run()} directly. The reader
     * must be failed with a {@link TimeoutException} mentioning "Buffer request timeout".
     *
     * @param path temporary directory that receives the {@code .index} file of the rebuilt manager
     */
    @Test
    void testRunRequestBufferTimeout(@TempDir Path path) throws Exception {
        this.fileDataManager =
                new HsFileDataManager(
                        this.bufferPool,
                        this.ioExecutor,
                        new HsFileDataIndexImpl(
                                NUM_SUBPARTITIONS, path.resolve(".index"), 256, Long.MAX_VALUE),
                        this.dataFilePath,
                        this.factory,
                        HybridShuffleConfiguration.builder(
                                        NUM_SUBPARTITIONS,
                                        this.bufferPool.getNumBuffersPerRequest())
                                .setBufferRequestTimeout(Duration.ofSeconds(3L))
                                .build());

        final TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        final CompletableFuture<Void> prepareForSchedulingFinished = new CompletableFuture<>();
        final CompletableFuture<Throwable> cause = new CompletableFuture<>();
        reader.setPrepareForSchedulingRunnable(() -> prepareForSchedulingFinished.complete(null));
        reader.setFailConsumer(cause::complete);
        reader.setReadBuffersConsumer(
                (offered, kept) -> {
                    Assertions.assertThat(offered).hasSize(this.bufferPool.getNumTotalBuffers());
                    // Drop the buffers without recycling them, leaving the pool with none
                    // available (asserted below).
                    offered.clear();
                });
        this.factory.allReaders.add(reader);

        this.fileDataManager.registerNewConsumer(
                0, HsConsumerId.DEFAULT, this.subpartitionViewOperation);
        this.ioExecutor.trigger();
        Assertions.assertThat(this.bufferPool.getAvailableBuffers()).isZero();

        // Run again synchronously on this thread; no buffer can be obtained this time.
        this.fileDataManager.run();

        Assertions.assertThat(prepareForSchedulingFinished).isCompleted();
        Assertions.assertThat(cause).isCompleted();
        Assertions.assertThat(cause.get())
                .isInstanceOf(TimeoutException.class)
                .hasMessageContaining("Buffer request timeout");
    }

    /**
     * An {@link IOException} thrown from the reader's read callback must be passed to the reader's
     * {@code fail} callback.
     */
    @Test
    void testRunReadBuffersThrowException() throws Exception {
        final TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        final CompletableFuture<Throwable> cause = new CompletableFuture<>();
        reader.setFailConsumer(cause::complete);
        reader.setReadBuffersConsumer(
                (offered, kept) -> {
                    throw new IOException("expected exception.");
                });
        this.factory.allReaders.add(reader);

        this.fileDataManager.registerNewConsumer(
                0, HsConsumerId.DEFAULT, this.subpartitionViewOperation);
        this.ioExecutor.trigger();

        Assertions.assertThat(cause).isCompleted();
        Assertions.assertThat(cause.get())
                .isInstanceOf(IOException.class)
                .hasMessageContaining("expected exception.");
    }

    /**
     * Releases the manager from a second thread while the reader is blocked inside its read
     * callback. The reader must then be failed with an {@link IllegalStateException} saying the
     * result partition has already been released.
     */
    @Timeout(10)
    @Test
    void testReleasedWhenReading() throws Exception {
        final TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        final CompletableFuture<Throwable> cause = new CompletableFuture<>();
        reader.setFailConsumer(cause::complete);

        final CompletableFuture<Void> readBuffersStarted = new CompletableFuture<>();
        final CompletableFuture<Void> releaseFinished = new CompletableFuture<>();
        reader.setReadBuffersConsumer(
                (offered, kept) -> {
                    try {
                        // Signal the releasing thread, then block until release() has returned.
                        readBuffersStarted.complete(null);
                        releaseFinished.get();
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                });
        this.factory.allReaders.add(reader);

        final CheckedThread releaseThread =
                new CheckedThread() {
                    @Override
                    public void go() throws Exception {
                        readBuffersStarted.get();
                        fileDataManager.release();
                        releaseFinished.complete(null);
                    }
                };
        releaseThread.start();

        this.fileDataManager.registerNewConsumer(
                0, HsConsumerId.DEFAULT, this.subpartitionViewOperation);
        this.ioExecutor.trigger();
        releaseThread.sync();

        Assertions.assertThat(cause).isCompleted();
        Assertions.assertThat(cause.get())
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("Result partition has been already released.");
    }

    /**
     * Registering a consumer after {@code release()} must throw an
     * {@link IllegalStateException} saying the manager is already released.
     */
    @Test
    void testRegisterSubpartitionReaderAfterReleased() {
        final TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
        this.factory.allReaders.add(reader);

        this.fileDataManager.release();

        Assertions.assertThatThrownBy(
                        () -> {
                            this.fileDataManager.registerNewConsumer(
                                    0, HsConsumerId.DEFAULT, this.subpartitionViewOperation);
                            this.ioExecutor.trigger();
                        })
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("HsFileDataManager is already released.");
    }

    /**
     * Runs {@code release()} on the test thread while a second thread calls
     * {@code getNextBuffer()} on the consumer. The reader's {@code fail} is held until the
     * consumer thread has been started, and the test passes if both threads finish.
     *
     * @param path temporary directory that receives the reader's {@code .index} file
     */
    @Test
    void testConsumeWhileReleaseNoDeadlock(@TempDir Path path) throws Exception {
        final CompletableFuture<Void> consumerStarted = new CompletableFuture<>();
        final CompletableFuture<Void> readerFailEntered = new CompletableFuture<>();
        final HsSubpartitionConsumer subpartitionConsumer =
                new HsSubpartitionConsumer(new NoOpBufferAvailablityListener());

        final HsSubpartitionFileReader blockingReader =
                new HsSubpartitionFileReaderImpl(
                        0,
                        HsConsumerId.DEFAULT,
                        this.dataFileChannel,
                        subpartitionConsumer,
                        new HsFileDataIndexImpl(
                                NUM_SUBPARTITIONS, path.resolve(".index"), 256, Long.MAX_VALUE),
                        5,
                        this.fileDataManager::releaseSubpartitionReader,
                        BufferReaderWriterUtil.allocatedHeaderBuffer()) {
                    @Override
                    public synchronized void fail(Throwable failureCause) {
                        try {
                            // Announce that fail() was entered, then wait for the consumer
                            // thread before delegating to the real implementation.
                            readerFailEntered.complete(null);
                            consumerStarted.get();
                            super.fail(failureCause);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                };
        this.factory.allReaders.add(blockingReader);

        subpartitionConsumer.setDiskDataView(
                this.fileDataManager.registerNewConsumer(
                        0, HsConsumerId.DEFAULT, subpartitionConsumer));
        subpartitionConsumer.setMemoryDataView(
                TestingHsDataView.builder()
                        .setConsumeBufferFunction(
                                ignored -> {
                                    throw new RuntimeException("expected exception.");
                                })
                        .build());

        final CheckedThread consumerThread =
                new CheckedThread() {
                    @Override
                    public void go() throws Exception {
                        readerFailEntered.get();
                        consumerStarted.complete(null);
                        subpartitionConsumer.getNextBuffer();
                    }
                };
        consumerThread.start();

        this.fileDataManager.release();
        consumerThread.sync();
    }

    /**
     * Opens the given file for reading only.
     *
     * @param path file to open
     * @return a channel opened with {@link StandardOpenOption#READ}
     * @throws IOException if the file cannot be opened
     */
    private static FileChannel openFileChannel(Path path) throws IOException {
        final FileChannel readOnlyChannel = FileChannel.open(path, StandardOpenOption.READ);
        return readOnlyChannel;
    }
}
