/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.AwaitableBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class PipelinedSubpartitionWithReadViewTest {
    // Partition that owns the subpartition under test; created in setup().
    ResultPartition resultPartition;
    // Subpartition under test; re-created for every test in before().
    PipelinedSubpartition subpartition;
    // Listener handed to createReadView() in before(); tests read its notification and priority-event counters.
    AwaitableBufferAvailablityListener availablityListener;
    // Read view created from the subpartition in before().
    PipelinedSubpartitionView readView;
    // Test parameter (values supplied by parameters()); forwarded to PartitionTestUtils.createPartition in setup().
    @Parameter
    private boolean compressionEnabled;

    /** No-arg constructor; it performs no initialization itself (the fixture is built in {@code before()}). */
    PipelinedSubpartitionWithReadViewTest() {}

    /** Supplies the values of the {@code compressionEnabled} parameter: {@code false} first, then {@code true}. */
    @Parameters(name="compressionEnabled = {0}")
    private static List<Boolean> parameters() {
        final List<Boolean> compressionFlags = Arrays.asList(Boolean.FALSE, Boolean.TRUE);
        return compressionFlags;
    }

    /**
     * Builds a fresh fixture before each test: a PIPELINED result partition, a subpartition
     * attached to it, an availability listener and a read view registered with that listener.
     *
     * @throws IOException if partition setup or read-view creation fails
     */
    @BeforeEach
    void before() throws IOException {
        setup(ResultPartitionType.PIPELINED);

        // The constructor arguments 0, 2 and Integer.MAX_VALUE are passed through unchanged;
        // see PipelinedSubpartition for their meaning.
        subpartition = new PipelinedSubpartition(0, 2, Integer.MAX_VALUE, resultPartition);
        availablityListener = new AwaitableBufferAvailablityListener();
        readView = subpartition.createReadView(availablityListener);
    }

    /** Releases the read view first and then the subpartition after every test. */
    @AfterEach
    void tearDown() {
        readView.releaseAllResources();
        subpartition.release();
    }

    /**
     * Adds two consumers of unfinished buffer builders and expects the next poll of the read
     * view to fail with an {@link IllegalStateException}.
     */
    @TestTemplate
    void testAddTwoNonFinishedBuffer() throws IOException {
        final BufferConsumer firstUnfinished = BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer();
        subpartition.add(firstUnfinished);
        final BufferConsumer secondUnfinished = BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer();
        subpartition.add(secondUnfinished);

        Assertions.assertThatThrownBy(() -> readView.getNextBuffer())
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Releases the read view, closes the result partition and verifies that the partition's id
     * is not reported among the partition manager's unreleased partitions.
     */
    @TestTemplate
    void testRelease() {
        readView.releaseAllResources();
        resultPartition.close();

        // Typed assertion: no raw Collection cast and no manual Object[] wrapping of the vararg.
        Assertions.assertThat(resultPartition.getPartitionManager().getUnreleasedPartitions())
                .doesNotContain(resultPartition.getPartitionId());
    }

    /**
     * Adds a consumer of an empty, unfinished builder and checks that nothing is announced or
     * readable; then finishes that builder, adds a second empty unfinished one and checks the
     * backlog/notification counters before and after polling.
     */
    @TestTemplate
    void testAddEmptyNonFinishedBuffer() throws IOException {
        Assertions.assertThat(availablityListener.getNumNotifications()).isZero();

        final BufferBuilder firstBuilder = BufferBuilderTestUtils.createBufferBuilder();
        subpartition.add(firstBuilder.createBufferConsumer());

        // Nothing is announced and nothing can be polled yet.
        Assertions.assertThat(availablityListener.getNumNotifications()).isZero();
        Assertions.assertThat(readView.getNextBuffer()).isNull();

        firstBuilder.finish();
        final BufferBuilder secondBuilder = BufferBuilderTestUtils.createBufferBuilder();
        subpartition.add(secondBuilder.createBufferConsumer());

        // One buffer in the backlog and exactly one notification after the second add.
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isOne();
        Assertions.assertThat(availablityListener.getNumNotifications()).isOne();

        // Polling still yields nothing, and the backlog drops back to zero.
        Assertions.assertThat(readView.getNextBuffer()).isNull();
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isZero();
    }

    /**
     * Adds a single filled but unfinished buffer (1024 bytes), checks that the backlog stays at
     * zero and that the buffer can nevertheless be read from the view.
     */
    @TestTemplate
    void testAddNonEmptyNotFinishedBuffer() throws Exception {
        final int size = 1024;
        Assertions.assertThat(availablityListener.getNumNotifications()).isZero();

        subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(size));

        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isZero();
        // args: size, dataAvailable, backlog, eventAvailable, recycledAfterRecycle
        assertNextBuffer(readView, size, false, 0, false, false);
    }

    /**
     * Adds a finished buffer (1025 bytes) followed by an unfinished one (1024 bytes) and reads
     * both back in order, checking backlog and notifications on the way.
     */
    @TestTemplate
    void testUnfinishedBufferBehindFinished() throws Exception {
        final int finishedSize = 1025;
        final int unfinishedSize = 1024;

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(finishedSize));
        subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(unfinishedSize));

        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isOne();
        Assertions.assertThat(availablityListener.getNumNotifications()).isGreaterThan(0L);

        // args: size, dataAvailable, backlog, eventAvailable, recycledAfterRecycle
        assertNextBuffer(readView, finishedSize, false, 0, false, true);
        assertNextBuffer(readView, unfinishedSize, false, 0, false, false);
        assertNoNextBuffer(readView);
    }

    /**
     * Adds a finished and an unfinished buffer, flushes, and verifies that the flush adds no
     * notification while raising the backlog from one to two; then reads both buffers.
     */
    @TestTemplate
    void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1025));
        subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));

        final long notificationsBeforeFlush = availablityListener.getNumNotifications();
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isOne();

        subpartition.flush();

        // A notification had already been sent before the flush, and the flush adds none.
        Assertions.assertThat(notificationsBeforeFlush).isGreaterThan(0L);
        Assertions.assertThat(availablityListener.getNumNotifications()).isEqualTo(notificationsBeforeFlush);
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isEqualTo(2);

        // args: size, dataAvailable, backlog, eventAvailable, recycledAfterRecycle
        assertNextBuffer(readView, 1025, true, 1, false, true);
        assertNextBuffer(readView, 1024, false, 0, false, false);
        assertNoNextBuffer(readView);
    }

    /**
     * Variant that flushes an empty subpartition first (no notification), then reads the
     * finished buffer before flushing the unfinished one: the first flush adds exactly one
     * notification, a repeated flush adds none.
     */
    @TestTemplate
    void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
        // Flushing with nothing queued must not notify.
        subpartition.flush();
        Assertions.assertThat(availablityListener.getNumNotifications()).isZero();

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1025));
        subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isOne();

        // args: size, dataAvailable, backlog, eventAvailable, recycledAfterRecycle
        assertNextBuffer(readView, 1025, false, 0, false, true);

        final long notificationsBeforeFlush = availablityListener.getNumNotifications();
        final long expectedAfterFlush = notificationsBeforeFlush + 1L;

        subpartition.flush();
        Assertions.assertThat(availablityListener.getNumNotifications()).isEqualTo(expectedAfterFlush);

        // A second flush is not announced again.
        subpartition.flush();
        Assertions.assertThat(availablityListener.getNumNotifications()).isEqualTo(expectedAfterFlush);

        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isOne();
        assertNextBuffer(readView, 1024, false, 0, false, false);
        assertNoNextBuffer(readView);
    }

    /**
     * Adds three finished zero-length buffers followed by a 1024-byte finished buffer and checks
     * the notification count after each add; the first readable buffer is the 1024-byte one.
     */
    @TestTemplate
    void testMultipleEmptyBuffers() throws Exception {
        Assertions.assertThat(availablityListener.getNumNotifications()).isZero();

        // First empty buffer: no notification.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(0));
        Assertions.assertThat(availablityListener.getNumNotifications()).isZero();

        // Second empty buffer: exactly one notification.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(0));
        Assertions.assertThat(availablityListener.getNumNotifications()).isOne();

        // Third empty buffer: count unchanged, backlog is two.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(0));
        Assertions.assertThat(availablityListener.getNumNotifications()).isOne();
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isEqualTo(2);

        // Non-empty buffer: still only one notification.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1024));
        Assertions.assertThat(availablityListener.getNumNotifications()).isOne();

        // args: size, dataAvailable, backlog, eventAvailable, recycledAfterRecycle
        assertNextBuffer(readView, 1024, false, 0, false, true);
    }

    /** Flushing a subpartition that holds no buffers must not produce a notification. */
    @TestTemplate
    void testEmptyFlush() {
        subpartition.flush();

        final long notifications = availablityListener.getNumNotifications();
        Assertions.assertThat(notifications).isZero();
    }

    /**
     * Walks through the basic produce/consume cycle: single finished buffers are added and read
     * one at a time, then a buffer / event / buffer sequence is added and drained. After every
     * step the buffer count, byte count, backlog, notification count and credit-less
     * availability are checked.
     */
    @TestTemplate
    void testBasicPipelinedProduceConsumeLogic() throws Exception {
        final int bufferSize = 32768;

        // Empty subpartition: nothing available (queried with argument false), nothing to read.
        Assertions.assertThat(readView.getAvailabilityAndBacklog(false).isAvailable()).isFalse();
        assertNoNextBuffer(readView);
        Assertions.assertThat(readView.getAvailabilityAndBacklog(false).isAvailable()).isFalse();
        Assertions.assertThat(availablityListener.getNumNotifications()).isZero();

        // --- first finished buffer ---
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
        Assertions.assertThat(readView.getAvailabilityAndBacklog(false).isAvailable()).isFalse();
        Assertions.assertThat(subpartition.getTotalNumberOfBuffersUnsafe()).isOne();
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isZero();
        // The byte counter is still zero before the buffer has been read.
        Assertions.assertThat(subpartition.getTotalNumberOfBytesUnsafe()).isZero();
        Assertions.assertThat(availablityListener.getNumNotifications()).isZero();

        // args: size, dataAvailable, backlog, eventAvailable, recycledAfterRecycle
        assertNextBuffer(readView, bufferSize, false, 0, false, true);
        Assertions.assertThat(subpartition.getTotalNumberOfBytesUnsafe()).isEqualTo(1L * bufferSize);
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isZero();
        assertNoNextBuffer(readView);
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isZero();

        // --- second finished buffer ---
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
        Assertions.assertThat(readView.getAvailabilityAndBacklog(false).isAvailable()).isFalse();
        Assertions.assertThat(subpartition.getTotalNumberOfBuffersUnsafe()).isEqualTo(2L);
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isZero();
        Assertions.assertThat(subpartition.getTotalNumberOfBytesUnsafe()).isEqualTo(1L * bufferSize);
        Assertions.assertThat(availablityListener.getNumNotifications()).isZero();

        assertNextBuffer(readView, bufferSize, false, 0, false, true);
        Assertions.assertThat(subpartition.getTotalNumberOfBytesUnsafe()).isEqualTo(2L * bufferSize);
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isZero();
        assertNoNextBuffer(readView);
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isZero();

        // --- buffer, event, buffer ---
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
        Assertions.assertThat(readView.getAvailabilityAndBacklog(false).isAvailable()).isFalse();
        subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(bufferSize, Buffer.DataType.EVENT_BUFFER));
        Assertions.assertThat(readView.getAvailabilityAndBacklog(false).isAvailable()).isFalse();
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
        Assertions.assertThat(readView.getAvailabilityAndBacklog(false).isAvailable()).isFalse();

        Assertions.assertThat(subpartition.getTotalNumberOfBuffersUnsafe()).isEqualTo(5L);
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isOne();
        Assertions.assertThat(subpartition.getTotalNumberOfBytesUnsafe()).isEqualTo(2L * bufferSize);
        Assertions.assertThat(availablityListener.getNumNotifications()).isOne();

        // First data buffer: more data and an event are reported as available behind it.
        assertNextBuffer(readView, bufferSize, true, 0, true, true);
        Assertions.assertThat(subpartition.getTotalNumberOfBytesUnsafe()).isEqualTo(3L * bufferSize);
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isZero();

        // The event (no specific event class is checked).
        assertNextEvent(readView, bufferSize, null, false, 0, false, true);
        Assertions.assertThat(subpartition.getTotalNumberOfBytesUnsafe()).isEqualTo(4L * bufferSize);
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isZero();

        // Trailing data buffer.
        assertNextBuffer(readView, bufferSize, false, 0, false, true);
        Assertions.assertThat(subpartition.getTotalNumberOfBytesUnsafe()).isEqualTo(5L * bufferSize);
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isZero();

        // Fully drained: counters are stable and no further notification was sent.
        assertNoNextBuffer(readView);
        Assertions.assertThat(subpartition.getBuffersInBacklogUnsafe()).isZero();
        Assertions.assertThat(subpartition.getTotalNumberOfBuffersUnsafe()).isEqualTo(5L);
        Assertions.assertThat(subpartition.getTotalNumberOfBytesUnsafe()).isEqualTo(5L * bufferSize);
        Assertions.assertThat(availablityListener.getNumNotifications()).isOne();
    }

    /**
     * Queues data buffers of sizes 1, 2 and 4 with a non-priority event between 2 and 4, then
     * adds a priority checkpoint barrier for an unaligned checkpoint.
     *
     * <p>Verifies that the barrier raises a priority-event notification, that the channel state
     * writer recorded the buffers of sizes 1, 2 and 4 for this subpartition, and that the read
     * view returns the barrier first, followed by the remaining elements in their original order.
     */
    @TestTemplate
    void testBarrierOvertaking() throws Exception {
        final RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
        subpartition.setChannelStateWriter(channelStateWriter);

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1));
        Assertions.assertThat(availablityListener.getNumNotifications()).isZero();
        Assertions.assertThat(availablityListener.getNumPriorityEvents()).isZero();

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(2));
        Assertions.assertThat(availablityListener.getNumNotifications()).isOne();
        Assertions.assertThat(availablityListener.getNumPriorityEvents()).isZero();

        // Non-priority event in the middle of the data.
        final BufferConsumer eventBuffer = EventSerializer.toBufferConsumer(EndOfSuperstepEvent.INSTANCE, false);
        subpartition.add(eventBuffer);
        Assertions.assertThat(availablityListener.getNumNotifications()).isOne();
        Assertions.assertThat(availablityListener.getNumPriorityEvents()).isZero();

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4));
        Assertions.assertThat(availablityListener.getNumNotifications()).isOne();
        Assertions.assertThat(availablityListener.getNumPriorityEvents()).isZero();

        // Priority barrier of an unaligned checkpoint with id 0.
        final CheckpointOptions options = CheckpointOptions.unaligned(
                CheckpointType.CHECKPOINT,
                new CheckpointStorageLocationReference(new byte[]{0, 1, 2}));
        channelStateWriter.start(0L, options);
        final BufferConsumer barrierBuffer =
                EventSerializer.toBufferConsumer(new CheckpointBarrier(0L, 0L, options), true);
        subpartition.add(barrierBuffer);
        Assertions.assertThat(availablityListener.getNumNotifications()).isOne();
        Assertions.assertThat(availablityListener.getNumPriorityEvents()).isOne();

        // Typed list instead of a raw List so that the Buffer method references type-check.
        final List<Buffer> inflight =
                channelStateWriter.getAddedOutput().get(subpartition.getSubpartitionInfo());
        Assertions.assertThat(inflight.stream().map(Buffer::getSize).collect(Collectors.toList()))
                .containsExactly(1, 2, 4);
        inflight.forEach(Buffer::recycleBuffer);

        // The barrier is read first; the rest keeps its original order.
        // args after size/class: dataAvailable, backlog, eventAvailable, recycledAfterRecycle
        assertNextEvent(readView, barrierBuffer.getWrittenBytes(), CheckpointBarrier.class, true, 2, false, true);
        assertNextBuffer(readView, 1, true, 1, false, true);
        assertNextBuffer(readView, 2, true, 0, true, true);
        assertNextEvent(readView, eventBuffer.getWrittenBytes(), EndOfSuperstepEvent.class, false, 0, false, true);
        assertNextBuffer(readView, 4, false, 0, false, true);
        assertNoNextBuffer(readView);
    }

    /**
     * Adds a priority checkpoint barrier to an empty subpartition, then two finished data
     * buffers, checking notification and priority-event counts after each add; finally reads
     * barrier and buffers back in order.
     */
    @TestTemplate
    void testAvailabilityAfterPriority() throws Exception {
        subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);

        final CheckpointStorageLocationReference location =
                new CheckpointStorageLocationReference(new byte[]{0, 1, 2});
        final CheckpointOptions options = CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, location);
        final BufferConsumer barrierBuffer =
                EventSerializer.toBufferConsumer(new CheckpointBarrier(0L, 0L, options), true);

        // The priority barrier counts as both a notification and a priority event.
        subpartition.add(barrierBuffer);
        Assertions.assertThat(availablityListener.getNumNotifications()).isOne();
        Assertions.assertThat(availablityListener.getNumPriorityEvents()).isOne();

        // The first data buffer after the barrier adds one more notification.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1));
        Assertions.assertThat(availablityListener.getNumNotifications()).isEqualTo(2L);
        Assertions.assertThat(availablityListener.getNumPriorityEvents()).isOne();

        // The second data buffer changes neither counter.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(2));
        Assertions.assertThat(availablityListener.getNumNotifications()).isEqualTo(2L);
        Assertions.assertThat(availablityListener.getNumPriorityEvents()).isOne();

        // args after size/class: dataAvailable, backlog, eventAvailable, recycledAfterRecycle
        assertNextEvent(readView, barrierBuffer.getWrittenBytes(), CheckpointBarrier.class, true, 1, false, true);
        assertNextBuffer(readView, 1, false, 0, false, true);
        assertNextBuffer(readView, 2, false, 0, false, true);
        assertNoNextBuffer(readView);
    }

    /** Backlog check for a subpartition that is neither flushed nor finished. */
    @TestTemplate
    void testBacklogConsistentWithNumberOfConsumableBuffers() throws Exception {
        final boolean flush = false;
        final boolean finish = false;
        testBacklogConsistentWithNumberOfConsumableBuffers(flush, finish);
    }

    /** Backlog check for a subpartition that is flushed but not finished. */
    @TestTemplate
    void testBacklogConsistentWithConsumableBuffersForFlushedPartition() throws Exception {
        final boolean flush = true;
        final boolean finish = false;
        testBacklogConsistentWithNumberOfConsumableBuffers(flush, finish);
    }

    /** Backlog check for a subpartition that is finished but not explicitly flushed. */
    @TestTemplate
    void testBacklogConsistentWithConsumableBuffersForFinishedPartition() throws Exception {
        final boolean flush = false;
        final boolean finish = true;
        testBacklogConsistentWithNumberOfConsumableBuffers(flush, finish);
    }

    /**
     * Adds five 1024-byte buffers and verifies that the backlog reported by the subpartition
     * equals the number of data buffers that can actually be polled from the read view.
     *
     * <p>All buffers are finished, except the last one, which is left unfinished unless
     * {@code isFinished} is set.
     *
     * @param isFlushRequested whether to flush the subpartition after adding the buffers
     * @param isFinished whether the last buffer is finished and the subpartition is finished
     *     after adding the buffers
     * @throws Exception if adding, finishing, reading or recycling fails
     */
    private void testBacklogConsistentWithNumberOfConsumableBuffers(boolean isFlushRequested, boolean isFinished) throws Exception {
        final int numberOfAddedBuffers = 5;
        for (int i = 1; i <= numberOfAddedBuffers; i++) {
            // Only the last buffer may stay unfinished.
            if (i < numberOfAddedBuffers || isFinished) {
                subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1024));
            } else {
                subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));
            }
        }

        if (isFlushRequested) {
            subpartition.flush();
        }
        if (isFinished) {
            subpartition.finish();
        }

        // Snapshot the backlog before anything is consumed.
        final int backlog = subpartition.getBuffersInBacklogUnsafe();

        int numberOfConsumableBuffers = 0;
        // Every polled buffer is registered with the registry and recycled when it closes.
        try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
            while (readView.getAvailabilityAndBacklog(true).isAvailable()) {
                final ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
                Assertions.assertThat(bufferAndBacklog).isNotNull();

                // Only data buffers count; events (e.g. added by finish()) are skipped.
                if (bufferAndBacklog.buffer().isBuffer()) {
                    ++numberOfConsumableBuffers;
                }

                closeableRegistry.registerCloseable(() -> bufferAndBacklog.buffer().recycleBuffer());
            }

            Assertions.assertThat(backlog).isEqualTo(numberOfConsumableBuffers);
        }
    }

    /**
     * Blocks the subpartition with an aligned checkpoint barrier, adds an event, and checks that
     * the event only becomes available (even with zero credit) after consumption is resumed.
     */
    @TestTemplate
    void testResumeBlockedSubpartitionWithEvents() throws IOException, InterruptedException {
        final int eventSize = 32768;
        blockSubpartitionByCheckpoint(1);

        // Event added while blocked: no extra notification and nothing readable.
        subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(eventSize, Buffer.DataType.EVENT_BUFFER));
        checkNumNotificationsAndAvailability(1);

        resumeConsumptionAndCheckAvailability(0, true);
        // args after size/class: dataAvailable, backlog, eventAvailable, recycledAfterRecycle
        assertNextEvent(readView, eventSize, null, false, 0, false, true);
    }

    /**
     * Blocks the subpartition, adds a buffer and flushes; the buffer must become available only
     * after consumption is resumed.
     */
    @TestTemplate
    void testResumeBlockedSubpartitionWithUnfinishedBufferFlushed() throws IOException, InterruptedException {
        final int size = 32768;
        blockSubpartitionByCheckpoint(1);

        // NOTE(review): despite the test name, a *finished* buffer consumer is added here.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(size));
        subpartition.flush();
        // Still a single notification and nothing readable while blocked.
        checkNumNotificationsAndAvailability(1);

        resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, true);
        // args: size, dataAvailable, backlog, eventAvailable, recycledAfterRecycle
        assertNextBuffer(readView, size, false, 0, false, true);
    }

    /**
     * Blocks the subpartition and adds a buffer without flushing; after resuming consumption the
     * view must still report no available data.
     */
    @TestTemplate
    void testResumeBlockedSubpartitionWithUnfinishedBufferNotFlushed() throws IOException, InterruptedException {
        blockSubpartitionByCheckpoint(1);

        // NOTE(review): despite the test name, a *finished* buffer consumer is added here.
        final BufferConsumer consumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768);
        subpartition.add(consumer);
        checkNumNotificationsAndAvailability(1);

        resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, false);
    }

    /**
     * Blocks the subpartition, adds two finished buffers, and checks that both become readable
     * once consumption is resumed.
     */
    @TestTemplate
    void testResumeBlockedSubpartitionWithFinishedBuffers() throws IOException, InterruptedException {
        final int size = 32768;
        blockSubpartitionByCheckpoint(1);

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(size));
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(size));
        // No additional notification and nothing readable while blocked.
        checkNumNotificationsAndAvailability(1);

        resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, true);
        // args: size, dataAvailable, backlog, eventAvailable, recycledAfterRecycle
        assertNextBuffer(readView, size, false, 0, false, true);
        assertNextBuffer(readView, size, false, 0, false, true);
    }

    /** Resuming a blocked subpartition that holds no further data yields no availability and no buffer. */
    @TestTemplate
    void testResumeBlockedEmptySubpartition() throws IOException, InterruptedException {
        blockSubpartitionByCheckpoint(1);

        final boolean expectData = false;
        resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, expectData);
        assertNoNextBuffer(readView);
    }

    /**
     * Adds a 32768-byte event buffer of type {@code ALIGNED_CHECKPOINT_BARRIER}, checks the
     * notification count and reads the barrier back from the view.
     *
     * @param numNotifications notification count expected right after the barrier was added
     */
    private void blockSubpartitionByCheckpoint(int numNotifications) throws IOException, InterruptedException {
        final int barrierSize = 32768;
        final BufferConsumer barrier =
                BufferBuilderTestUtils.createEventBufferConsumer(barrierSize, Buffer.DataType.ALIGNED_CHECKPOINT_BARRIER);
        subpartition.add(barrier);

        Assertions.assertThat(availablityListener.getNumNotifications()).isEqualTo(numNotifications);
        // args after size/class: dataAvailable, backlog, eventAvailable, recycledAfterRecycle
        assertNextEvent(readView, barrierSize, null, false, 0, false, true);
    }

    /**
     * Asserts the listener's notification count and that the view reports nothing available
     * (queried with argument {@code true}) and returns no buffer.
     *
     * @param numNotifications expected number of notifications received so far
     */
    private void checkNumNotificationsAndAvailability(int numNotifications) throws IOException, InterruptedException {
        final long actualNotifications = availablityListener.getNumNotifications();
        Assertions.assertThat(actualNotifications).isEqualTo(numNotifications);

        final boolean available = readView.getAvailabilityAndBacklog(true).isAvailable();
        Assertions.assertThat(available).isFalse();

        assertNoNextBuffer(readView);
    }

    /**
     * Resumes consumption on the read view and asserts its availability.
     *
     * @param availableCredit credit assumed on the consumer side; only whether it is positive is
     *     passed on to {@code getAvailabilityAndBacklog}
     * @param dataAvailable expected availability after resuming
     */
    private void resumeConsumptionAndCheckAvailability(int availableCredit, boolean dataAvailable) {
        readView.resumeConsumption();

        final boolean hasCredit = availableCredit > 0;
        final boolean available = readView.getAvailabilityAndBacklog(hasCredit).isAvailable();
        Assertions.assertThat(available).isEqualTo(dataAvailable);
    }

    /**
     * Asserts that the next element polled from {@code readView} is a data buffer with the given
     * properties; delegates to {@code assertNextBufferOrEvent} with no expected event class.
     */
    static void assertNextBuffer(ResultSubpartitionView readView, int expectedReadableBufferSize, boolean expectedIsDataAvailable, int expectedBuffersInBacklog, boolean expectedIsEventAvailable, boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
        final boolean expectBuffer = true;
        assertNextBufferOrEvent(
                readView,
                expectedReadableBufferSize,
                expectBuffer,
                null,
                expectedIsDataAvailable,
                expectedBuffersInBacklog,
                expectedIsEventAvailable,
                expectedRecycledAfterRecycle);
    }

    /**
     * Asserts that the next element polled from {@code readView} is an event with the given
     * properties; delegates to {@code assertNextBufferOrEvent}. {@code expectedEventClass} may be
     * {@code null}, in which case the event type is not checked.
     */
    static void assertNextEvent(ResultSubpartitionView readView, int expectedReadableBufferSize, Class<? extends AbstractEvent> expectedEventClass, boolean expectedIsDataAvailable, int expectedBuffersInBacklog, boolean expectedIsEventAvailable, boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
        final boolean expectBuffer = false;
        assertNextBufferOrEvent(
                readView,
                expectedReadableBufferSize,
                expectBuffer,
                expectedEventClass,
                expectedIsDataAvailable,
                expectedBuffersInBacklog,
                expectedIsEventAvailable,
                expectedRecycledAfterRecycle);
    }

    /**
     * Polls the next element from {@code readView} and checks it against the expectations.
     *
     * <p>The polled buffer is always recycled in a {@code finally} block, even when one of the
     * assertions fails; afterwards its recycled state is compared with
     * {@code expectedRecycledAfterRecycle}.
     *
     * @param readView view to poll
     * @param expectedReadableBufferSize expected readable bytes of the polled buffer
     * @param expectedIsBuffer {@code true} if a data buffer is expected, {@code false} for an event
     * @param expectedEventClass expected event type, or {@code null} to skip the check; must be
     *     {@code null} when {@code expectedIsBuffer} is {@code true}
     * @param expectedIsDataAvailable expected value of both {@code isDataAvailable()} and the
     *     view's availability queried with argument {@code true}
     * @param expectedBuffersInBacklog expected backlog reported with the polled buffer
     * @param expectedIsEventAvailable expected value of both {@code isEventAvailable()} and the
     *     view's availability queried with argument {@code false}
     * @param expectedRecycledAfterRecycle expected {@code isRecycled()} state after recycling
     */
    private static void assertNextBufferOrEvent(ResultSubpartitionView readView, int expectedReadableBufferSize, boolean expectedIsBuffer, @Nullable Class<? extends AbstractEvent> expectedEventClass, boolean expectedIsDataAvailable, int expectedBuffersInBacklog, boolean expectedIsEventAvailable, boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
        // An event class may only be given when an event (not a buffer) is expected.
        Preconditions.checkArgument(expectedEventClass == null || !expectedIsBuffer);

        final ResultSubpartition.BufferAndBacklog polled = readView.getNextBuffer();
        Assertions.assertThat(polled).isNotNull();

        try {
            Assertions.assertThat(polled.buffer().readableBytes())
                    .as("buffer size")
                    .isEqualTo(expectedReadableBufferSize);
            Assertions.assertThat(polled.buffer().isBuffer())
                    .as("buffer or event")
                    .isEqualTo(expectedIsBuffer);

            if (expectedEventClass != null) {
                Assertions.assertThat(EventSerializer.fromBuffer(polled.buffer(), ClassLoader.getSystemClassLoader()))
                        .isInstanceOf(expectedEventClass);
            }

            Assertions.assertThat(polled.isDataAvailable())
                    .as("data available")
                    .isEqualTo(expectedIsDataAvailable);
            Assertions.assertThat(readView.getAvailabilityAndBacklog(true).isAvailable())
                    .as("data available")
                    .isEqualTo(expectedIsDataAvailable);
            Assertions.assertThat(polled.buffersInBacklog())
                    .as("backlog")
                    .isEqualTo(expectedBuffersInBacklog);
            Assertions.assertThat(polled.isEventAvailable())
                    .as("event available")
                    .isEqualTo(expectedIsEventAvailable);
            Assertions.assertThat(readView.getAvailabilityAndBacklog(false).isAvailable())
                    .as("event available")
                    .isEqualTo(expectedIsEventAvailable);

            Assertions.assertThat(polled.buffer().isRecycled())
                    .as("not recycled")
                    .isFalse();
        } finally {
            polled.buffer().recycleBuffer();
        }

        Assertions.assertThat(polled.buffer().isRecycled())
                .as("recycled")
                .isEqualTo(expectedRecycledAfterRecycle);
    }

    /** Asserts that polling {@code readView} returns {@code null}, i.e. no buffer or event is handed out. */
    static void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
        final ResultSubpartition.BufferAndBacklog polled = readView.getNextBuffer();
        Assertions.assertThat(polled).isNull();
    }

    /**
     * Creates the result partition for the given type via {@code PartitionTestUtils}, using the
     * no-op file channel manager, the {@code compressionEnabled} parameter and the value 32768,
     * and then calls {@code setup()} on it.
     *
     * @param resultPartitionType type of partition to create
     * @throws IOException if creating or setting up the partition fails
     */
    void setup(ResultPartitionType resultPartitionType) throws IOException {
        final ResultPartition partition =
                PartitionTestUtils.createPartition(
                        resultPartitionType,
                        NoOpFileChannelManager.INSTANCE,
                        compressionEnabled,
                        32768);
        resultPartition = partition;
        partition.setup();
    }
}

