/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.InputGateFairnessTest;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest;
import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.shaded.guava33.com.google.common.collect.Lists;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

class LocalInputChannelTest {
    LocalInputChannelTest() {
    }

    @Test
    void testNoDataPersistedAfterReceivingAlignedBarrier() throws Exception {
        CheckpointBarrier barrier = new CheckpointBarrier(1L, 0L, CheckpointOptions.alignedWithTimeout((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault(), (long)123L));
        BufferConsumer barrierHolder = EventSerializer.toBufferConsumer((AbstractEvent)barrier, (boolean)false);
        BufferConsumer data = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1);
        RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter();
        LocalInputChannel channel = InputChannelBuilder.newBuilder().setPartitionManager(new SingleInputGateTest.TestingResultPartitionManager(InputChannelTestUtils.createResultSubpartitionView(barrierHolder, data))).setStateWriter(stateWriter).buildLocalChannel(new SingleInputGateBuilder().build());
        channel.requestSubpartitions();
        channel.getNextBuffer();
        stateWriter.start(barrier.getId(), barrier.getCheckpointOptions());
        channel.checkpointStarted(barrier);
        channel.getNextBuffer();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)stateWriter.getAddedInput().isEmpty()).withFailMessage("no data should be persisted after receiving a barrier", new Object[0])).isTrue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testConcurrentConsumeMultiplePartitions() throws Exception {
        int parallelism = 32;
        int producerBufferPoolSize = 33;
        int numberOfBuffersPerChannel = 1024;
        ExecutorService executor = Executors.newFixedThreadPool(64);
        NetworkBufferPool networkBuffers = new NetworkBufferPool(2080, 32768);
        ResultPartitionManager partitionManager = new ResultPartitionManager();
        ResultPartitionID[] partitionIds = new ResultPartitionID[32];
        TestPartitionProducer[] partitionProducers = new TestPartitionProducer[32];
        for (int i = 0; i < 32; ++i) {
            partitionIds[i] = new ResultPartitionID();
            ResultPartition partition = new ResultPartitionBuilder().setResultPartitionId(partitionIds[i]).setNumberOfSubpartitions(32).setNumTargetKeyGroups(32).setResultPartitionManager(partitionManager).setBufferPoolFactory((SupplierWithException<BufferPool, IOException>)((SupplierWithException)() -> networkBuffers.createBufferPool(33, 33, 32, Integer.MAX_VALUE, 0))).build();
            partition.setup();
            partitionProducers[i] = new TestPartitionProducer((BufferWritingResultPartition)partition, false, new TestPartitionProducerBufferSource(32, 32768, 1024));
        }
        try {
            int i;
            ArrayList results = Lists.newArrayListWithCapacity((int)33);
            for (i = 0; i < 32; ++i) {
                results.add(CompletableFuture.supplyAsync(CheckedSupplier.unchecked(partitionProducers[i]::call), executor));
            }
            for (i = 0; i < 32; ++i) {
                TestLocalInputChannelConsumer consumer = new TestLocalInputChannelConsumer(i, 32, 1024, networkBuffers.createBufferPool(32, 32), partitionManager, new TaskEventDispatcher(), partitionIds);
                results.add(CompletableFuture.supplyAsync(CheckedSupplier.unchecked(consumer::call), executor));
            }
            FutureUtils.waitForAll((Collection)results).get();
        }
        finally {
            networkBuffers.destroyAllBufferPools();
            networkBuffers.destroy();
            executor.shutdown();
        }
    }

    @Test
    void testPartitionRequestExponentialBackoff() throws Exception {
        int initialBackoff = 500;
        int maxBackoff = 3000;
        int[] expectedDelays = new int[]{initialBackoff, 1000, 2000, maxBackoff};
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        BufferProvider bufferProvider = (BufferProvider)Mockito.mock(BufferProvider.class);
        Mockito.when((Object)inputGate.getBufferProvider()).thenReturn((Object)bufferProvider);
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        LocalInputChannel ch = InputChannelTestUtils.createLocalInputChannel(inputGate, partitionManager, initialBackoff, maxBackoff);
        Mockito.when((Object)partitionManager.createSubpartitionView((ResultPartitionID)ArgumentMatchers.eq((Object)ch.partitionId), (ResultSubpartitionIndexSet)ArgumentMatchers.any(ResultSubpartitionIndexSet.class), (BufferAvailabilityListener)ArgumentMatchers.any(BufferAvailabilityListener.class))).thenThrow(new Throwable[]{new PartitionNotFoundException(ch.partitionId)});
        Timer timer = (Timer)Mockito.mock(Timer.class);
        ((Timer)Mockito.doAnswer(invocation -> {
            ((TimerTask)invocation.getArguments()[0]).run();
            return null;
        }).when((Object)timer)).schedule((TimerTask)ArgumentMatchers.any(TimerTask.class), ArgumentMatchers.anyLong());
        ch.requestSubpartitions();
        ((ResultPartitionManager)Mockito.verify((Object)partitionManager)).createSubpartitionView((ResultPartitionID)ArgumentMatchers.eq((Object)ch.partitionId), (ResultSubpartitionIndexSet)ArgumentMatchers.any(ResultSubpartitionIndexSet.class), (BufferAvailabilityListener)ArgumentMatchers.any(BufferAvailabilityListener.class));
        int[] nArray = expectedDelays;
        int n = nArray.length;
        for (int i = 0; i < n; ++i) {
            long expected = nArray[i];
            ch.retriggerSubpartitionRequest(timer);
            ((Timer)Mockito.verify((Object)timer)).schedule((TimerTask)ArgumentMatchers.any(TimerTask.class), ArgumentMatchers.eq((long)expected));
        }
        ch.retriggerSubpartitionRequest(timer);
        Assertions.assertThatThrownBy(() -> ((LocalInputChannel)ch).getNextBuffer());
    }

    @Test
    void testProducerFailedException() throws Exception {
        ResultSubpartitionView view = (ResultSubpartitionView)Mockito.mock(ResultSubpartitionView.class);
        Mockito.when((Object)view.isReleased()).thenReturn((Object)true);
        Mockito.when((Object)view.getFailureCause()).thenReturn((Object)new Exception("Expected test exception"));
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        Mockito.when((Object)partitionManager.createSubpartitionView((ResultPartitionID)ArgumentMatchers.any(ResultPartitionID.class), (ResultSubpartitionIndexSet)ArgumentMatchers.any(ResultSubpartitionIndexSet.class), (BufferAvailabilityListener)ArgumentMatchers.any(BufferAvailabilityListener.class))).thenReturn((Object)view);
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        BufferProvider bufferProvider = (BufferProvider)Mockito.mock(BufferProvider.class);
        Mockito.when((Object)inputGate.getBufferProvider()).thenReturn((Object)bufferProvider);
        LocalInputChannel ch = InputChannelTestUtils.createLocalInputChannel(inputGate, partitionManager);
        ch.requestSubpartitions();
        Assertions.assertThatThrownBy(() -> ((LocalInputChannel)ch).getNextBuffer()).isInstanceOf(CancelTaskException.class);
    }

    @Test
    void testPartitionNotFoundExceptionWhileRequestingPartition() throws Exception {
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(inputGate, new ResultPartitionManager());
        Assertions.assertThatThrownBy(() -> ((LocalInputChannel)localChannel).requestSubpartitions()).isInstanceOfSatisfying(PartitionNotFoundException.class, notFound -> Assertions.assertThat((Object)localChannel.getPartitionId()).isEqualTo((Object)notFound.getPartitionId()));
    }

    @Test
    void testRetriggerPartitionRequestWhilePartitionNotFound() throws Exception {
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(inputGate, new ResultPartitionManager(), 1, 1);
        inputGate.setInputChannels(new InputChannel[]{localChannel});
        localChannel.requestSubpartitions();
        Assertions.assertThat((Object)inputGate.getRetriggerLocalRequestTimer()).isNotNull();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testChannelErrorWhileRetriggeringRequest() {
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        final LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(inputGate, new ResultPartitionManager());
        Timer timer = new Timer(true){

            @Override
            public void schedule(TimerTask task, long delay) {
                task.run();
                Assertions.assertThatThrownBy(() -> ((LocalInputChannel)localChannel).checkError()).isInstanceOfSatisfying(PartitionNotFoundException.class, notFound -> Assertions.assertThat((Object)localChannel2.partitionId).isEqualTo((Object)notFound.getPartitionId()));
            }
        };
        try {
            localChannel.retriggerSubpartitionRequest(timer);
        }
        finally {
            timer.cancel();
        }
    }

    @Test
    void testConcurrentReleaseAndRetriggerPartitionRequest() throws Exception {
        SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        Mockito.when((Object)partitionManager.createSubpartitionView((ResultPartitionID)ArgumentMatchers.any(ResultPartitionID.class), (ResultSubpartitionIndexSet)ArgumentMatchers.any(ResultSubpartitionIndexSet.class), (BufferAvailabilityListener)ArgumentMatchers.any(BufferAvailabilityListener.class))).thenAnswer(invocationOnMock -> {
            Thread.sleep(100L);
            throw new PartitionNotFoundException(new ResultPartitionID());
        });
        LocalInputChannel channel = InputChannelTestUtils.createLocalInputChannel(gate, partitionManager, 1, 1);
        Thread releaser = new Thread(() -> {
            try {
                gate.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        });
        Thread requester = new Thread(() -> {
            try {
                channel.requestSubpartitions();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        });
        requester.start();
        releaser.start();
        releaser.join();
        requester.join();
    }

    @Test
    void testGetNextAfterPartitionReleased() throws Exception {
        ResultSubpartitionView subpartitionView = InputChannelTestUtils.createResultSubpartitionView(false);
        SingleInputGateTest.TestingResultPartitionManager partitionManager = new SingleInputGateTest.TestingResultPartitionManager(subpartitionView);
        LocalInputChannel channel = InputChannelTestUtils.createLocalInputChannel(new SingleInputGateBuilder().build(), partitionManager);
        channel.requestSubpartitions();
        Assertions.assertThat((Optional)channel.getNextBuffer()).isNotPresent();
        subpartitionView.releaseAllResources();
        Assertions.assertThatThrownBy(() -> ((LocalInputChannel)channel).getNextBuffer()).isInstanceOf(CancelTaskException.class);
        channel.releaseAllResources();
        Assertions.assertThat((Optional)channel.getNextBuffer()).isNotPresent();
    }

    @Test
    void testGetBufferFromLocalChannelWhenCompressionEnabled() throws Exception {
        ResultSubpartitionView subpartitionView = InputChannelTestUtils.createResultSubpartitionView(true);
        SingleInputGateTest.TestingResultPartitionManager partitionManager = new SingleInputGateTest.TestingResultPartitionManager(subpartitionView);
        LocalInputChannel channel = InputChannelTestUtils.createLocalInputChannel(new SingleInputGateBuilder().build(), partitionManager);
        channel.requestSubpartitions();
        Optional bufferAndAvailability = channel.getNextBuffer();
        Assertions.assertThat((Optional)bufferAndAvailability).hasValueSatisfying(value -> Assertions.assertThat((boolean)value.buffer().isCompressed()).isFalse());
    }

    @Test
    void testUnblockReleasedChannel() throws Exception {
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(inputGate, new ResultPartitionManager());
        localChannel.releaseAllResources();
        Assertions.assertThatThrownBy(() -> ((LocalInputChannel)localChannel).resumeConsumption()).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testAnnounceBufferSize() throws Exception {
        AtomicInteger lastBufferSize = new AtomicInteger(0);
        SingleInputGateTest.TestingResultPartitionManager partitionManager = new SingleInputGateTest.TestingResultPartitionManager(InputChannelTestUtils.createResultSubpartitionView(true));
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(inputGate, partitionManager);
        localChannel.requestSubpartitions();
        localChannel.announceBufferSize(10);
        localChannel.releaseAllResources();
        Assertions.assertThatThrownBy(() -> localChannel.announceBufferSize(12)).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testEnqueueAvailableChannelWhenResuming() throws IOException, InterruptedException {
        PipelinedResultPartition parent = (PipelinedResultPartition)PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED, NoOpFileChannelManager.INSTANCE);
        ResultSubpartition subpartition = parent.getAllPartitions()[0];
        ResultSubpartitionView subpartitionView = subpartition.createReadView(view -> {});
        SingleInputGateTest.TestingResultPartitionManager partitionManager = new SingleInputGateTest.TestingResultPartitionManager(subpartitionView);
        LocalInputChannel channel = InputChannelTestUtils.createLocalInputChannel(new SingleInputGateBuilder().build(), partitionManager);
        channel.requestSubpartitions();
        subpartition.add(EventSerializer.toBufferConsumer((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), (boolean)false));
        Assertions.assertThat((Optional)channel.getNextBuffer()).isPresent();
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
        subpartition.flush();
        Assertions.assertThat((Optional)channel.inputGate.pollNext()).isNotPresent();
        channel.resumeConsumption();
        Optional nextBuffer = channel.inputGate.pollNext();
        Assertions.assertThat((Optional)nextBuffer).hasValueSatisfying(value -> Assertions.assertThat((boolean)value.isBuffer()).isTrue());
    }

    @Test
    void testCheckpointingInflightData() throws Exception {
        SingleInputGate inputGate = new SingleInputGateBuilder().build();
        PipelinedResultPartition parent = (PipelinedResultPartition)PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED, NoOpFileChannelManager.INSTANCE);
        ResultSubpartition subpartition = parent.getAllPartitions()[0];
        ResultSubpartitionView subpartitionView = subpartition.createReadView(view -> {});
        SingleInputGateTest.TestingResultPartitionManager partitionManager = new SingleInputGateTest.TestingResultPartitionManager(subpartitionView);
        RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter();
        LocalInputChannel channel = InputChannelTestUtils.createLocalInputChannel(inputGate, partitionManager, 0, 0, b -> b.setStateWriter(stateWriter));
        inputGate.setInputChannels(new InputChannel[]{channel});
        channel.requestSubpartitions();
        CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault();
        CheckpointOptions options = CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)location);
        stateWriter.start(0L, options);
        CheckpointBarrier barrier = new CheckpointBarrier(0L, 123L, options);
        channel.checkpointStarted(barrier);
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1));
        Assertions.assertThat((Optional)channel.getNextBuffer()).isPresent();
        subpartition.add(EventSerializer.toBufferConsumer((AbstractEvent)barrier, (boolean)true));
        Assertions.assertThat((Optional)channel.getNextBuffer()).isPresent();
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(2));
        Assertions.assertThat((Optional)channel.getNextBuffer()).isPresent();
        Assertions.assertThat((int[])stateWriter.getAddedInput().get((Object)channel.getChannelInfo()).stream().mapToInt(Buffer::getSize).toArray()).containsExactly(new int[]{1});
    }

    @Test
    void testAnnounceNewBufferSize() throws IOException, InterruptedException {
        PipelinedResultPartition parent = (PipelinedResultPartition)new ResultPartitionBuilder().setResultPartitionType(ResultPartitionType.PIPELINED).setFileChannelManager(NoOpFileChannelManager.INSTANCE).setNumberOfSubpartitions(2).build();
        ResultSubpartition subpartition0 = parent.getAllPartitions()[0];
        ResultSubpartition subpartition1 = parent.getAllPartitions()[1];
        LocalInputChannel channel0 = InputChannelTestUtils.createLocalInputChannel(new SingleInputGateBuilder().build(), new SingleInputGateTest.TestingResultPartitionManager(subpartition0.createReadView(view -> {})));
        LocalInputChannel channel1 = InputChannelTestUtils.createLocalInputChannel(new SingleInputGateBuilder().build(), new SingleInputGateTest.TestingResultPartitionManager(subpartition1.createReadView(view -> {})));
        channel0.requestSubpartitions();
        channel1.requestSubpartitions();
        Assertions.assertThat((int)subpartition0.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(16))).isEqualTo(Integer.MAX_VALUE);
        Assertions.assertThat((int)subpartition1.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(16))).isEqualTo(Integer.MAX_VALUE);
        channel0.announceBufferSize(9);
        channel1.announceBufferSize(20);
        Assertions.assertThat((int)subpartition0.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(16))).isEqualTo(9);
        Assertions.assertThat((int)subpartition1.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(16))).isEqualTo(20);
    }

    @Test
    void testReceivingBuffersInUseBeforeSubpartitionViewInitialization() throws Exception {
        ResultSubpartitionView subpartitionView = InputChannelTestUtils.createResultSubpartitionView(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096), BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096), BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
        SingleInputGateTest.TestingResultPartitionManager partitionManager = new SingleInputGateTest.TestingResultPartitionManager(subpartitionView);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        LocalInputChannel localChannel = InputChannelTestUtils.createLocalInputChannel(inputGate, partitionManager);
        inputGate.setInputChannels(new InputChannel[]{localChannel});
        Assertions.assertThat((int)localChannel.getBuffersInUseCount()).isZero();
        localChannel.requestSubpartitions();
        Assertions.assertThat((int)localChannel.getBuffersInUseCount()).isEqualTo(3);
    }

    private static class TestLocalInputChannelConsumer
    implements Callable<Void> {
        private final SingleInputGate inputGate;
        private final int numberOfInputChannels;
        private final int numberOfExpectedBuffersPerChannel;

        TestLocalInputChannelConsumer(int subpartitionIndex, int numberOfInputChannels, int numberOfExpectedBuffersPerChannel, BufferPool bufferPool, ResultPartitionManager partitionManager, TaskEventDispatcher taskEventDispatcher, ResultPartitionID[] consumedPartitionIds) throws IOException {
            Preconditions.checkArgument((numberOfInputChannels >= 1 ? 1 : 0) != 0);
            Preconditions.checkArgument((numberOfExpectedBuffersPerChannel >= 1 ? 1 : 0) != 0);
            this.inputGate = new SingleInputGateBuilder().setNumberOfChannels(numberOfInputChannels).setBufferPoolFactory(bufferPool).build();
            InputChannel[] inputChannels = new InputChannel[numberOfInputChannels];
            for (int i = 0; i < numberOfInputChannels; ++i) {
                inputChannels[i] = InputChannelBuilder.newBuilder().setChannelIndex(i).setSubpartitionIndexSet(new ResultSubpartitionIndexSet(subpartitionIndex)).setPartitionManager(partitionManager).setPartitionId(consumedPartitionIds[i]).setTaskEventPublisher((TaskEventPublisher)taskEventDispatcher).buildLocalChannel(this.inputGate);
            }
            InputGateFairnessTest.setupInputGate(this.inputGate, inputChannels);
            this.numberOfInputChannels = numberOfInputChannels;
            this.numberOfExpectedBuffersPerChannel = numberOfExpectedBuffersPerChannel;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() throws Exception {
            int[] numberOfBuffersPerChannel = new int[this.numberOfInputChannels];
            try {
                Optional boe;
                while ((boe = this.inputGate.getNext()).isPresent()) {
                    if (!((BufferOrEvent)boe.get()).isBuffer()) continue;
                    ((BufferOrEvent)boe.get()).getBuffer().recycleBuffer();
                    int n = ((BufferOrEvent)boe.get()).getChannelInfo().getInputChannelIdx();
                    numberOfBuffersPerChannel[n] = numberOfBuffersPerChannel[n] + 1;
                    if (numberOfBuffersPerChannel[n] <= this.numberOfExpectedBuffersPerChannel) continue;
                    throw new IllegalStateException("Received more buffers than expected on channel " + String.valueOf(((BufferOrEvent)boe.get()).getChannelInfo()) + ".");
                }
                for (int i = 0; i < numberOfBuffersPerChannel.length; ++i) {
                    int actualNumberOfReceivedBuffers = numberOfBuffersPerChannel[i];
                    if (actualNumberOfReceivedBuffers == this.numberOfExpectedBuffersPerChannel) continue;
                    throw new IllegalStateException("Received unexpected number of buffers on channel " + i + " (" + actualNumberOfReceivedBuffers + " instead of " + this.numberOfExpectedBuffersPerChannel + ").");
                }
            }
            finally {
                this.inputGate.close();
            }
            return null;
        }
    }

    private static class TestPartitionProducerBufferSource
    implements TestProducerSource {
        private final int bufferSize;
        private final List<Byte> channelIndexes;

        TestPartitionProducerBufferSource(int parallelism, int bufferSize, int numberOfBuffersToProduce) {
            this.bufferSize = bufferSize;
            this.channelIndexes = Lists.newArrayListWithCapacity((int)(parallelism * numberOfBuffersToProduce));
            for (byte i = 0; i < parallelism; i = (byte)(i + 1)) {
                for (int j = 0; j < numberOfBuffersToProduce; ++j) {
                    this.channelIndexes.add(i);
                }
            }
            Collections.shuffle(this.channelIndexes);
        }

        @Override
        public TestProducerSource.BufferAndChannel getNextBuffer() throws Exception {
            if (this.channelIndexes.size() > 0) {
                byte channelIndex = this.channelIndexes.remove(0);
                return new TestProducerSource.BufferAndChannel(new byte[this.bufferSize], channelIndex);
            }
            return null;
        }
    }
}

