/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.buffer.NoOpBufferPool;
import org.apache.flink.runtime.io.network.partition.AvailabilityUtil;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.shaded.guava33.com.google.common.collect.Lists;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AtomicBooleanAssert;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class RemoteInputChannelTest {
    /** Executor extension registered with JUnit, obtained from {@link TestingUtils}. */
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
            TestingUtils.defaultExecutorExtension();

    /** Checkpoint id constant. */
    private static final long CHECKPOINT_ID = 1L;

    /** Unaligned checkpoint options using the default storage location reference. */
    private static final CheckpointOptions UNALIGNED =
            CheckpointOptions.unaligned(
                    CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());

    /** Aligned checkpoint options with an alignment timeout argument of 10. */
    private static final CheckpointOptions ALIGNED_WITH_TIMEOUT =
            CheckpointOptions.alignedWithTimeout(
                    CheckpointType.CHECKPOINT,
                    CheckpointStorageLocationReference.getDefault(),
                    10L);

    // Explicit package-private no-arg constructor; the body is intentionally empty.
    RemoteInputChannelTest() {
    }

    /**
     * Enqueues an aligned-with-timeout checkpoint barrier on a remote channel, polls the gate
     * once, converts the barrier with the same sequence number to a priority event and asserts
     * that the gate's priority-event availability future is completed.
     *
     * <p>The network buffer pool is destroyed in a {@code finally} block regardless of outcome.
     */
    @Test
    void testGateNotifiedOnBarrierConversion() throws IOException, InterruptedException {
        // One sequence number shared by the enqueue and the conversion so they cannot drift apart.
        final int sequenceNumber = 0;
        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 4096);
        try {
            final SingleInputGate inputGate =
                    new SingleInputGateBuilder()
                            .setBufferPoolFactory(networkBufferPool.createBufferPool(1, 1))
                            .build();
            inputGate.setup();

            final RemoteInputChannel channel =
                    InputChannelBuilder.newBuilder()
                            .setConnectionManager(
                                    new TestVerifyConnectionManager(
                                            new TestVerifyPartitionRequestClient()))
                            .buildRemoteChannel(inputGate);
            channel.requestSubpartitions();

            final CheckpointBarrier barrier =
                    new CheckpointBarrier(
                            1L,
                            123L,
                            CheckpointOptions.alignedWithTimeout(
                                    CheckpointType.CHECKPOINT,
                                    CheckpointStorageLocationReference.getDefault(),
                                    Integer.MAX_VALUE));
            channel.onBuffer(EventSerializer.toBuffer(barrier, false), sequenceNumber, 0, 0);

            // Poll once before converting, so the gate has consumed the enqueued barrier.
            inputGate.pollNext();
            channel.convertToPriorityEvent(sequenceNumber);

            Assertions.assertThat(inputGate.getPriorityEventAvailableFuture()).isDone();
        } finally {
            networkBufferPool.destroy();
        }
    }

    /**
     * Feeds the channel a buffer with sequence number 0 followed by one with sequence number 29
     * and asserts that reading from the channel then fails, and that the buffer is only recycled
     * once the channel releases all of its resources.
     */
    @Test
    void testExceptionOnReordering() throws Exception {
        final SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
        final Buffer buffer = TestBufferFactory.createBuffer(32768);

        // The first buffer is in order; the second skips ahead to sequence number 29.
        inputChannel.onBuffer(buffer.retainBuffer(), 0, -1, 0);
        inputChannel.onBuffer(buffer, 29, -1, 0);

        // withFailMessage only affects assertions that follow it, so a type check is chained
        // after it; previously the message was configured but never used.
        Assertions.assertThatThrownBy(inputChannel::getNextBuffer)
                .withFailMessage(
                        "Did not throw expected exception after enqueuing an out-of-order buffer.")
                .isInstanceOf(IOException.class);

        Assertions.assertThat(buffer.isRecycled()).isFalse();
        // Releasing the channel must free the buffers it still holds.
        inputChannel.releaseAllResources();
        Assertions.assertThat(buffer.isRecycled()).isTrue();
    }

    /**
     * Uses a channel state writer whose {@code addInputData} closes the data and then throws
     * {@link ExpectedTestException}. After a checkpoint is started, {@code onBuffer} must surface
     * that exception, and the buffer must stay un-recycled until the channel releases its
     * resources.
     */
    @Test
    void testExceptionOnPersisting() throws Exception {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);

        // State writer that closes whatever it is handed and then always fails.
        final ChannelStateWriter failingWriter =
                new ChannelStateWriter.NoOpChannelStateWriter() {
                    @Override
                    public void addInputData(
                            long checkpointId,
                            InputChannelInfo info,
                            int startSeqNum,
                            CloseableIterator<Buffer> data) {
                        try {
                            data.close();
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                        throw new ExpectedTestException();
                    }
                };

        final RemoteInputChannel channel =
                InputChannelBuilder.newBuilder()
                        .setStateWriter(failingWriter)
                        .buildRemoteChannel(gate);

        channel.checkpointStarted(
                new CheckpointBarrier(
                        42L,
                        System.currentTimeMillis(),
                        CheckpointOptions.unaligned(
                                CheckpointType.CHECKPOINT,
                                CheckpointStorageLocationReference.getDefault())));

        final Buffer payload = TestBufferFactory.createBuffer(32768);
        Assertions.assertThat(payload.isRecycled()).isFalse();

        Assertions.assertThatThrownBy(() -> channel.onBuffer(payload, 0, -1, 0))
                .isInstanceOf(ExpectedTestException.class);

        // The failed persist must not have recycled the buffer; only the release does.
        Assertions.assertThat(payload.isRecycled()).isFalse();
        channel.releaseAllResources();
        Assertions.assertThat(payload.isRecycled()).isTrue();
    }

    /**
     * Runs the concurrent release scenario with 8192 repetitions, where the competing action
     * is {@code onBuffer} using the loop index as sequence number.
     */
    @Test
    void testConcurrentOnBufferAndRelease() throws Exception {
        testConcurrentReleaseAndSomething(
                8192,
                (channel, buf, sequenceNumber) -> {
                    channel.onBuffer(buf, sequenceNumber, -1, 0);
                    // Always report true so the caller does not recycle the buffer itself.
                    return true;
                });
    }

    /**
     * Runs the concurrent release scenario with 1024 repetitions, where the competing action
     * is {@code notifyBufferAvailable} on the channel's buffer manager.
     */
    @Test
    void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
        testConcurrentReleaseAndSomething(
                1024,
                (channel, buf, ignoredIndex) -> {
                    return channel.getBufferManager().notifyBufferAvailable(buf);
                });
    }

    /**
     * Repeatedly races {@code function} against {@code releaseAllResources()} on a fresh remote
     * input channel and asserts that no buffers remain queued in the channel afterwards.
     *
     * <p>For every repetition two tasks are submitted to a two-thread pool: one keeps applying
     * {@code function} (in batches of 128 calls) until the channel reports itself released, the
     * other releases the channel. Both futures are awaited before the queue size is checked.
     *
     * @param numberOfRepetitions how many times the race is repeated
     * @param function action raced against the release; receives the channel, a retained
     *     reference to the shared buffer and the index within the batch. When it returns
     *     {@code false} the retained reference is recycled here.
     * @throws Exception if either task fails or an assertion does not hold
     */
    private void testConcurrentReleaseAndSomething(int numberOfRepetitions, TriFunction<RemoteInputChannel, Buffer, Integer, Boolean> function) throws Exception {
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        final Buffer buffer = TestBufferFactory.createBuffer(32768);
        try {
            final SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
            for (int i = 0; i < numberOfRepetitions; ++i) {
                final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);

                final Callable<Void> enqueueTask = () -> {
                    do {
                        for (int j = 0; j < 128; ++j) {
                            // Retain before handing over; give the reference back if the
                            // function reports that it did not take it.
                            if (!function.apply(inputChannel, buffer.retainBuffer(), j)) {
                                buffer.recycleBuffer();
                            }
                        }
                    } while (!inputChannel.isReleased());
                    return null;
                };
                final Callable<Void> releaseTask = () -> {
                    inputChannel.releaseAllResources();
                    return null;
                };

                // Typed futures instead of raw collections.
                final List<Future<Void>> results = Lists.newArrayListWithCapacity(2);
                results.add(executor.submit(enqueueTask));
                results.add(executor.submit(releaseTask));

                // Wait for both tasks; get() also propagates any failure from a task.
                for (Future<Void> result : results) {
                    result.get();
                }

                Assertions.assertThat(inputChannel.getNumberOfQueuedBuffers())
                        .withFailMessage(
                                "Resource leak during concurrent release and notifyBufferAvailable.")
                        .isZero();
            }
        } finally {
            executor.shutdown();
            // Only the test's own reference should be left at this point.
            Assertions.assertThat(buffer.isRecycled()).isFalse();
            buffer.recycleBuffer();
            Assertions.assertThat(buffer.isRecycled()).isTrue();
        }
    }

    /**
     * Re-triggering a subpartition request on a channel that never issued one must fail with an
     * {@link IllegalStateException}.
     */
    @Test
    void testRetriggerWithoutPartitionRequest() {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        final RemoteInputChannel channel = createRemoteInputChannel(gate, 0, 500, 3000);

        Assertions.assertThatThrownBy(channel::retriggerSubpartitionRequest)
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Requests the subpartition once, then re-triggers the request four times and checks the
     * channel's current backoff after each re-trigger. A further re-trigger followed by a read
     * must fail with an {@link IOException}.
     */
    @Test
    void testPartitionRequestExponentialBackoff() throws Exception {
        // NOTE(review): despite the test name, the asserted backoff grows by a constant 500 per
        // retrigger rather than doubling - confirm against the channel's backoff policy.
        final int[] expectedDelays = {500, 1000, 1500, 2000};

        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        final ResultPartitionID partitionId = new ResultPartitionID();
        final TestVerifyPartitionRequestClient client = new TestVerifyPartitionRequestClient();
        final TestVerifyConnectionManager connectionManager =
                new TestVerifyConnectionManager(client);
        final RemoteInputChannel channel =
                createRemoteInputChannel(gate, connectionManager, partitionId, 500, 1000);

        // Initial request.
        channel.requestSubpartitions();
        client.verifyResult(partitionId, 0, 0);

        // Each retrigger must move the backoff to the next expected value.
        for (int i = 0; i < expectedDelays.length; i++) {
            channel.retriggerSubpartitionRequest();
            Assertions.assertThat(channel.getCurrentBackoff()).isEqualTo(expectedDelays[i]);
        }

        // One more retrigger is too many; the failure shows up when reading.
        Assertions.assertThatThrownBy(
                        () -> {
                            channel.retriggerSubpartitionRequest();
                            channel.getNextBuffer();
                        })
                .isInstanceOf(IOException.class);
    }

    /**
     * With both backoff arguments set to 500, the first re-trigger yields a current backoff of
     * 500 and, after the second re-trigger, reading from the channel fails with an
     * {@link IOException}.
     */
    @Test
    void testPartitionRequestSingleBackoff() throws Exception {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        final ResultPartitionID partitionId = new ResultPartitionID();
        final TestVerifyPartitionRequestClient client = new TestVerifyPartitionRequestClient();
        final TestVerifyConnectionManager connectionManager =
                new TestVerifyConnectionManager(client);
        final RemoteInputChannel channel =
                createRemoteInputChannel(gate, connectionManager, partitionId, 500, 500);

        // Initial request.
        channel.requestSubpartitions();
        client.verifyResult(partitionId, 0, 0);

        // First retrigger: backoff is 500.
        channel.retriggerSubpartitionRequest();
        Assertions.assertThat(channel.getCurrentBackoff()).isEqualTo(500);

        // Second retrigger: the next read must fail.
        channel.retriggerSubpartitionRequest();
        Assertions.assertThatThrownBy(channel::getNextBuffer).isInstanceOf(IOException.class);
    }

    /**
     * With both backoff arguments set to 0, a single re-trigger already makes the next read fail
     * with an {@link IOException}.
     */
    @Test
    void testPartitionRequestNoBackoff() throws Exception {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        final ResultPartitionID partitionId = new ResultPartitionID();
        final TestVerifyPartitionRequestClient client = new TestVerifyPartitionRequestClient();
        final TestVerifyConnectionManager connectionManager =
                new TestVerifyConnectionManager(client);
        final RemoteInputChannel channel =
                createRemoteInputChannel(gate, connectionManager, partitionId, 0, 0);

        // Initial request.
        channel.requestSubpartitions();
        client.verifyResult(partitionId, 0, 0);

        // The very first retrigger must lead to a failing read.
        channel.retriggerSubpartitionRequest();
        Assertions.assertThatThrownBy(channel::getNextBuffer).isInstanceOf(IOException.class);
    }

    /**
     * Calls {@code onFailedPartitionRequest()} on a channel whose gate was built with a test
     * partition producer state provider and asserts that the provider reports being invoked.
     */
    @Test
    void testOnFailedPartitionRequest() {
        final ResultPartitionID partitionId = new ResultPartitionID();
        final TestPartitionProducerStateProvider stateProvider =
                new TestPartitionProducerStateProvider(partitionId);

        final SingleInputGate gate =
                new SingleInputGateBuilder()
                        .setPartitionProducerStateProvider(stateProvider)
                        .build();
        final RemoteInputChannel channel =
                InputChannelBuilder.newBuilder()
                        .setPartitionId(partitionId)
                        .buildRemoteChannel(gate);

        channel.onFailedPartitionRequest();

        Assertions.assertThat(stateProvider.isInvoked()).isTrue();
    }

    /**
     * Reports a {@link ProducerFailedException} to the channel, requests the subpartitions and
     * asserts that reading then fails with a {@link CancelTaskException}.
     */
    @Test
    void testProducerFailedException() throws Exception {
        // Connection manager mock that hands out a mocked partition request client.
        final ConnectionManager connectionManager = Mockito.mock(ConnectionManager.class);
        final PartitionRequestClient requestClient = Mockito.mock(PartitionRequestClient.class);
        Mockito.when(
                        connectionManager.createPartitionRequestClient(
                                ArgumentMatchers.any(ConnectionID.class)))
                .thenReturn(requestClient);

        final SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        final RemoteInputChannel channel =
                InputChannelTestUtils.createRemoteInputChannel(inputGate, 0, connectionManager);

        final RuntimeException rootCause = new RuntimeException("Expected test exception.");
        channel.onError(new ProducerFailedException(rootCause));

        channel.requestSubpartitions();

        Assertions.assertThatThrownBy(channel::getNextBuffer)
                .isInstanceOf(CancelTaskException.class);
    }

    /**
     * Builds a channel on top of {@code TestingExceptionConnectionManager}, lets the gate request
     * its partitions and asserts that reading from the channel fails with a
     * {@link PartitionConnectionException}.
     */
    @Test
    void testPartitionConnectionException() {
        final TestingExceptionConnectionManager connectionManager =
                new TestingExceptionConnectionManager();
        final SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        final RemoteInputChannel channel =
                InputChannelTestUtils.createRemoteInputChannel(inputGate, 0, connectionManager);

        inputGate.setInputChannels(new InputChannel[] {channel});
        inputGate.requestPartitions();

        Assertions.assertThatThrownBy(channel::getNextBuffer)
                .isInstanceOf(PartitionConnectionException.class);
    }

    /**
     * Walks a remote input channel through a series of sender-backlog announcements and buffer
     * recycles while the local pool cannot satisfy the demand, checking after every step:
     *
     * <ul>
     *   <li>how often the (spied) local pool was asked for a buffer,
     *   <li>how often the channel's buffer manager was registered as a buffer listener,
     *   <li>the channel's available and required buffer counts,
     *   <li>the pool's available memory segments, and
     *   <li>whether the channel is waiting for floating buffers.
     * </ul>
     *
     * <p>Throughout, the asserted required count equals the announced backlog plus 2.
     *
     * <p>Any failure is captured and handed to {@code cleanup}, which is invoked exactly once
     * from a {@code finally} block.
     */
    @Test
    void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
        final int numFloatingBuffers = 14;

        final SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[] {inputChannel});

        Throwable thrown = null;
        try {
            final BufferPool bufferPool =
                    Mockito.spy(
                            networkBufferPool.createBufferPool(
                                    numFloatingBuffers, numFloatingBuffers));
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();

            // Take one buffer from the channel itself.
            final Buffer exclusiveBuffer = inputChannel.requestBuffer();
            Assertions.assertThat(exclusiveBuffer).isNotNull();

            // Take some buffers directly from the local pool so they can be recycled later.
            final int numRecycleFloatingBuffers = 2;
            final ArrayDeque<Buffer> floatingBufferQueue =
                    new ArrayDeque<>(numRecycleFloatingBuffers);
            for (int i = 0; i < numRecycleFloatingBuffers; ++i) {
                final Buffer floatingBuffer = bufferPool.requestBuffer();
                Assertions.assertThat(floatingBuffer).isNotNull();
                floatingBufferQueue.add(floatingBuffer);
            }
            Mockito.verify(bufferPool, Mockito.times(numRecycleFloatingBuffers)).requestBuffer();

            // Backlog 14: the pool is drained and the channel registers as a listener.
            inputChannel.onSenderBacklog(14);
            Mockito.verify(bufferPool, Mockito.times(15)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(1))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 13 buffers available in the channel")
                    .isEqualTo(13);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 16 buffers required in the channel")
                    .isEqualTo(16);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 0 buffers available in local pool")
                    .isZero();
            Assertions.assertThat(inputChannel.isWaitingForFloatingBuffers()).isTrue();

            // Backlog 16: only the required count changes; no new pool requests are made.
            inputChannel.onSenderBacklog(16);
            Mockito.verify(bufferPool, Mockito.times(15)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(1))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 13 buffers available in the channel")
                    .isEqualTo(13);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 18 buffers required in the channel")
                    .isEqualTo(18);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 0 buffers available in local pool")
                    .isZero();
            Assertions.assertThat(inputChannel.isWaitingForFloatingBuffers()).isTrue();

            // Recycling the channel's own buffer raises the available count by one.
            exclusiveBuffer.recycleBuffer();
            Mockito.verify(bufferPool, Mockito.times(15)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(1))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 14 buffers available in the channel")
                    .isEqualTo(14);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 18 buffers required in the channel")
                    .isEqualTo(18);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 0 buffers available in local pool")
                    .isZero();
            Assertions.assertThat(inputChannel.isWaitingForFloatingBuffers()).isTrue();

            // Recycling a pool buffer: the channel picks it up and registers as listener again.
            floatingBufferQueue.poll().recycleBuffer();
            Mockito.verify(bufferPool, Mockito.times(16)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(2))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 15 buffers available in the channel")
                    .isEqualTo(15);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 18 buffers required in the channel")
                    .isEqualTo(18);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 0 buffers available in local pool")
                    .isZero();
            Assertions.assertThat(inputChannel.isWaitingForFloatingBuffers()).isTrue();

            // Backlog 13: required drops to the available count; still flagged as waiting.
            inputChannel.onSenderBacklog(13);
            Mockito.verify(bufferPool, Mockito.times(16)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(2))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 15 buffers available in the channel")
                    .isEqualTo(15);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 15 buffers required in the channel")
                    .isEqualTo(15);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 0 buffers available in local pool")
                    .isZero();
            Assertions.assertThat(inputChannel.isWaitingForFloatingBuffers()).isTrue();

            // Recycling the second pool buffer: it stays in the pool and waiting ends.
            floatingBufferQueue.poll().recycleBuffer();
            Mockito.verify(bufferPool, Mockito.times(16)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(2))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 15 buffers available in the channel")
                    .isEqualTo(15);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 15 buffers required in the channel")
                    .isEqualTo(15);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 1 buffers available in local pool")
                    .isOne();
            Assertions.assertThat(inputChannel.isWaitingForFloatingBuffers()).isFalse();

            // Backlog 15: demand exceeds supply again, so the channel goes back to waiting.
            inputChannel.onSenderBacklog(15);
            Mockito.verify(bufferPool, Mockito.times(18)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(3))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 16 buffers available in the channel")
                    .isEqualTo(16);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 17 buffers required in the channel")
                    .isEqualTo(17);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 0 buffers available in local pool")
                    .isZero();
            Assertions.assertThat(inputChannel.isWaitingForFloatingBuffers()).isTrue();
        } catch (Throwable t) {
            // Remember the failure so cleanup receives it.
            thrown = t;
        } finally {
            // Single cleanup on every path (the decompiled form could invoke it twice).
            RemoteInputChannelTest.cleanup(
                    networkBufferPool, null, null, thrown, new InputChannel[] {inputChannel});
        }
    }

    /**
     * Announces a sender backlog that brings the channel's required buffer count exactly to its
     * available count, then recycles one pool buffer and the channel's own buffer and asserts
     * that both go back to the local pool without any further pool requests or listener
     * registrations.
     *
     * <p>Any failure is captured and handed to {@code cleanup}, which is invoked exactly once
     * from a {@code finally} block.
     */
    @Test
    void testAvailableBuffersEqualToRequiredBuffers() throws Exception {
        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
        final int numFloatingBuffers = 14;

        final SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[] {inputChannel});

        Throwable thrown = null;
        try {
            final BufferPool bufferPool =
                    Mockito.spy(
                            networkBufferPool.createBufferPool(
                                    numFloatingBuffers, numFloatingBuffers));
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();

            // Take one buffer from the channel and one directly from the local pool.
            final Buffer exclusiveBuffer = inputChannel.requestBuffer();
            Assertions.assertThat(exclusiveBuffer).isNotNull();
            final Buffer floatingBuffer = bufferPool.requestBuffer();
            Assertions.assertThat(floatingBuffer).isNotNull();
            Mockito.verify(bufferPool, Mockito.times(1)).requestBuffer();

            // Backlog 12: required becomes 14, which the pool can satisfy without a listener.
            inputChannel.onSenderBacklog(12);
            Mockito.verify(bufferPool, Mockito.times(14)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(0))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 14 buffers available in the channel")
                    .isEqualTo(14);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 14 buffers required in the channel")
                    .isEqualTo(14);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 0 buffers available in local pool")
                    .isZero();

            // Recycling the pool buffer returns it to the pool; channel counts are unchanged.
            floatingBuffer.recycleBuffer();
            Mockito.verify(bufferPool, Mockito.times(14)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(0))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 14 buffers available in the channel")
                    .isEqualTo(14);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 14 buffers required in the channel")
                    .isEqualTo(14);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 1 buffers available in local pool")
                    .isOne();

            // Recycling the channel's buffer leaves channel counts unchanged and frees one more
            // segment in the local pool.
            exclusiveBuffer.recycleBuffer();
            Mockito.verify(bufferPool, Mockito.times(14)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(0))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 14 buffers available in the channel")
                    .isEqualTo(14);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 14 buffers required in the channel")
                    .isEqualTo(14);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 2 buffers available in local pool")
                    .isEqualTo(2);
        } catch (Throwable t) {
            // Remember the failure so cleanup receives it.
            thrown = t;
        } finally {
            // Single cleanup on every path (the decompiled form could invoke it twice).
            RemoteInputChannelTest.cleanup(
                    networkBufferPool, null, null, thrown, new InputChannel[] {inputChannel});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Lowers the sender backlog while the channel already holds buffers, then recycles one buffer
     * taken from the channel and one taken from the local pool. After each step the test checks
     * the channel's available and required buffer counts, the number of segments left in the
     * local pool, that no further {@code requestBuffer()} calls were made on the pool, and that no
     * buffer listener was registered.
     *
     * @throws Exception if setting up or cleaning up the test resources fails
     */
    @Test
    void testAvailableBuffersMoreThanRequiredBuffers() throws Exception {
        final int numFloatingBuffers = 14;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
        SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[] {inputChannel});
        Throwable thrown = null;
        try {
            // Spy on the local pool so the number of requestBuffer() calls can be verified.
            BufferPool bufferPool =
                    Mockito.spy(
                            networkBufferPool.createBufferPool(
                                    numFloatingBuffers, numFloatingBuffers));
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();

            // Take one buffer through the channel and one directly from the local pool.
            Buffer exclusiveBuffer = inputChannel.requestBuffer();
            Assertions.assertThat(exclusiveBuffer).isNotNull();
            Buffer floatingBuffer = bufferPool.requestBuffer();
            Assertions.assertThat(floatingBuffer).isNotNull();
            Mockito.verify(bufferPool, Mockito.times(1)).requestBuffer();

            // Backlog of 12: the pool has been asked for 14 buffers in total by now.
            inputChannel.onSenderBacklog(12);
            Mockito.verify(bufferPool, Mockito.times(14)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(0))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 14 buffers available in the channel")
                    .isEqualTo(14);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 14 buffers required in the channel")
                    .isEqualTo(14);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 0 buffers available in local pool")
                    .isZero();

            // Backlog drops to 10: fewer buffers are required, nothing new is requested.
            inputChannel.onSenderBacklog(10);
            Mockito.verify(bufferPool, Mockito.times(14)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(0))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 14 buffers available in the channel")
                    .isEqualTo(14);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 12 buffers required in the channel")
                    .isEqualTo(12);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 0 buffers available in local pool")
                    .isZero();

            // Recycling the buffer taken from the channel frees one segment in the local pool.
            exclusiveBuffer.recycleBuffer();
            Mockito.verify(bufferPool, Mockito.times(14)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(0))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 14 buffers available in the channel")
                    .isEqualTo(14);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 12 buffers required in the channel")
                    .isEqualTo(12);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 1 buffers available in local pool")
                    .isOne();

            // Recycling the buffer taken from the pool frees a second segment there.
            floatingBuffer.recycleBuffer();
            Mockito.verify(bufferPool, Mockito.times(14)).requestBuffer();
            Mockito.verify(bufferPool, Mockito.times(0))
                    .addBufferListener(inputChannel.getBufferManager());
            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be 14 buffers available in the channel")
                    .isEqualTo(14);
            Assertions.assertThat(inputChannel.getNumberOfRequiredBuffers())
                    .withFailMessage("There should be 12 buffers required in the channel")
                    .isEqualTo(12);
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 2 buffers available in local pool")
                    .isEqualTo(2);
        } catch (Throwable t) {
            // Remember the failure; it is handed to cleanup in the finally block
            // (cleanup is expected to rethrow it - confirm against cleanup).
            thrown = t;
        } finally {
            // Exactly one cleanup call on both the success and the failure path.
            cleanup(networkBufferPool, null, null, thrown, new InputChannel[] {inputChannel});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sets up three remote input channels that share one local buffer pool, drains that pool,
     * gives every channel a sender backlog of 8 and then recycles the drained buffers. The test
     * checks that each channel registered as a buffer listener exactly once and ends up with 3
     * available buffers and 1 unannounced credit.
     *
     * @throws Exception if setting up or cleaning up the test resources fails
     */
    @Test
    void testFairDistributionFloatingBuffers() throws Exception {
        final int numExclusiveBuffers = 2;
        final int numFloatingBuffers = 3;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32);
        SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(3, networkBufferPool);
        RemoteInputChannel[] inputChannels = {
            createRemoteInputChannel(inputGate),
            createRemoteInputChannel(inputGate),
            createRemoteInputChannel(inputGate)
        };
        inputGate.setInputChannels((InputChannel[]) inputChannels);
        Throwable thrown = null;
        try {
            BufferPool bufferPool =
                    Mockito.spy(
                            networkBufferPool.createBufferPool(
                                    numFloatingBuffers, numFloatingBuffers));
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputGate.requestPartitions();
            for (RemoteInputChannel inputChannel : inputChannels) {
                inputChannel.requestSubpartitions();
            }

            // Drain the local pool so the channels cannot obtain buffers from it right away.
            List<Buffer> floatingBuffers = new ArrayList<>(numFloatingBuffers);
            for (int i = 0; i < numFloatingBuffers; ++i) {
                Buffer buffer = bufferPool.requestBuffer();
                Assertions.assertThat(buffer).isNotNull();
                floatingBuffers.add(buffer);
            }

            // The loop variable must be typed as RemoteInputChannel; the decompiled Object type
            // does not expose the channel methods called here.
            for (RemoteInputChannel inputChannel : inputChannels) {
                inputChannel.onSenderBacklog(8);
                Mockito.verify(bufferPool, Mockito.times(1))
                        .addBufferListener(inputChannel.getBufferManager());
                Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                        .withFailMessage(
                                "There should be %d buffers available in the channel",
                                numExclusiveBuffers)
                        .isEqualTo(numExclusiveBuffers);
            }

            // Hand the drained buffers back to the pool.
            for (Buffer buffer : floatingBuffers) {
                buffer.recycleBuffer();
            }

            for (RemoteInputChannel inputChannel : inputChannels) {
                Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                        .withFailMessage("There should be 3 buffers available in the channel")
                        .isEqualTo(3);
                Assertions.assertThat(inputChannel.getUnannouncedCredit())
                        .withFailMessage("There should be 1 unannounced credits in the channel")
                        .isOne();
            }
        } catch (Throwable t) {
            // Remember the failure; it is handed to cleanup in the finally block
            // (cleanup is expected to rethrow it - confirm against cleanup).
            thrown = t;
        } finally {
            cleanup(networkBufferPool, null, null, thrown, (InputChannel[]) inputChannels);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Uses two channels on one gate, of which only {@code successfulRemoteIC} has requested its
     * subpartitions. Both channels receive a sender backlog while the only pool buffer is held by
     * the test; the buffer is then recycled. The test expects {@code failingRemoteIC.checkError()}
     * to throw an {@link IOException} caused by an {@link IllegalStateException}, expects
     * {@code successfulRemoteIC} to get no buffer until {@code failingRemoteIC} has released its
     * resources, and expects it to get one afterwards.
     *
     * @throws Exception if setting up or cleaning up the test resources fails
     */
    @Test
    void testFailureInNotifyBufferAvailable() throws Exception {
        final int numFloatingBuffers = 1;
        final int numTotalBuffers = 2;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(numTotalBuffers, 32);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        RemoteInputChannel successfulRemoteIC = createRemoteInputChannel(inputGate);
        successfulRemoteIC.requestSubpartitions();
        // Note: requestSubpartitions() is never called on this channel.
        RemoteInputChannel failingRemoteIC = createRemoteInputChannel(inputGate);
        Buffer buffer = null;
        Throwable thrown = null;
        try {
            BufferPool bufferPool =
                    networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
            inputGate.setBufferPool(bufferPool);

            // Hold the pool's buffer while both channels announce a backlog.
            buffer = Preconditions.checkNotNull(bufferPool.requestBuffer());
            failingRemoteIC.onSenderBacklog(1);
            successfulRemoteIC.onSenderBacklog(2);

            // Recycle the buffer; null the reference so cleanup does not touch it again.
            buffer.recycleBuffer();
            buffer = null;
            Assertions.assertThatThrownBy(() -> failingRemoteIC.checkError())
                    .isInstanceOf(IOException.class)
                    .hasCauseInstanceOf(IllegalStateException.class);

            // The buffer is not available to the successful channel yet.
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
            buffer = successfulRemoteIC.requestBuffer();
            Assertions.assertThat(buffer)
                    .withFailMessage("buffer should still remain in failingRemoteIC")
                    .isNull();

            // Once the failing channel releases its resources the other channel gets a buffer.
            failingRemoteIC.releaseAllResources();
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isZero();
            buffer = successfulRemoteIC.requestBuffer();
            Assertions.assertThat(buffer)
                    .withFailMessage("no buffer given to successfulRemoteIC")
                    .isNotNull();
        } catch (Throwable t) {
            // Remember the failure; it is handed to cleanup in the finally block
            // (cleanup is expected to rethrow it - confirm against cleanup).
            thrown = t;
        } finally {
            // Exactly one cleanup call; it sees the latest value of 'buffer'.
            cleanup(
                    networkBufferPool,
                    null,
                    buffer,
                    thrown,
                    new InputChannel[] {failingRemoteIC, successfulRemoteIC});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs two tasks concurrently on one channel: one keeps calling {@code onSenderBacklog} with
     * backlogs 1..128 until the channel reports itself released, the other calls
     * {@code releaseAllResources()}. Afterwards the channel must hold no available buffers and
     * the local plus global pools together must hold all 130 segments.
     *
     * @throws Exception if setting up or cleaning up the test resources fails
     */
    @Test
    void testConcurrentOnSenderBacklogAndRelease() throws Exception {
        final int numFloatingBuffers = 128;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(130, 32);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
        Throwable thrown = null;
        try {
            BufferPool bufferPool =
                    networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();

            // Always runs at least one full round of backlog updates before checking isReleased().
            Callable<Void> requestBufferTask =
                    () -> {
                        do {
                            for (int j = 1; j <= numFloatingBuffers; ++j) {
                                inputChannel.onSenderBacklog(j);
                            }
                        } while (!inputChannel.isReleased());
                        return null;
                    };
            Callable<Void> releaseTask =
                    () -> {
                        inputChannel.releaseAllResources();
                        return null;
                    };

            submitTasksAndWaitForResults(executor, new Callable[] {requestBufferTask, releaseTask});

            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be no buffers available in the channel.")
                    .isZero();
            Assertions.assertThat(
                            bufferPool.getNumberOfAvailableMemorySegments()
                                    + networkBufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be 130 buffers available in local pool.")
                    .isEqualTo(130);
        } catch (Throwable t) {
            // Remember the failure; it is handed to cleanup in the finally block
            // (cleanup is expected to rethrow it - confirm against cleanup).
            thrown = t;
        } finally {
            // Exactly one cleanup call on both the success and the failure path.
            cleanup(networkBufferPool, executor, null, thrown, new InputChannel[] {inputChannel});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the task returned by {@code recycleBufferTask} concurrently with a task that calls
     * {@code onSenderBacklog} with backlogs 1..128. Afterwards the channel's available buffer
     * count must equal its required buffer count and the local pool must be empty.
     *
     * @throws Exception if setting up or cleaning up the test resources fails
     */
    @Test
    void testConcurrentOnSenderBacklogAndRecycle() throws Exception {
        final int numExclusiveSegments = 120;
        final int numFloatingBuffers = 128;
        final int backlog = 128;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final RemoteInputChannel inputChannel =
                InputChannelTestUtils.createRemoteInputChannel(inputGate, numExclusiveSegments);
        inputGate.setInputChannels(new InputChannel[] {inputChannel});
        Throwable thrown = null;
        try {
            BufferPool bufferPool =
                    networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();

            Callable<Void> requestBufferTask =
                    () -> {
                        for (int j = 1; j <= backlog; ++j) {
                            inputChannel.onSenderBacklog(j);
                        }
                        return null;
                    };

            submitTasksAndWaitForResults(
                    executor,
                    new Callable[] {
                        recycleBufferTask(
                                inputChannel, bufferPool, numExclusiveSegments, numFloatingBuffers),
                        requestBufferTask
                    });

            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage(
                            "There should be %d buffers available in the channel.",
                            numFloatingBuffers)
                    .isEqualTo(inputChannel.getNumberOfRequiredBuffers());
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage("There should be no buffers available in local pool.")
                    .isZero();
        } catch (Throwable t) {
            // Remember the failure; it is handed to cleanup in the finally block
            // (cleanup is expected to rethrow it - confirm against cleanup).
            thrown = t;
        } finally {
            // Exactly one cleanup call on both the success and the failure path.
            cleanup(networkBufferPool, executor, null, thrown, new InputChannel[] {inputChannel});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the task returned by {@code recycleBufferTask} concurrently with a task that calls
     * {@code releaseAllResources()} on the channel. Afterwards the channel must hold no available
     * buffers, the local pool must hold 128 segments and the global pool 120.
     *
     * @throws Exception if setting up or cleaning up the test resources fails
     */
    @Test
    void testConcurrentRecycleAndRelease() throws Exception {
        final int numExclusiveSegments = 120;
        final int numFloatingBuffers = 128;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final RemoteInputChannel inputChannel =
                InputChannelTestUtils.createRemoteInputChannel(inputGate, numExclusiveSegments);
        inputGate.setInputChannels(new InputChannel[] {inputChannel});
        Throwable thrown = null;
        try {
            BufferPool bufferPool =
                    networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();

            Callable<Void> releaseTask =
                    () -> {
                        inputChannel.releaseAllResources();
                        return null;
                    };

            submitTasksAndWaitForResults(
                    executor,
                    new Callable[] {
                        recycleBufferTask(
                                inputChannel, bufferPool, numExclusiveSegments, numFloatingBuffers),
                        releaseTask
                    });

            Assertions.assertThat(inputChannel.getNumberOfAvailableBuffers())
                    .withFailMessage("There should be no buffers available in the channel.")
                    .isZero();
            Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage(
                            "There should be %d buffers available in local pool.",
                            numFloatingBuffers)
                    .isEqualTo(numFloatingBuffers);
            Assertions.assertThat(networkBufferPool.getNumberOfAvailableMemorySegments())
                    .withFailMessage(
                            "There should be %d buffers available in global pool.",
                            numExclusiveSegments)
                    .isEqualTo(numExclusiveSegments);
        } catch (Throwable t) {
            // Remember the failure; it is handed to cleanup in the finally block
            // (cleanup is expected to rethrow it - confirm against cleanup).
            thrown = t;
        } finally {
            // Exactly one cleanup call on both the success and the failure path.
            cleanup(networkBufferPool, executor, null, thrown, new InputChannel[] {inputChannel});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs two tasks concurrently, each for 1000 rounds. One requests a buffer builder from the
     * local pool, builds a buffer and recycles it. The other requests up to four buffers through
     * the channel, sorts them by recycler into channel-owned and pool-owned, recycles the
     * pool-owned ones, asserts that exactly two channel-owned buffers were obtained, reports a
     * sender backlog of 0 and recycles the channel-owned ones. The channel task releases the
     * channel's resources when it finishes, whether it succeeded or not.
     *
     * @throws Exception if setting up or cleaning up the test resources fails
     */
    @Test
    void testConcurrentRecycleAndRelease2() throws Exception {
        final int retries = 1000;
        final int numExclusiveBuffers = 2;
        final int numFloatingBuffers = 2;
        final int numTotalBuffers = 4;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(numTotalBuffers, 32);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[] {inputChannel});
        Throwable thrown = null;
        try {
            BufferPool bufferPool =
                    networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();

            Callable<Void> bufferPoolInteractionsTask =
                    () -> {
                        for (int i = 0; i < retries; ++i) {
                            try (BufferBuilder bufferBuilder =
                                    bufferPool.requestBufferBuilderBlocking()) {
                                Buffer buffer =
                                        BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder);
                                buffer.recycleBuffer();
                            }
                        }
                        return null;
                    };

            Callable<Void> channelInteractionsTask =
                    () -> {
                        List<Buffer> exclusiveBuffers = new ArrayList<>(numExclusiveBuffers);
                        List<Buffer> floatingBuffers = new ArrayList<>(numFloatingBuffers);
                        try {
                            for (int i = 0; i < retries; ++i) {
                                // Request buffers until the channel has none left to give.
                                for (int j = 0; j < numTotalBuffers; ++j) {
                                    Buffer buffer = inputChannel.requestBuffer();
                                    if (buffer == null) {
                                        break;
                                    }
                                    // Buffers recycled by the channel's buffer manager are
                                    // tracked separately from the others.
                                    if (buffer.getRecycler() == inputChannel.getBufferManager()) {
                                        exclusiveBuffers.add(buffer);
                                    } else {
                                        floatingBuffers.add(buffer);
                                    }
                                }
                                floatingBuffers.forEach(Buffer::recycleBuffer);
                                floatingBuffers.clear();
                                Assertions.assertThat(exclusiveBuffers)
                                        .hasSize(numExclusiveBuffers);
                                inputChannel.onSenderBacklog(0);
                                exclusiveBuffers.forEach(Buffer::recycleBuffer);
                                exclusiveBuffers.clear();
                            }
                        } finally {
                            inputChannel.releaseAllResources();
                        }
                        return null;
                    };

            submitTasksAndWaitForResults(
                    executor,
                    new Callable[] {bufferPoolInteractionsTask, channelInteractionsTask});
        } catch (Throwable t) {
            // Remember the failure; it is handed to cleanup in the finally block
            // (cleanup is expected to rethrow it - confirm against cleanup).
            thrown = t;
        } finally {
            // Exactly one cleanup call on both the success and the failure path.
            cleanup(networkBufferPool, executor, null, thrown, new InputChannel[] {inputChannel});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Feeds 1000 buffers into the channel via {@code onBuffer}, then runs two tasks concurrently:
     * one polls {@code getNextBuffer()} 1000 times and recycles every buffer it gets, the other
     * calls {@code releaseAllResources()}. A failure in the polling task is tolerated only if the
     * channel has been released by then.
     *
     * @throws Exception if setting up or cleaning up the test resources fails
     */
    @Test
    void testConcurrentGetNextBufferAndRelease() throws Exception {
        final int numTotalBuffers = 1000;
        final int numFloatingBuffers = 998;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(numTotalBuffers, 32);
        SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[] {inputChannel});
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Throwable thrown = null;
        try {
            BufferPool bufferPool =
                    networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
            inputGate.setBufferPool(bufferPool);
            inputGate.setupChannels();
            inputChannel.requestSubpartitions();

            // Hand every requested buffer straight back to the channel as received data,
            // using the loop index as sequence number.
            for (int i = 0; i < numTotalBuffers; ++i) {
                Buffer buffer = inputChannel.requestBuffer();
                inputChannel.onBuffer(buffer, i, 0, 0);
            }

            Callable<Void> getNextBufferTask =
                    () -> {
                        try {
                            for (int i = 0; i < numTotalBuffers; ++i) {
                                // Let the Optional's element type be inferred instead of using a
                                // raw Optional, so buffer() resolves on the lambda parameter.
                                inputChannel
                                        .getNextBuffer()
                                        .ifPresent(next -> next.buffer().recycleBuffer());
                            }
                        } catch (Throwable t) {
                            if (!inputChannel.isReleased()) {
                                throw new AssertionError(
                                        "Exceptions are expected here only if the input channel was released",
                                        t);
                            }
                        }
                        return null;
                    };
            Callable<Void> releaseTask =
                    () -> {
                        inputChannel.releaseAllResources();
                        return null;
                    };

            submitTasksAndWaitForResults(executor, new Callable[] {getNextBufferTask, releaseTask});
        } catch (Throwable t) {
            // Remember the failure; it is handed to cleanup in the finally block
            // (cleanup is expected to rethrow it - confirm against cleanup).
            thrown = t;
        } finally {
            // Exactly one cleanup call on both the success and the failure path.
            cleanup(networkBufferPool, executor, null, thrown, new InputChannel[] {inputChannel});
        }
    }

    /**
     * Requests and then re-triggers the subpartition request on a channel backed by a
     * {@code TestingConnectionManager}; {@code checkError()} must then throw a
     * {@code PartitionNotFoundException} carrying the channel's partition id.
     */
    @Test
    void testPartitionNotFoundExceptionWhileRetriggeringRequest() throws Exception {
        SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        RemoteInputChannel channel =
                InputChannelTestUtils.createRemoteInputChannel(
                        gate, 0, new TestingConnectionManager());

        channel.requestSubpartitions();
        channel.retriggerSubpartitionRequest();

        Assertions.assertThatThrownBy(() -> channel.checkError())
                .isInstanceOfSatisfying(
                        PartitionNotFoundException.class,
                        e ->
                                Assertions.assertThat(channel.getPartitionId())
                                        .isEqualTo(e.getPartitionId()));
    }

    /**
     * With a {@code TestingExceptionConnectionManager}, {@code requestSubpartitions()} must throw
     * a {@code PartitionConnectionException} carrying the channel's partition id.
     */
    @Test
    void testPartitionConnectionExceptionWhileRequestingPartition() throws Exception {
        SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        RemoteInputChannel channel =
                InputChannelTestUtils.createRemoteInputChannel(
                        gate, 0, new TestingExceptionConnectionManager());

        Assertions.assertThatThrownBy(() -> channel.requestSubpartitions())
                .isInstanceOfSatisfying(
                        PartitionConnectionException.class,
                        e ->
                                Assertions.assertThat(channel.getPartitionId())
                                        .isEqualTo(e.getPartitionId()));
    }

    /** Resuming consumption on an already released channel must fail with an IllegalStateException. */
    @Test
    void testUnblockReleasedChannel() throws Exception {
        RemoteInputChannel releasedChannel =
                createRemoteInputChannel(InputChannelTestUtils.createSingleInputGate(1));
        releasedChannel.releaseAllResources();

        Assertions.assertThatThrownBy(() -> releasedChannel.resumeConsumption())
                .isInstanceOf(IllegalStateException.class);
    }

    /** Announcing a buffer size on an already released channel must fail with an IllegalStateException. */
    @Test
    void testReleasedChannelAnnounceBufferSize() throws Exception {
        RemoteInputChannel releasedChannel =
                createRemoteInputChannel(InputChannelTestUtils.createSingleInputGate(1));
        releasedChannel.releaseAllResources();

        Assertions.assertThatThrownBy(() -> releasedChannel.announceBufferSize(10))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Sends the same checkpoint barrier to two channels (created with last arguments 2 and 0) and
     * checks their available buffers and unannounced credit after the barrier, after
     * {@code resumeConsumption()}, and after a further backlog announcement.
     */
    @Test
    void testOnUpstreamBlockedAndResumed() throws Exception {
        TestBufferPool pool = new TestBufferPool();
        SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(pool);
        RemoteInputChannel first = createRemoteInputChannel(gate, 0, 2);
        RemoteInputChannel second = createRemoteInputChannel(gate, 1, 0);
        gate.setup();
        first.requestSubpartitions();
        second.requestSubpartitions();

        // Initial backlog of 2 on both channels.
        first.onSenderBacklog(2);
        second.onSenderBacklog(2);
        Assertions.assertThat(first.getNumberOfAvailableBuffers()).isEqualTo(4);
        Assertions.assertThat(second.getNumberOfAvailableBuffers()).isEqualTo(2);

        // Deliver the same barrier buffer to both channels.
        CheckpointOptions barrierOptions =
                CheckpointOptions.alignedWithTimeout(
                        CheckpointType.CHECKPOINT,
                        CheckpointStorageLocationReference.getDefault(),
                        (long) Integer.MAX_VALUE);
        Buffer barrierBuffer =
                EventSerializer.toBuffer(new CheckpointBarrier(1L, 123L, barrierOptions), false);
        first.onBuffer(barrierBuffer, 0, 0, 0);
        second.onBuffer(barrierBuffer, 0, 0, 0);
        Assertions.assertThat(first.getNumberOfAvailableBuffers()).isEqualTo(4);
        Assertions.assertThat(second.getNumberOfAvailableBuffers()).isZero();

        // Resume consumption on both channels.
        first.resumeConsumption();
        second.resumeConsumption();
        Assertions.assertThat(first.getUnannouncedCredit()).isEqualTo(4);
        Assertions.assertThat(second.getUnannouncedCredit()).isZero();

        // A larger backlog of 4 on both channels.
        first.onSenderBacklog(4);
        second.onSenderBacklog(4);
        Assertions.assertThat(first.getNumberOfAvailableBuffers()).isEqualTo(6);
        Assertions.assertThat(second.getNumberOfAvailableBuffers()).isEqualTo(4);
        Assertions.assertThat(first.getUnannouncedCredit()).isEqualTo(6);
        Assertions.assertThat(second.getUnannouncedCredit()).isEqualTo(4);
    }

    /**
     * After a backlog of 2 on each channel, repeatedly requests buffers and checks that the
     * number of required buffers counts down to zero: from 4 on the first channel and from 2 on
     * the second.
     */
    @Test
    void testRequestBuffer() throws Exception {
        TestBufferPool pool = new TestBufferPool();
        SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(pool);
        RemoteInputChannel first = createRemoteInputChannel(gate, 0, 2);
        RemoteInputChannel second = createRemoteInputChannel(gate, 1, 0);
        gate.setup();
        first.requestSubpartitions();
        second.requestSubpartitions();
        first.onSenderBacklog(2);
        second.onSenderBacklog(2);

        // Each request is made after the check, including one final request at zero.
        for (int expectedRequired = 4; expectedRequired >= 0; --expectedRequired) {
            Assertions.assertThat(first.getNumberOfRequiredBuffers()).isEqualTo(expectedRequired);
            first.requestBuffer();
        }
        for (int expectedRequired = 2; expectedRequired >= 0; --expectedRequired) {
            Assertions.assertThat(second.getNumberOfRequiredBuffers()).isEqualTo(expectedRequired);
            second.requestBuffer();
        }
    }

    /**
     * Sends two buffers, an {@code UNALIGNED} barrier and two more buffers with sequence numbers
     * 0..4, then passes the sequence numbers 2, 0, 1, 3, 4 to
     * {@code assertGetNextBufferSequenceNumbers}.
     */
    @Test
    void testPrioritySequenceNumbers() throws Exception {
        int nextSequenceNumber = 0;
        RemoteInputChannel channel = buildInputGateAndGetChannel(nextSequenceNumber);

        for (int size = 1; size <= 2; size++) {
            sendBuffer(channel, nextSequenceNumber++, size);
        }
        sendBarrier(channel, nextSequenceNumber++, UNALIGNED);
        for (int size = 3; size <= 4; size++) {
            sendBuffer(channel, nextSequenceNumber++, size);
        }

        assertGetNextBufferSequenceNumbers(channel, 2, 0, 1, 3, 4);
    }

    /**
     * Sends buffers of sizes 1 and 2, an {@code UNALIGNED} barrier and buffers of sizes 3 and 4,
     * then passes the sizes 1 and 2 to {@code assertInflightBufferSizes}.
     */
    @Test
    void testGetInflightBuffers() throws Exception {
        int nextSequenceNumber = 0;
        RemoteInputChannel channel = buildInputGateAndGetChannel(nextSequenceNumber);

        for (int size = 1; size <= 2; size++) {
            sendBuffer(channel, nextSequenceNumber++, size);
        }
        sendBarrier(channel, nextSequenceNumber++, UNALIGNED);
        for (int size = 3; size <= 4; size++) {
            sendBuffer(channel, nextSequenceNumber++, size);
        }

        assertInflightBufferSizes(channel, 1, 2);
    }

    /**
     * Sends four buffers (sizes 1..4) starting at sequence number {@code Integer.MAX_VALUE - 2},
     * so the last sequence number wraps around, then passes the sizes 1, 2, 3, 4 to
     * {@code assertInflightBufferSizes}.
     */
    @Test
    void testGetAllInflightBuffers() throws Exception {
        int nextSequenceNumber = Integer.MAX_VALUE - 2;
        RemoteInputChannel channel = buildInputGateAndGetChannel(nextSequenceNumber);

        for (int size = 1; size <= 4; size++) {
            sendBuffer(channel, nextSequenceNumber++, size);
        }

        assertInflightBufferSizes(channel, 1, 2, 3, 4);
    }

    /**
     * Repeats the buffer/barrier/buffer sequence for every starting sequence number from
     * {@code Integer.MAX_VALUE - 10} up to (but excluding) {@code Integer.MIN_VALUE + 2}, i.e.
     * across the int overflow, and checks that the in-flight buffers for checkpoint 1 contain the
     * sizes 1 and 2.
     */
    @Test
    void testGetInflightBuffersOverflow() throws Exception {
        final int firstStart = Integer.MAX_VALUE - 10;
        final int stopBefore = Integer.MIN_VALUE + 2;

        // '!=' rather than '<' because the loop variable itself wraps around.
        for (int start = firstStart; start != stopBefore; start++) {
            RemoteInputChannel channel = buildInputGateAndGetChannel(start);
            int nextSequenceNumber = start;

            for (int size = 1; size <= 2; size++) {
                sendBuffer(channel, nextSequenceNumber++, size);
            }
            sendBarrier(channel, nextSequenceNumber++, UNALIGNED);
            for (int size = 3; size <= 4; size++) {
                sendBuffer(channel, nextSequenceNumber++, size);
            }

            List<Integer> inflightSizes = toBufferSizes(channel.getInflightBuffers(1L));
            Assertions.assertThat(inflightSizes)
                    .withFailMessage("For starting sequence " + start)
                    .contains(1, 2);
        }
    }

    /**
     * Sends two buffers, an {@code UNALIGNED} barrier and two more buffers, polls the channel
     * twice (expecting sequence numbers 2 and 0 among the polled elements) and then checks that
     * only the buffer of size 2 is still reported as in-flight.
     */
    @Test
    void testGetInflightBuffersAfterPollingBuffer() throws Exception {
        RemoteInputChannel channel = buildInputGateAndGetChannel(0);
        sendBuffer(channel, 0, 1);
        sendBuffer(channel, 1, 2);
        sendBarrier(channel, 2, UNALIGNED);
        sendBuffer(channel, 3, 3);
        sendBuffer(channel, 4, 4);

        assertGetNextBufferSequenceNumbers(channel, 2, 0);
        assertInflightBufferSizes(channel, 2);
    }

    /**
     * Maps each buffer to its {@code getSize()} value, preserving the order of the input list.
     *
     * @param inflightBuffers buffers to measure
     * @return a new list holding one size per input buffer
     */
    private static List<Integer> toBufferSizes(List<Buffer> inflightBuffers) {
        List<Integer> sizes = new ArrayList<>(inflightBuffers.size());
        for (Buffer inflightBuffer : inflightBuffers) {
            sizes.add(inflightBuffer.getSize());
        }
        return sizes;
    }

    /**
     * Sends two buffers, an {@code ALIGNED_WITH_TIMEOUT} barrier and two more buffers, then
     * verifies the polling order: first a {@code PRIORITIZED_EVENT_BUFFER} carrying the barrier's
     * sequence number (2), then the two earlier buffers, then the barrier itself as a
     * {@code TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER}, and finally the element with sequence
     * number 3.
     */
    @Test
    void testRequiresAnnouncement() throws Exception {
        RemoteInputChannel channel = buildInputGateAndGetChannel(0);
        sendBuffer(channel, 0, 1);
        sendBuffer(channel, 1, 2);
        sendBarrier(channel, 2, ALIGNED_WITH_TIMEOUT);
        sendBuffer(channel, 3, 3);
        sendBuffer(channel, 4, 4);

        // First poll: the prioritized element that shares the barrier's sequence number.
        InputChannel.BufferAndAvailability announcement = (InputChannel.BufferAndAvailability)channel.getNextBuffer().get();
        Assertions.assertThat((int)announcement.getSequenceNumber()).isEqualTo(2);
        Assertions.assertThat((boolean)announcement.morePriorityEvents()).isFalse();
        Assertions.assertThat((boolean)announcement.moreAvailable()).isTrue();
        Assertions.assertThat((Comparable)announcement.buffer().getDataType()).isEqualTo((Object)Buffer.DataType.PRIORITIZED_EVENT_BUFFER);

        // The two data buffers that arrived before the barrier.
        assertGetNextBufferSequenceNumbers(channel, 0, 1);

        // The barrier is then delivered at its regular position in the queue.
        InputChannel.BufferAndAvailability barrier = (InputChannel.BufferAndAvailability)channel.getNextBuffer().get();
        Assertions.assertThat((int)barrier.getSequenceNumber()).isEqualTo(2);
        Assertions.assertThat((Comparable)barrier.buffer().getDataType()).isEqualTo((Object)Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER);

        InputChannel.BufferAndAvailability afterBarrier = (InputChannel.BufferAndAvailability)channel.getNextBuffer().get();
        Assertions.assertThat((int)afterBarrier.getSequenceNumber()).isEqualTo(3);
    }

    /**
     * Sends two buffers, an {@code ALIGNED_WITH_TIMEOUT} barrier and two more buffers, and,
     * without polling anything, checks that the buffers of size 1 and 2 are reported as
     * in-flight.
     */
    @Test
    void testGetInflightBuffersBeforeProcessingAnnouncement() throws Exception {
        RemoteInputChannel channel = buildInputGateAndGetChannel(0);
        sendBuffer(channel, 0, 1);
        sendBuffer(channel, 1, 2);
        sendBarrier(channel, 2, ALIGNED_WITH_TIMEOUT);
        sendBuffer(channel, 3, 3);
        sendBuffer(channel, 4, 4);

        assertInflightBufferSizes(channel, 1, 2);
    }

    /**
     * Sends two buffers, an {@code ALIGNED_WITH_TIMEOUT} barrier and two more buffers, polls once
     * (expecting sequence number 2) and checks that the buffers of size 1 and 2 are still
     * reported as in-flight.
     */
    @Test
    void testGetInflightBuffersAfterProcessingAnnouncement() throws Exception {
        RemoteInputChannel channel = buildInputGateAndGetChannel(0);
        sendBuffer(channel, 0, 1);
        sendBuffer(channel, 1, 2);
        sendBarrier(channel, 2, ALIGNED_WITH_TIMEOUT);
        sendBuffer(channel, 3, 3);
        sendBuffer(channel, 4, 4);

        assertGetNextBufferSequenceNumbers(channel, 2);
        assertInflightBufferSizes(channel, 1, 2);
    }

    /**
     * Sends two buffers, an {@code ALIGNED_WITH_TIMEOUT} barrier and two more buffers, polls twice
     * (expecting sequence numbers 2 and 0 among the polled elements) and checks that only the
     * buffer of size 2 is still reported as in-flight.
     */
    @Test
    void testGetInflightBuffersAfterProcessingAnnouncementAndBuffer() throws Exception {
        RemoteInputChannel channel = buildInputGateAndGetChannel(0);
        sendBuffer(channel, 0, 1);
        sendBuffer(channel, 1, 2);
        sendBarrier(channel, 2, ALIGNED_WITH_TIMEOUT);
        sendBuffer(channel, 3, 3);
        sendBuffer(channel, 4, 4);

        assertGetNextBufferSequenceNumbers(channel, 2, 0);
        assertInflightBufferSizes(channel, 2);
    }

    /**
     * Verifies that {@code unsynchronizedGetSizeOfQueuedBuffers()} tracks the summed size of the
     * queued elements: it grows by the size of each buffer and of the serialized barrier as they
     * are received, shrinks by the size of each polled element, and is zero when the queue is
     * empty.
     */
    @Test
    void testSizeOfQueuedBuffers() throws Exception {
        int sequenceNumber = 0;
        int bufferSize = 1;
        int queueSize = 0;
        RemoteInputChannel channel = this.buildInputGateAndGetChannel(sequenceNumber);
        Assertions.assertThat((long)channel.unsynchronizedGetSizeOfQueuedBuffers()).isZero();

        // Receive a couple of buffers.
        for (int i = 0; i < 2; ++i) {
            // Account for the buffer BEFORE bufferSize is post-incremented by the send call;
            // adding it afterwards would count the size of the next buffer instead of this one.
            queueSize += bufferSize;
            this.sendBuffer(channel, sequenceNumber++, bufferSize++);
            Assertions.assertThat((long)channel.unsynchronizedGetSizeOfQueuedBuffers()).isEqualTo((long)queueSize);
        }

        // Receive the barrier; its contribution is the length of its serialized form.
        queueSize += EventSerializer.toSerializedEvent((AbstractEvent)new CheckpointBarrier(1L, 123L, UNALIGNED)).remaining();
        this.sendBarrier(channel, sequenceNumber++, UNALIGNED);
        Assertions.assertThat((long)channel.unsynchronizedGetSizeOfQueuedBuffers()).isEqualTo((long)queueSize);

        // Poll all three received elements; each poll must release exactly that element's size.
        for (int i = 0; i < 3; ++i) {
            Optional<InputChannel.BufferAndAvailability> nextBuffer = channel.getNextBuffer();
            queueSize -= nextBuffer.get().buffer().getSize();
            Assertions.assertThat((long)channel.unsynchronizedGetSizeOfQueuedBuffers()).isEqualTo((long)queueSize);
        }

        Assertions.assertThat((long)channel.unsynchronizedGetSizeOfQueuedBuffers()).isZero();
    }

    /**
     * Delivers a {@code CheckpointBarrier(1L, 123L, checkpointOptions)} to the channel under the
     * given sequence number. The boolean passed to {@code EventSerializer.toBuffer} is
     * {@code checkpointOptions.isUnalignedCheckpoint()}.
     *
     * @throws IOException if serializing the barrier or delivering it fails
     */
    private void sendBarrier(RemoteInputChannel channel, int sequenceNumber, CheckpointOptions checkpointOptions) throws IOException {
        CheckpointBarrier barrier = new CheckpointBarrier(1L, 123L, checkpointOptions);
        boolean unaligned = checkpointOptions.isUnalignedCheckpoint();
        Buffer serializedBarrier = EventSerializer.toBuffer((AbstractEvent)barrier, unaligned);
        send(channel, sequenceNumber, serializedBarrier);
    }

    /**
     * Delivers a test buffer created by {@code TestBufferFactory.createBuffer(dataSize)} to the
     * channel under the given sequence number.
     *
     * @throws IOException if delivery fails or the channel reports an error
     */
    private void sendBuffer(RemoteInputChannel channel, int sequenceNumber, int dataSize) throws IOException {
        Buffer dataBuffer = TestBufferFactory.createBuffer(dataSize);
        send(channel, sequenceNumber, dataBuffer);
    }

    /**
     * Hands {@code buffer} to {@code channel.onBuffer} with the given sequence number and then
     * calls {@code channel.checkError()} so that any failure recorded by the channel surfaces in
     * the calling test.
     *
     * @throws IOException propagated from the channel
     */
    private void send(RemoteInputChannel channel, int sequenceNumber, Buffer buffer) throws IOException {
        // NOTE(review): the two trailing zeros are passed through unchanged; their meaning is
        // defined by RemoteInputChannel#onBuffer, which is not visible here.
        channel.onBuffer(buffer, sequenceNumber, 0, 0);
        channel.checkError();
    }

    /**
     * Asserts that {@code channel.getInflightBuffers(1L)} yields buffers whose sizes are exactly
     * {@code bufferSizes}, in that order.
     *
     * @throws CheckpointException propagated from {@code getInflightBuffers}
     */
    private void assertInflightBufferSizes(RemoteInputChannel channel, Integer ... bufferSizes) throws CheckpointException {
        List<Integer> actualSizes = RemoteInputChannelTest.toBufferSizes(channel.getInflightBuffers(1L));
        Assertions.assertThat(actualSizes).containsExactly((Object[])bufferSizes);
    }

    /**
     * Polls the channel once per expected sequence number and asserts that the sequence numbers
     * of the polled elements contain all of {@code sequenceNumbers}.
     *
     * <p>The check uses {@code contains}, so it does not enforce the order in which the elements
     * were polled. Polls that return an empty {@code Optional} contribute nothing to the list.
     *
     * @param channel channel to poll
     * @param sequenceNumbers sequence numbers expected among the polled elements
     * @throws IOException propagated from {@code getNextBuffer}
     */
    private void assertGetNextBufferSequenceNumbers(RemoteInputChannel channel, Integer ... sequenceNumbers) throws IOException {
        List<Integer> actualSequenceNumbers = new ArrayList<>(sequenceNumbers.length);
        for (int i = 0; i < sequenceNumbers.length; ++i) {
            channel.getNextBuffer().map(InputChannel.BufferAndAvailability::getSequenceNumber).ifPresent(actualSequenceNumbers::add);
        }
        Assertions.assertThat(actualSequenceNumbers).contains(sequenceNumbers);
    }

    /**
     * Creates a remote channel for {@code inputGate} by delegating to the four-argument overload
     * with subpartition index, partition request timeout and max backoff all set to 0.
     */
    private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) {
        final int consumedSubpartitionIndex = 0;
        final int partitionRequestTimeout = 0;
        final int maxBackoff = 0;
        return createRemoteInputChannel(inputGate, consumedSubpartitionIndex, partitionRequestTimeout, maxBackoff);
    }

    /**
     * Builds a remote channel for {@code inputGate} that consumes the single subpartition
     * {@code consumedSubpartitionIndex} and is configured with {@code initialCredits} network
     * buffers per channel.
     */
    private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, int consumedSubpartitionIndex, int initialCredits) {
        return InputChannelBuilder.newBuilder()
                .setSubpartitionIndexSet(new ResultSubpartitionIndexSet(consumedSubpartitionIndex))
                .setNetworkBuffersPerChannel(initialCredits)
                .buildRemoteChannel(inputGate);
    }

    /**
     * Builds a remote channel for {@code inputGate} that consumes the single subpartition
     * {@code consumedSubpartitionIndex}, using the given partition request listener timeout and
     * max backoff.
     */
    private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, int consumedSubpartitionIndex, int partitionRequestTimeout, int maxBackoff) {
        return InputChannelBuilder.newBuilder()
                .setSubpartitionIndexSet(new ResultSubpartitionIndexSet(consumedSubpartitionIndex))
                .setPartitionRequestListenerTimeout(partitionRequestTimeout)
                .setMaxBackoff(maxBackoff)
                .buildRemoteChannel(inputGate);
    }

    /**
     * Builds a remote channel for {@code inputGate} that targets {@code partitionId} through
     * {@code connectionManager}, using the given partition request listener timeout and max
     * backoff.
     */
    private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, ConnectionManager connectionManager, ResultPartitionID partitionId, int partitionRequestTimeout, int maxBackoff) {
        return InputChannelBuilder.newBuilder()
                .setPartitionRequestListenerTimeout(partitionRequestTimeout)
                .setMaxBackoff(maxBackoff)
                .setPartitionId(partitionId)
                .setConnectionManager(connectionManager)
                .buildRemoteChannel(inputGate);
    }

    /**
     * Builds a fresh input gate, takes its first channel and sets the sequence number that
     * channel expects next to {@code expectedSequenceNumber}.
     *
     * @throws IOException propagated from building the gate
     */
    private RemoteInputChannel buildInputGateAndGetChannel(int expectedSequenceNumber) throws IOException {
        RemoteInputChannel remoteChannel = buildInputGateAndGetChannel();
        remoteChannel.setExpectedSequenceNumber(expectedSequenceNumber);
        return remoteChannel;
    }

    /**
     * Builds a fresh input gate via {@code buildInputGate()} and returns its channel at index 0,
     * cast to {@code RemoteInputChannel}.
     *
     * @throws IOException propagated from building the gate
     */
    private RemoteInputChannel buildInputGateAndGetChannel() throws IOException {
        SingleInputGate gate = buildInputGate();
        return (RemoteInputChannel)gate.getChannel(0);
    }

    /**
     * Creates a {@code SingleInputGate} whose channels are remote channels, backed by a new
     * {@code NetworkBufferPool(4, 4096)}, then sets the gate up and requests its partitions.
     *
     * <p>The network buffer pool created here is not destroyed by this helper.
     *
     * @throws IOException propagated from pool creation, setup or the partition request
     */
    private SingleInputGate buildInputGate() throws IOException {
        NetworkBufferPool globalPool = new NetworkBufferPool(4, 4096);
        SingleInputGate gate = new SingleInputGateBuilder()
                .setChannelFactory(InputChannelBuilder::buildRemoteChannel)
                .setBufferPoolFactory(globalPool.createBufferPool(1, 4))
                .setSegmentProvider((MemorySegmentProvider)globalPool)
                .build();
        gate.setup();
        gate.requestPartitions();
        return gate;
    }

    /**
     * Checks that {@code onFailedPartitionRequest()} returns even while another thread is stuck
     * inside {@code createPartitionRequestClient}.
     *
     * <p>The connection manager blocks on a latch that is only released after
     * {@code onFailedPartitionRequest()} has returned on a second thread. If that call blocked,
     * the latch wait would time out (or be interrupted), which sets the flag asserted at the end.
     */
    @Test
    void testOnFailedPartitionRequestDoesNotBlockNetworkThreads() throws Exception {
        final long testBlockedWaitTimeoutMillis = 30000L;
        PartitionProducerStateChecker partitionProducerStateChecker = (jobId, intermediateDataSetId, resultPartitionId) -> CompletableFuture.completedFuture(ExecutionState.RUNNING);
        NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
        Task task = new TestTaskBuilder((ShuffleEnvironment<?, ?>)shuffleEnvironment).setPartitionProducerStateChecker(partitionProducerStateChecker).build(EXECUTOR_EXTENSION.getExecutor());
        SingleInputGate inputGate = new SingleInputGateBuilder().setPartitionProducerStateProvider((PartitionProducerStateProvider)task).build();
        TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);

        final OneShotLatch ready = new OneShotLatch();
        final OneShotLatch blocker = new OneShotLatch();
        final AtomicBoolean timedOutOrInterrupted = new AtomicBoolean(false);

        TestingConnectionManager blockingConnectionManager = new TestingConnectionManager(){

            @Override
            public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
                // Signal the second thread, then wait until it has finished its call.
                ready.trigger();
                try {
                    blocker.await(testBlockedWaitTimeoutMillis, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException | TimeoutException e) {
                    timedOutOrInterrupted.set(true);
                }
                return new TestingPartitionRequestClient();
            }
        };

        RemoteInputChannel remoteInputChannel = InputChannelBuilder.newBuilder().setConnectionManager(blockingConnectionManager).buildRemoteChannel(inputGate);
        inputGate.setInputChannels(new InputChannel[]{remoteInputChannel});

        Thread simulatedNetworkThread = new Thread(() -> {
            try {
                ready.await();
                remoteInputChannel.onFailedPartitionRequest();
                // Only reached once onFailedPartitionRequest() has returned.
                blocker.trigger();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        simulatedNetworkThread.start();

        // Enters createPartitionRequestClient on this thread and blocks there on the latch.
        inputGate.requestPartitions();
        simulatedNetworkThread.join();

        ((AtomicBooleanAssert)Assertions.assertThat((AtomicBoolean)timedOutOrInterrupted).withFailMessage("Test ended by timeout or interruption - this indicates that the network thread was blocked.", new Object[0])).isFalse();
    }

    /**
     * Delivers two checkpoint barriers to a remote channel, the first serialized with the boolean
     * flag {@code false} and the second with {@code true}, and checks the gate's availability and
     * priority availability around each delivery.
     *
     * <p>NOTE(review): the two boolean arguments of the {@code AvailabilityUtil} helpers are
     * presumably the expected state before and after the action; confirm against
     * {@code AvailabilityUtil}.
     */
    @Test
    void testNotifyOnPriority() throws IOException {
        SingleInputGate gate = new SingleInputGateBuilder().build();
        RemoteInputChannel remoteChannel = InputChannelTestUtils.createRemoteInputChannel(gate, 0);
        CheckpointOptions checkpointOptions = new CheckpointOptions((SnapshotType)CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());

        // Barrier 1, serialized with flag false, delivered under sequence number 0.
        AvailabilityUtil.assertPriorityAvailability(
                (InputGate)gate,
                false,
                false,
                () -> AvailabilityUtil.assertAvailability(
                        (AvailabilityProvider)gate,
                        false,
                        true,
                        () -> remoteChannel.onBuffer(EventSerializer.toBuffer((AbstractEvent)new CheckpointBarrier(1L, 123L, checkpointOptions), (boolean)false), 0, 0, 0)));

        // Barrier 2, serialized with flag true, delivered under sequence number 1.
        AvailabilityUtil.assertPriorityAvailability(
                (InputGate)gate,
                false,
                true,
                () -> AvailabilityUtil.assertAvailability(
                        (AvailabilityProvider)gate,
                        true,
                        true,
                        () -> remoteChannel.onBuffer(EventSerializer.toBuffer((AbstractEvent)new CheckpointBarrier(2L, 123L, checkpointOptions), (boolean)true), 1, 0, 0)));
    }

    /**
     * Tracks {@code getBuffersInUseCount()} while two buffers are received and the channel is
     * polled three times. The expected counts are 2 and 5 after the two receives, then 4, 3 and
     * 3 after the polls.
     *
     * <p>NOTE(review): the third argument of {@code onBuffer} (1, then 3) is presumably the
     * announced backlog, which would explain the counts as "queued buffers + backlog"; confirm
     * against {@code RemoteInputChannel}.
     */
    @Test
    void testBuffersInUseCount() throws Exception {
        RemoteInputChannel channel = buildInputGateAndGetChannel();
        Buffer sharedBuffer = TestBufferFactory.createBuffer(32768);

        channel.onBuffer(sharedBuffer.retainBuffer(), 0, 1, 0);
        Assertions.assertThat((int)channel.getBuffersInUseCount()).isEqualTo(2);

        channel.onBuffer(sharedBuffer.retainBuffer(), 1, 3, 0);
        Assertions.assertThat((int)channel.getBuffersInUseCount()).isEqualTo(5);

        channel.getNextBuffer();
        Assertions.assertThat((int)channel.getBuffersInUseCount()).isEqualTo(4);

        channel.getNextBuffer();
        Assertions.assertThat((int)channel.getBuffersInUseCount()).isEqualTo(3);

        // A third poll, after both received buffers were taken, leaves the count unchanged.
        channel.getNextBuffer();
        Assertions.assertThat((int)channel.getBuffersInUseCount()).isEqualTo(3);
    }

    /**
     * Checks that calling {@code notifyRequiredSegmentId} on a channel whose resources have been
     * released throws {@code IllegalStateException}.
     */
    @Test
    void testReleasedChannelNotifyRequiredSegmentId() throws Exception {
        SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        RemoteInputChannel releasedChannel = createRemoteInputChannel(gate);
        releasedChannel.releaseAllResources();

        Assertions.assertThatThrownBy(() -> releasedChannel.notifyRequiredSegmentId(0, 0))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Eagerly requests {@code numExclusiveSegments} buffers from the channel and
     * {@code numFloatingBuffers} buffers from the pool (asserting each is non-null) and returns a
     * task that recycles all of them.
     *
     * <p>While both groups still hold buffers, the task picks the group to recycle from at random
     * per step; once one group is exhausted it drains the other.
     *
     * @return a task that recycles the buffers requested here; its result is always {@code null}
     */
    private Callable<Void> recycleBufferTask(RemoteInputChannel inputChannel, BufferPool bufferPool, int numExclusiveSegments, int numFloatingBuffers) throws Exception {
        final ArrayDeque<Buffer> exclusiveBuffers = new ArrayDeque<Buffer>(numExclusiveSegments);
        for (int requested = 0; requested < numExclusiveSegments; ++requested) {
            Buffer exclusive = inputChannel.requestBuffer();
            Assertions.assertThat((Object)exclusive).isNotNull();
            exclusiveBuffers.add(exclusive);
        }

        final ArrayDeque<Buffer> floatingBuffers = new ArrayDeque<Buffer>(numFloatingBuffers);
        for (int requested = 0; requested < numFloatingBuffers; ++requested) {
            Buffer floating = bufferPool.requestBuffer();
            Assertions.assertThat((Object)floating).isNotNull();
            floatingBuffers.add(floating);
        }

        return () -> {
            Random random = new Random();
            // Interleave the two groups randomly for as long as both have buffers left.
            while (!exclusiveBuffers.isEmpty() && !floatingBuffers.isEmpty()) {
                ArrayDeque<Buffer> chosen = random.nextBoolean() ? exclusiveBuffers : floatingBuffers;
                chosen.poll().recycleBuffer();
            }
            // At most one of the two groups still has buffers; drain whichever it is.
            Buffer leftover;
            while ((leftover = exclusiveBuffers.poll()) != null) {
                leftover.recycleBuffer();
            }
            while ((leftover = floatingBuffers.poll()) != null) {
                leftover.recycleBuffer();
            }
            return null;
        };
    }

    /**
     * Submits every task to {@code executor} and then waits for each one, in submission order.
     *
     * <p>All tasks are submitted before the first wait, so they may run concurrently. Waiting
     * stops at the first task that failed; its failure is rethrown as thrown by
     * {@link Future#get()}.
     *
     * @param executor executor that runs the tasks
     * @param tasks tasks to run; may be empty
     * @throws Exception whatever {@link Future#get()} throws for the first failing task
     */
    static void submitTasksAndWaitForResults(ExecutorService executor, Callable[] tasks) throws Exception {
        List<Future<?>> pendingResults = new ArrayList<>(tasks.length);
        for (Callable<?> task : tasks) {
            pendingResults.add(executor.submit(task));
        }
        for (Future<?> pendingResult : pendingResults) {
            pendingResult.get();
        }
    }

    /**
     * Releases the resources used by a test: every input channel, the given buffer (if not yet
     * recycled), all buffer pools of {@code networkBufferPool} and the pool itself, and finally
     * shuts down {@code executor}.
     *
     * <p>Failures while releasing channels or destroying pools do not abort the cleanup. They are
     * combined with the incoming {@code throwable} through
     * {@code ExceptionUtils.firstOrSuppressed} and the combined result is rethrown at the end.
     *
     * @param networkBufferPool pool to destroy
     * @param executor executor to shut down, or {@code null}
     * @param buffer buffer to recycle if it has not been recycled yet, or {@code null}
     * @param throwable failure already observed by the caller, or {@code null}
     * @param inputChannels channels whose resources are released
     * @throws Exception if {@code throwable} was non-null or any cleanup step failed
     */
    public static void cleanup(NetworkBufferPool networkBufferPool, @Nullable ExecutorService executor, @Nullable Buffer buffer, @Nullable Throwable throwable, InputChannel ... inputChannels) throws Exception {
        Throwable failure = throwable;

        for (InputChannel inputChannel : inputChannels) {
            try {
                inputChannel.releaseAllResources();
            }
            catch (Throwable releaseFailure) {
                failure = ExceptionUtils.firstOrSuppressed((Throwable)releaseFailure, (Throwable)failure);
            }
        }

        boolean needsRecycling = buffer != null && !buffer.isRecycled();
        if (needsRecycling) {
            buffer.recycleBuffer();
        }

        try {
            networkBufferPool.destroyAllBufferPools();
        }
        catch (Throwable destroyPoolsFailure) {
            failure = ExceptionUtils.firstOrSuppressed((Throwable)destroyPoolsFailure, (Throwable)failure);
        }

        try {
            networkBufferPool.destroy();
        }
        catch (Throwable destroyFailure) {
            failure = ExceptionUtils.firstOrSuppressed((Throwable)destroyFailure, (Throwable)failure);
        }

        if (executor != null) {
            executor.shutdown();
        }

        if (failure != null) {
            ExceptionUtils.rethrowException((Throwable)failure);
        }
    }

    /**
     * Buffer pool stub whose {@code requestBuffer()} always succeeds: each call allocates a new
     * unpooled 1024-byte memory segment and wraps it in a {@code NetworkBuffer} that uses
     * {@code FreeingBufferRecycler.INSTANCE} as its recycler.
     */
    private static final class TestBufferPool
    extends NoOpBufferPool {
        private static final int SEGMENT_SIZE = 1024;

        private TestBufferPool() {
        }

        @Override
        public Buffer requestBuffer() {
            return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), FreeingBufferRecycler.INSTANCE);
        }
    }

    /**
     * Partition request client that records the arguments of the most recent
     * {@code requestSubpartition} call so a test can verify them afterwards.
     */
    private static final class TestVerifyPartitionRequestClient
    extends TestingPartitionRequestClient {
        private ResultPartitionID partitionId;
        private ResultSubpartitionIndexSet subpartitionIndexSet;
        private int delayMs;

        private TestVerifyPartitionRequestClient() {
        }

        /** Records the request arguments; the {@code channel} argument is ignored. */
        @Override
        public void requestSubpartition(ResultPartitionID partitionId, ResultSubpartitionIndexSet subpartitionIndexSet, RemoteInputChannel channel, int delayMs) {
            this.partitionId = partitionId;
            this.subpartitionIndexSet = subpartitionIndexSet;
            this.delayMs = delayMs;
        }

        /**
         * Asserts that the last recorded request matches the expected values.
         *
         * @param expectedId expected partition id
         * @param expectedSubpartitionIndex expected single subpartition index
         * @param expectedDelayMs expected delay in milliseconds
         */
        void verifyResult(ResultPartitionID expectedId, int expectedSubpartitionIndex, int expectedDelayMs) {
            Assertions.assertThat((Object)this.partitionId).isEqualTo((Object)expectedId);
            // The recorded value is the assertion subject, so a failure reports it as "actual".
            Assertions.assertThat((Object)this.subpartitionIndexSet).isEqualTo((Object)new ResultSubpartitionIndexSet(expectedSubpartitionIndex));
            Assertions.assertThat((int)this.delayMs).isEqualTo(expectedDelayMs);
        }
    }

    /**
     * Connection manager that always hands out the single client it was constructed with,
     * regardless of the requested connection id.
     */
    private static final class TestVerifyConnectionManager
    extends TestingConnectionManager {
        private final PartitionRequestClient client;

        /** @param client client to return from every request; must not be {@code null} */
        TestVerifyConnectionManager(TestingPartitionRequestClient client) {
            this.client = Preconditions.checkNotNull(client);
        }

        @Override
        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
            return client;
        }
    }

    private static final class TestPartitionProducerStateProvider
    implements PartitionProducerStateProvider {
        private boolean isInvoked;
        private final ResultPartitionID partitionId;

        TestPartitionProducerStateProvider(ResultPartitionID partitionId) {
            this.partitionId = (ResultPartitionID)Preconditions.checkNotNull((Object)partitionId);
        }

        public void requestPartitionProducerState(IntermediateDataSetID intermediateDataSetId, ResultPartitionID resultPartitionId, Consumer<? super PartitionProducerStateProvider.ResponseHandle> responseConsumer) {
            Assertions.assertThat((Object)resultPartitionId).isEqualTo((Object)this.partitionId);
            this.isInvoked = true;
        }

        boolean isInvoked() {
            return this.isInvoked;
        }
    }

    /**
     * Connection manager whose {@code createPartitionRequestClient} always fails with an
     * {@code IOException} carrying an empty message.
     */
    private static final class TestingExceptionConnectionManager
    extends TestingConnectionManager {
        private TestingExceptionConnectionManager() {
        }

        /** @throws IOException always */
        @Override
        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException {
            final String noMessage = "";
            throw new IOException(noMessage);
        }
    }

    private static interface TriFunction<T, U, V, R> {
        public R apply(T var1, U var2, V var3) throws Exception;
    }
}

