package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CheckedSupplier;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mockito;

@ExtendWith({NoOpTestExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.class */
public class PipelinedSubpartitionTest extends SubpartitionTestBase {

    @RegisterExtension
    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension<>(Executors::newCachedThreadPool);

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest$FailurePipelinedSubpartition.class */
    private static class FailurePipelinedSubpartition extends PipelinedSubpartition {
        FailurePipelinedSubpartition(int i, int i2, ResultPartition resultPartition) {
            super(i, i2, resultPartition);
        }

        Throwable getFailureCause() {
            return new RuntimeException("Expected test exception");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.SubpartitionTestBase
    /* renamed from: createSubpartition, reason: merged with bridge method [inline-methods] */
    public PipelinedSubpartition mo186createSubpartition() throws Exception {
        return createPipelinedSubpartition();
    }

    @Override // org.apache.flink.runtime.io.network.partition.SubpartitionTestBase
    ResultSubpartition createFailingWritesSubpartition() throws Exception {
        Assumptions.assumeThat(false).isTrue();
        return null;
    }

    @TestTemplate
    void testIllegalReadViewRequest() throws Exception {
        PipelinedSubpartition mo186createSubpartition = mo186createSubpartition();
        Assertions.assertThat(mo186createSubpartition.createReadView(new NoOpBufferAvailablityListener())).isNotNull();
        Assertions.assertThatThrownBy(() -> {
            mo186createSubpartition.createReadView(new NoOpBufferAvailablityListener());
        }).withFailMessage("Did not throw expected exception after duplicate notifyNonEmpty view request.", new Object[0]).isInstanceOf(IllegalStateException.class);
    }

    @TestTemplate
    void testIsReleasedChecksParent() {
        PipelinedSubpartition pipelinedSubpartition = (PipelinedSubpartition) Mockito.mock(PipelinedSubpartition.class);
        PipelinedSubpartitionView pipelinedSubpartitionView = new PipelinedSubpartitionView(pipelinedSubpartition, (BufferAvailabilityListener) Mockito.mock(BufferAvailabilityListener.class));
        Assertions.assertThat(pipelinedSubpartitionView.isReleased()).isFalse();
        ((PipelinedSubpartition) Mockito.verify(pipelinedSubpartition, Mockito.times(1))).isReleased();
        Mockito.when(Boolean.valueOf(pipelinedSubpartition.isReleased())).thenReturn(true);
        Assertions.assertThat(pipelinedSubpartitionView.isReleased()).isTrue();
        ((PipelinedSubpartition) Mockito.verify(pipelinedSubpartition, Mockito.times(2))).isReleased();
    }

    @TestTemplate
    void testConcurrentFastProduceAndFastConsume() throws Exception {
        testProduceConsume(false, false);
    }

    @TestTemplate
    void testConcurrentFastProduceAndSlowConsume() throws Exception {
        testProduceConsume(false, true);
    }

    @TestTemplate
    void testConcurrentSlowProduceAndFastConsume() throws Exception {
        testProduceConsume(true, false);
    }

    @TestTemplate
    void testConcurrentSlowProduceAndSlowConsume() throws Exception {
        testProduceConsume(true, true);
    }

    private void testProduceConsume(boolean z, boolean z2) throws Exception {
        TestProducerSource testProducerSource = new TestProducerSource() { // from class: org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest.1
            private int numberOfBuffers;

            @Override // org.apache.flink.runtime.io.network.util.TestProducerSource
            public TestProducerSource.BufferAndChannel getNextBuffer() throws Exception {
                if (this.numberOfBuffers == 128) {
                    return null;
                }
                MemorySegment allocateUnpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(32768);
                int i = this.numberOfBuffers * 8192;
                for (int i2 = 0; i2 < 32768; i2 += 4) {
                    allocateUnpooledSegment.putInt(i2, i);
                    i++;
                }
                this.numberOfBuffers++;
                return new TestProducerSource.BufferAndChannel(allocateUnpooledSegment.getArray(), 0);
            }
        };
        TestConsumerCallback testConsumerCallback = new TestConsumerCallback() { // from class: org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest.2
            private int numberOfBuffers;

            @Override // org.apache.flink.runtime.io.network.util.TestConsumerCallback
            public void onBuffer(Buffer buffer) {
                MemorySegment memorySegment = buffer.getMemorySegment();
                Assertions.assertThat(buffer.getSize()).isEqualTo(memorySegment.size());
                int size = this.numberOfBuffers * (memorySegment.size() / 4);
                for (int i = 0; i < memorySegment.size(); i += 4) {
                    Assertions.assertThat(memorySegment.getInt(i)).isEqualTo(size);
                    size++;
                }
                this.numberOfBuffers++;
                buffer.recycleBuffer();
            }

            @Override // org.apache.flink.runtime.io.network.util.TestConsumerCallback
            public void onEvent(AbstractEvent abstractEvent) {
            }
        };
        PipelinedSubpartition mo186createSubpartition = mo186createSubpartition();
        TestSubpartitionProducer testSubpartitionProducer = new TestSubpartitionProducer(mo186createSubpartition, z, testProducerSource);
        TestSubpartitionConsumer testSubpartitionConsumer = new TestSubpartitionConsumer(z2, testConsumerCallback);
        testSubpartitionConsumer.setSubpartitionView(mo186createSubpartition.createReadView(testSubpartitionConsumer));
        testSubpartitionProducer.getClass();
        CompletableFuture supplyAsync = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(testSubpartitionProducer::call), EXECUTOR_EXTENSION.getExecutor());
        testSubpartitionConsumer.getClass();
        FutureUtils.waitForAll(Arrays.asList(supplyAsync, CompletableFuture.supplyAsync(CheckedSupplier.unchecked(testSubpartitionConsumer::call), EXECUTOR_EXTENSION.getExecutor()))).get(60000L, TimeUnit.MILLISECONDS);
    }

    @TestTemplate
    void testCleanupReleasedPartitionNoView() throws Exception {
        testCleanupReleasedPartition(false);
    }

    @TestTemplate
    void testCleanupReleasedPartitionWithView() throws Exception {
        testCleanupReleasedPartition(true);
    }

    private void testCleanupReleasedPartition(boolean z) throws Exception {
        PipelinedSubpartition mo186createSubpartition = mo186createSubpartition();
        BufferConsumer createFilledFinishedBufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096);
        BufferConsumer createFilledFinishedBufferConsumer2 = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096);
        try {
            mo186createSubpartition.add(createFilledFinishedBufferConsumer);
            mo186createSubpartition.add(createFilledFinishedBufferConsumer2);
            Assertions.assertThat(mo186createSubpartition.getNumberOfQueuedBuffers()).isEqualTo(2);
            PipelinedSubpartitionView pipelinedSubpartitionView = null;
            if (z) {
                pipelinedSubpartitionView = mo186createSubpartition.createReadView(new NoOpBufferAvailablityListener());
            }
            mo186createSubpartition.release();
            Assertions.assertThat(mo186createSubpartition.getNumberOfQueuedBuffers()).isZero();
            Assertions.assertThat(mo186createSubpartition.isReleased()).isTrue();
            if (z) {
                Assertions.assertThat(pipelinedSubpartitionView.isReleased()).isTrue();
            }
            Assertions.assertThat(createFilledFinishedBufferConsumer.isRecycled()).isTrue();
            boolean isRecycled = createFilledFinishedBufferConsumer.isRecycled();
            if (!isRecycled) {
                createFilledFinishedBufferConsumer.close();
            }
            boolean isRecycled2 = createFilledFinishedBufferConsumer2.isRecycled();
            if (!isRecycled2) {
                createFilledFinishedBufferConsumer2.close();
            }
            Assertions.assertThat(isRecycled).withFailMessage("buffer 1 not recycled", new Object[0]).isTrue();
            Assertions.assertThat(isRecycled2).withFailMessage("buffer 2 not recycled", new Object[0]).isTrue();
            Assertions.assertThat(mo186createSubpartition.getTotalNumberOfBuffersUnsafe()).isEqualTo(2L);
            Assertions.assertThat(mo186createSubpartition.getTotalNumberOfBytesUnsafe()).isZero();
        } catch (Throwable th) {
            if (!createFilledFinishedBufferConsumer.isRecycled()) {
                createFilledFinishedBufferConsumer.close();
            }
            if (!createFilledFinishedBufferConsumer2.isRecycled()) {
                createFilledFinishedBufferConsumer2.close();
            }
            throw th;
        }
    }

    @TestTemplate
    void testReleaseParent() throws Exception {
        verifyViewReleasedAfterParentRelease(mo186createSubpartition());
    }

    @TestTemplate
    void testNumberOfQueueBuffers() throws Exception {
        PipelinedSubpartition mo186createSubpartition = mo186createSubpartition();
        mo186createSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
        Assertions.assertThat(mo186createSubpartition.getNumberOfQueuedBuffers()).isOne();
        mo186createSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
        Assertions.assertThat(mo186createSubpartition.getNumberOfQueuedBuffers()).isEqualTo(2);
        mo186createSubpartition.getNextBuffer();
        Assertions.assertThat(mo186createSubpartition.getNumberOfQueuedBuffers()).isOne();
    }

    @TestTemplate
    void testNewBufferSize() throws Exception {
        PipelinedSubpartition mo186createSubpartition = mo186createSubpartition();
        Assertions.assertThat(mo186createSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4))).isEqualTo(Integer.MAX_VALUE);
        mo186createSubpartition.bufferSize(42);
        Assertions.assertThat(mo186createSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4))).isEqualTo(42);
    }

    @TestTemplate
    void testNegativeNewBufferSize() throws Exception {
        PipelinedSubpartition mo186createSubpartition = mo186createSubpartition();
        Assertions.assertThat(mo186createSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4))).isEqualTo(Integer.MAX_VALUE);
        Assertions.assertThatThrownBy(() -> {
            mo186createSubpartition.bufferSize(-1);
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @TestTemplate
    void testNegativeBufferSizeAsSignOfAddingFail() throws Exception {
        PipelinedSubpartition mo186createSubpartition = mo186createSubpartition();
        Assertions.assertThat(mo186createSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4))).isEqualTo(Integer.MAX_VALUE);
        mo186createSubpartition.finish();
        Assertions.assertThat(mo186createSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4))).isEqualTo(-1);
    }

    @TestTemplate
    void testProducerFailedException() {
        Assertions.assertThat(new FailurePipelinedSubpartition(0, 2, PartitionTestUtils.createPartition()).createReadView(new NoOpBufferAvailablityListener()).getFailureCause()).isNotNull().isInstanceOf(CancelTaskException.class);
    }

    @TestTemplate
    void testConsumeTimeoutableCheckpointBarrierQuickly() throws Exception {
        PipelinedSubpartition mo186createSubpartition = mo186createSubpartition();
        mo186createSubpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
        assertSubpartitionChannelStateFuturesAndQueuedBuffers(mo186createSubpartition, null, true, 0L, false);
        testConsumeQuicklyWithNDataBuffers(0, mo186createSubpartition, 5L);
        testConsumeQuicklyWithNDataBuffers(1, mo186createSubpartition, 6L);
        testConsumeQuicklyWithNDataBuffers(2, mo186createSubpartition, 7L);
    }

    private void testConsumeQuicklyWithNDataBuffers(int i, PipelinedSubpartition pipelinedSubpartition, long j) throws Exception {
        for (int i2 = 0; i2 < i; i2++) {
            pipelinedSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
        }
        pipelinedSubpartition.add(getTimeoutableBarrierBuffer(j));
        Assertions.assertThat(pipelinedSubpartition.getChannelStateCheckpointId()).isEqualTo(j);
        CompletableFuture<List<Buffer>> channelStateFuture = pipelinedSubpartition.getChannelStateFuture();
        assertSubpartitionChannelStateFuturesAndQueuedBuffers(pipelinedSubpartition, channelStateFuture, false, i + 1, false);
        for (int i3 = 0; i3 < i; i3++) {
            pollBufferAndCheckType(pipelinedSubpartition, Buffer.DataType.DATA_BUFFER);
        }
        pollBufferAndCheckType(pipelinedSubpartition, Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER);
        assertSubpartitionChannelStateFuturesAndQueuedBuffers(pipelinedSubpartition, channelStateFuture, true, 0L, true);
        FlinkAssertions.assertThatFuture(channelStateFuture).eventuallySucceeds().asList().isEmpty();
        pipelinedSubpartition.resumeConsumption();
    }

    @TestTemplate
    void testTimeoutAlignedToUnalignedBarrier() throws Exception {
        PipelinedSubpartition mo186createSubpartition = mo186createSubpartition();
        mo186createSubpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
        assertSubpartitionChannelStateFuturesAndQueuedBuffers(mo186createSubpartition, null, true, 0L, false);
        testTimeoutWithNDataBuffers(0, mo186createSubpartition, 7L);
        testTimeoutWithNDataBuffers(1, mo186createSubpartition, 8L);
    }

    private void testTimeoutWithNDataBuffers(int i, PipelinedSubpartition pipelinedSubpartition, long j) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            BufferConsumer createFilledFinishedBufferConsumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096);
            pipelinedSubpartition.add(createFilledFinishedBufferConsumer);
            arrayList.add(createFilledFinishedBufferConsumer.copy().build());
        }
        pipelinedSubpartition.add(getTimeoutableBarrierBuffer(j));
        Assertions.assertThat(pipelinedSubpartition.getChannelStateCheckpointId()).isEqualTo(j);
        CompletableFuture<List<Buffer>> channelStateFuture = pipelinedSubpartition.getChannelStateFuture();
        assertSubpartitionChannelStateFuturesAndQueuedBuffers(pipelinedSubpartition, channelStateFuture, false, i + 1, false);
        pipelinedSubpartition.alignedBarrierTimeout(j);
        assertSubpartitionChannelStateFuturesAndQueuedBuffers(pipelinedSubpartition, channelStateFuture, true, i + 1, true);
        pollBufferAndCheckType(pipelinedSubpartition, Buffer.DataType.PRIORITIZED_EVENT_BUFFER);
        for (int i3 = 0; i3 < i; i3++) {
            pollBufferAndCheckType(pipelinedSubpartition, Buffer.DataType.DATA_BUFFER);
        }
        FlinkAssertions.assertThatFuture(channelStateFuture).eventuallySucceeds().isEqualTo(arrayList);
    }

    private void pollBufferAndCheckType(PipelinedSubpartition pipelinedSubpartition, Buffer.DataType dataType) {
        ResultSubpartition.BufferAndBacklog pollBuffer = pipelinedSubpartition.pollBuffer();
        Assertions.assertThat(pollBuffer).isNotNull();
        Assertions.assertThat(pollBuffer.buffer().getDataType()).isEqualTo(dataType);
    }

    @TestTemplate
    void testConcurrentTimeoutableCheckpointBarrier() throws Exception {
        PipelinedSubpartition mo186createSubpartition = mo186createSubpartition();
        mo186createSubpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
        mo186createSubpartition.add(getTimeoutableBarrierBuffer(10L));
        Assertions.assertThat(mo186createSubpartition.getChannelStateCheckpointId()).isEqualTo(10L);
        CompletableFuture channelStateFuture = mo186createSubpartition.getChannelStateFuture();
        Assertions.assertThat(channelStateFuture).isNotNull();
        mo186createSubpartition.add(getTimeoutableBarrierBuffer(11L));
        channelStateFuture.getClass();
        Assertions.assertThatThrownBy(channelStateFuture::get).hasCauseInstanceOf(IllegalStateException.class).isInstanceOf(ExecutionException.class);
    }

    private BufferConsumer getTimeoutableBarrierBuffer(long j) throws IOException {
        return EventSerializer.toBufferConsumer(new CheckpointBarrier(j, System.currentTimeMillis(), CheckpointOptions.alignedWithTimeout(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), 1000L)), false);
    }

    private void assertSubpartitionChannelStateFuturesAndQueuedBuffers(PipelinedSubpartition pipelinedSubpartition, CompletableFuture<List<Buffer>> completableFuture, boolean z, long j, boolean z2) {
        Assertions.assertThat(pipelinedSubpartition.getChannelStateFuture() == null).isEqualTo(z);
        Assertions.assertThat(pipelinedSubpartition.getNumberOfQueuedBuffers()).isEqualTo(j);
        if (completableFuture != null) {
            Assertions.assertThat(completableFuture.isDone()).isEqualTo(z2);
        }
    }

    private void verifyViewReleasedAfterParentRelease(ResultSubpartition resultSubpartition) throws Exception {
        resultSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768));
        resultSubpartition.finish();
        ResultSubpartitionView createReadView = resultSubpartition.createReadView((BufferAvailabilityListener) Mockito.mock(BufferAvailabilityListener.class));
        Assertions.assertThat(createReadView.getNextBuffer()).isNotNull();
        Assertions.assertThat(createReadView.getNextBuffer()).isNotNull();
        Assertions.assertThat(createReadView.isReleased()).isFalse();
        resultSubpartition.release();
        Assertions.assertThat(createReadView.isReleased()).isTrue();
    }

    public static PipelinedSubpartition createPipelinedSubpartition() {
        return new PipelinedSubpartition(0, 2, PartitionTestUtils.createPartition());
    }

    public static PipelinedSubpartition createPipelinedSubpartition(ResultPartition resultPartition) {
        return new PipelinedSubpartition(0, 2, resultPartition);
    }
}
