package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.util.TestNotificationListener;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.class */
class AsynchronousFileIOChannelTest {
    /** Logger used by the concurrent test tasks to trace their progress at debug level. */
    private static final Logger LOG = LoggerFactory.getLogger(AsynchronousFileIOChannelTest.class);

    /**
     * JUnit 5 extension that supplies the executor (created via {@link Executors#newCachedThreadPool()})
     * on which the tests submit their concurrent tasks.
     */
    @RegisterExtension
    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension<>(Executors::newCachedThreadPool);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest$FailingWriteRequest.class */
    /**
     * A {@link WriteRequest} whose {@link #write()} always throws an {@link IOException}.
     *
     * <p>When the request is reported as done, it passes its memory segment together with the
     * reported exception to the owning channel's {@code handleProcessedBuffer}.
     */
    public static class FailingWriteRequest implements WriteRequest {
        private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> owner;
        private final MemorySegment payload;

        /**
         * @param owner channel that is told about the outcome of this request
         * @param payload segment handed back to the channel once the request is done
         */
        protected FailingWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> owner, MemorySegment payload) {
            this.owner = owner;
            this.payload = payload;
        }

        /** Never writes anything; unconditionally fails. */
        public void write() throws IOException {
            throw new IOException();
        }

        /** Forwards the segment and the (possibly null) failure to the owning channel. */
        public void requestDone(IOException failure) {
            owner.handleProcessedBuffer(payload, failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest$NoOpCallback.class */
    /**
     * A {@link RequestDoneCallback} that ignores every notification, successful or failed.
     *
     * @param <T> type of the buffer passed to the callback
     */
    public static class NoOpCallback<T> implements RequestDoneCallback<T> {
        private NoOpCallback() {
            // Private: instances are only created from within the enclosing test class.
        }

        public void requestSuccessful(T buffer) {
            // Intentionally empty.
        }

        public void requestFailed(T buffer, IOException cause) {
            // Intentionally empty.
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest$NoOpWriteRequest.class */
    /** A {@link WriteRequest} that neither writes anything nor reacts to its completion. */
    private static class NoOpWriteRequest implements WriteRequest {
        private NoOpWriteRequest() {
            // Nothing to initialize.
        }

        public void write() {
            // Intentionally empty.
        }

        public void requestDone(IOException cause) {
            // Intentionally empty.
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest$TestAsyncFileIOChannel.class */
    /**
     * Thin {@link AsynchronousFileIOChannel} subclass over {@link Buffer}s that additionally
     * exposes the current value of the inherited {@code requestsNotReturned} counter.
     */
    private static class TestAsyncFileIOChannel extends AsynchronousFileIOChannel<Buffer, WriteRequest> {
        /** Passes all arguments unchanged to the superclass constructor. */
        protected TestAsyncFileIOChannel(
                FileIOChannel.ID channelId,
                RequestQueue<WriteRequest> queue,
                RequestDoneCallback<Buffer> callback,
                boolean writeEnabled)
                throws IOException {
            super(channelId, queue, callback, writeEnabled);
        }

        /** Returns the current value of the {@code requestsNotReturned} counter. */
        int getNumberOfOutstandingRequests() {
            final int outstanding = requestsNotReturned.get();
            return outstanding;
        }
    }

    /** Package-private no-argument constructor; performs no setup. */
    AsynchronousFileIOChannelTest() {}

    /**
     * Exercises concurrent registration and notification of the "all requests processed"
     * listener while requests are added to, and drained from, the same channel.
     *
     * <p>For each repetition three tasks are submitted in random order:
     *
     * <ul>
     *   <li>a producer that adds write requests in several runs and then closes the channel,
     *   <li>a consumer that takes the requests from the queue and reports them as processed,
     *   <li>a listener task that keeps (re-)registering the notification listener until the
     *       channel reports that it is closed.
     * </ul>
     *
     * <p>The test fails if the three tasks have not all finished within two minutes.
     */
    @Test
    void testAllRequestsProcessedListenerNotification() throws Exception {
        final int numberOfRepetitions = 10;
        final int numberOfRuns = 10;
        final int numberOfRequests = 100;

        final Random random = new Random();
        final RequestQueue<WriteRequest> requestQueue = new RequestQueue<>();
        final RequestDoneCallback<Buffer> ioChannelCallback = new NoOpCallback<>();
        final TestNotificationListener listener = new TestNotificationListener();

        // try-with-resources closes the I/O manager on every path and keeps a primary test
        // failure from being replaced by an exception thrown while closing.
        try (IOManagerAsync ioManager = new IOManagerAsync()) {
            for (int repetition = 0; repetition < numberOfRepetitions; repetition++) {
                final TestAsyncFileIOChannel ioChannel =
                        new TestAsyncFileIOChannel(ioManager.createChannel(), requestQueue, ioChannelCallback, true);
                // One count per task below.
                final CountDownLatch sync = new CountDownLatch(3);
                final Buffer buffer = BufferBuilderTestUtils.buildSomeBuffer();
                final WriteRequest request = new NoOpWriteRequest();

                // Producer: adds the requests in bursts, then closes the channel.
                final Callable<Void> producer = () -> {
                    for (int run = 1; run <= numberOfRuns; run++) {
                        LOG.debug("Starting run {}.", run);
                        for (int j = 0; j < numberOfRequests; j++) {
                            ioChannel.addRequest(request);
                        }
                        LOG.debug("Added all ({}) requests of run {}.", numberOfRequests, run);
                        final int sleepMillis = random.nextInt(10);
                        LOG.debug("Sleeping for {} ms before next run.", sleepMillis);
                        Thread.sleep(sleepMillis);
                    }
                    LOG.debug("Done. Closing channel.");
                    ioChannel.close();
                    sync.countDown();
                    return null;
                };

                // Consumer: drains exactly as many requests as the producer adds in total.
                final Callable<Void> consumer = () -> {
                    final int totalRequests = numberOfRuns * numberOfRequests;
                    for (int j = 0; j < totalRequests; j++) {
                        requestQueue.take();
                        ioChannel.handleProcessedBuffer(buffer, null);
                    }
                    LOG.debug("Processed all ({}) requests.", totalRequests);
                    sync.countDown();
                    return null;
                };

                // Listener: re-registers after every notification; stops once registration is
                // refused and the channel is closed.
                final Callable<Void> listenerTask = () -> {
                    while (true) {
                        final int current = listener.getNumberOfNotifications();
                        if (ioChannel.registerAllRequestsProcessedListener(listener)) {
                            listener.waitForNotification(current);
                        } else if (ioChannel.isClosed()) {
                            LOG.debug("Stopping listener. Channel closed.");
                            sync.countDown();
                            return null;
                        }
                    }
                };

                // Randomize the submission order to vary the interleaving between repetitions.
                final LinkedList<Callable<Void>> tasks = new LinkedList<>();
                tasks.add(producer);
                tasks.add(consumer);
                tasks.add(listenerTask);
                Collections.shuffle(tasks);
                for (Callable<Void> task : tasks) {
                    EXECUTOR_EXTENSION.getExecutor().submit(task);
                }

                Assertions.assertThat(sync.await(2L, TimeUnit.MINUTES))
                        .withFailMessage(
                                "Test failed due to a timeout. This indicates a deadlock due to the way "
                                        + "that listeners are registered/notified in the asynchronous file I/O "
                                        + "channel.")
                        .isTrue();
                listener.reset();
            }
        }
    }

    /**
     * Races adding a request against registering the "all requests processed" listener on a
     * channel that has already been closed.
     *
     * <p>Each of the many iterations closes a fresh channel and then submits two tasks: one that
     * tries to add a request and one that tries to register the listener. Both tasks count down
     * a shared latch exactly once when they finish, whether normally or exceptionally. The test
     * fails if both have not finished within two minutes.
     */
    @Test
    void testClosedButAddRequestAndRegisterListenerRace() throws Exception {
        final int numberOfRuns = 1024;

        final RequestQueue<WriteRequest> requestQueue = new RequestQueue<>();
        final RequestDoneCallback<Buffer> ioChannelCallback = new NoOpCallback<>();
        final TestNotificationListener listener = new TestNotificationListener();

        // try-with-resources closes the I/O manager on every path and keeps a primary test
        // failure from being replaced by an exception thrown while closing.
        try (IOManagerAsync ioManager = new IOManagerAsync()) {
            final ExecutorService executor = EXECUTOR_EXTENSION.getExecutor();

            for (int run = 0; run < numberOfRuns; run++) {
                final TestAsyncFileIOChannel ioChannel =
                        new TestAsyncFileIOChannel(ioManager.createChannel(), requestQueue, ioChannelCallback, true);
                // One count per task below.
                final CountDownLatch sync = new CountDownLatch(2);
                final WriteRequest request = new NoOpWriteRequest();

                // The channel is closed before either task starts.
                ioChannel.close();

                final Callable<Void> addRequestTask = () -> {
                    try {
                        ioChannel.addRequest(request);
                    } finally {
                        sync.countDown();
                    }
                    return null;
                };

                final Callable<Void> registerListenerTask = () -> {
                    // The try/finally wraps the whole loop so the latch is decremented once
                    // per task, not once per loop iteration.
                    try {
                        while (true) {
                            final int current = listener.getNumberOfNotifications();
                            if (ioChannel.registerAllRequestsProcessedListener(listener)) {
                                listener.waitForNotification(current);
                            } else if (ioChannel.isClosed()) {
                                break;
                            }
                        }
                    } finally {
                        sync.countDown();
                    }
                    return null;
                };

                executor.submit(addRequestTask);
                executor.submit(registerListenerTask);

                Assertions.assertThat(sync.await(2L, TimeUnit.MINUTES))
                        .withFailMessage(
                                "Test failed due to a timeout. This indicates a deadlock due to the way "
                                        + "that listeners are registered/notified in the asynchronous file I/O "
                                        + "channel.")
                        .isTrue();
            }
        }
    }

    /**
     * Writes a fixed number of blocks through a block channel writer and checks, right after
     * {@code close()} returns, that the success callback has been invoked once per block with
     * the expected segment and that the failure callback was never invoked.
     */
    @Test
    void testClosingWaits() throws Exception {
        // try-with-resources closes the I/O manager on every path and keeps a primary test
        // failure from being replaced by an exception thrown while closing.
        try (IOManagerAsync ioManager = new IOManagerAsync()) {
            final int numBlocks = 100;
            final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
            final AtomicInteger callbackCounter = new AtomicInteger();
            final AtomicBoolean exceptionOccurred = new AtomicBoolean();

            final RequestDoneCallback<MemorySegment> callback = new RequestDoneCallback<MemorySegment>() {
                public void requestSuccessful(MemorySegment buffer) {
                    // NOTE(review): deliberately kept as a non-atomic read-modify-write, as in
                    // the original; presumably callbacks arrive sequentially from a single
                    // thread - confirm against the I/O manager's writer thread model.
                    callbackCounter.set(callbackCounter.get() + 1);
                    if (buffer != segment) {
                        exceptionOccurred.set(true);
                    }
                }

                public void requestFailed(MemorySegment buffer, IOException cause) {
                    exceptionOccurred.set(true);
                }
            };

            final BlockChannelWriterWithCallback<MemorySegment> writer =
                    ioManager.createBlockChannelWriter(ioManager.createChannel(), callback);
            try {
                for (int block = 0; block < numBlocks; block++) {
                    writer.writeBlock(segment);
                }
                writer.close();

                Assertions.assertThat(callbackCounter).hasValue(numBlocks);
                Assertions.assertThat(exceptionOccurred).isFalse();
            } finally {
                // Always remove the channel file, including when close() or an assertion fails.
                writer.closeAndDelete();
            }
        }
    }

    /**
     * Runs the exception-forwarding scenario three times against one I/O manager, with the
     * failing request injected as the first, the 50th and the last of 100 blocks.
     */
    @Test
    void testExceptionForwardsToClose() throws Exception {
        // try-with-resources closes the I/O manager on every path and keeps a primary test
        // failure from being replaced by an exception thrown while closing.
        try (IOManagerAsync ioManager = new IOManagerAsync()) {
            testExceptionForwardsToClose(ioManager, 100, 1);
            testExceptionForwardsToClose(ioManager, 100, 50);
            testExceptionForwardsToClose(ioManager, 100, 100);
        }
    }

    /**
     * Writes {@code numBlocks} blocks through a writer whose {@code failingBlock}-th
     * {@code writeBlock} call enqueues a {@link FailingWriteRequest} instead of a regular write,
     * and asserts that the write/close sequence surfaces an {@link IOException}.
     *
     * @param ioManager I/O manager providing the channel ID and the write request queue
     * @param numBlocks total number of {@code writeBlock} calls to attempt
     * @param failingBlock 1-based index of the {@code writeBlock} call that is replaced with a
     *     failing request
     * @throws IOException if creating the writer fails
     */
    private void testExceptionForwardsToClose(IOManagerAsync ioManager, final int numBlocks, final int failingBlock) throws IOException {
        final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
        final FileIOChannel.ID channelId = ioManager.createChannel();

        final AsynchronousBlockWriterWithCallback writer =
                new AsynchronousBlockWriterWithCallback(
                        channelId, ioManager.getWriteRequestQueue(channelId), new NoOpCallback<MemorySegment>()) {
                    private int blocksSubmitted;

                    public void writeBlock(MemorySegment block) throws IOException {
                        blocksSubmitted++;
                        if (blocksSubmitted == failingBlock) {
                            // Inject the failing request by hand: bump the outstanding-request
                            // counter and enqueue it directly. NOTE(review): presumably mirrors
                            // the bookkeeping of the regular add-request path - confirm.
                            this.requestsNotReturned.incrementAndGet();
                            this.requestQueue.add(new FailingWriteRequest(this, block));
                        } else {
                            super.writeBlock(block);
                        }
                    }
                };

        Assertions.assertThatThrownBy(() -> {
            try {
                for (int block = 0; block < numBlocks; block++) {
                    writer.writeBlock(segment);
                }
                writer.close();
            } finally {
                // Always remove the channel file, including when close() throws, which is
                // one of the paths this test expects.
                writer.closeAndDelete();
            }
        }).isInstanceOf(IOException.class);
    }
}
