/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
import org.apache.flink.runtime.io.network.partition.BoundedData;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SubpartitionTestBase;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={ParameterizedTestExtension.class})
class BoundedBlockingSubpartitionTest
extends SubpartitionTestBase {
    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
    private static FileChannelManager fileChannelManager;
    private final BoundedBlockingSubpartitionType type;
    private final boolean sslEnabled;
    @TempDir
    private Path tmpFolder;

    @Parameters(name="type = {0}, sslEnabled = {1}")
    private static List<Object[]> parameters() {
        return Arrays.stream(BoundedBlockingSubpartitionType.values()).map(type -> new Object[][]{{type, true}, {type, false}}).flatMap(Arrays::stream).collect(Collectors.toList());
    }

    @BeforeAll
    static void setUp() {
        fileChannelManager = new FileChannelManagerImpl(new String[]{tempDir}, "testing");
    }

    @AfterAll
    static void shutdown() throws Exception {
        fileChannelManager.close();
    }

    BoundedBlockingSubpartitionTest(BoundedBlockingSubpartitionType type, boolean sslEnabled) {
        this.type = type;
        this.sslEnabled = sslEnabled;
    }

    @TestTemplate
    void testCreateReaderBeforeFinished() throws Exception {
        ResultSubpartition partition = this.createSubpartition();
        Assertions.assertThatThrownBy(() -> partition.createReadView((BufferAvailabilityListener)new NoOpBufferAvailablityListener())).isInstanceOf(IllegalStateException.class);
        partition.release();
    }

    @TestTemplate
    void testCloseBoundedData() throws Exception {
        TestingBoundedDataReader reader = new TestingBoundedDataReader();
        TestingBoundedData data = new TestingBoundedData(reader);
        BoundedBlockingSubpartitionReader bbspr = new BoundedBlockingSubpartitionReader((BoundedBlockingSubpartition)this.createSubpartition(), (BoundedData)data, 10, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
        bbspr.releaseAllResources();
        Assertions.assertThat((boolean)reader.closed).isTrue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testRecycleCurrentBufferOnFailure(BoundedBlockingSubpartitionType type, boolean sslEnabled) throws Exception {
        ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING, fileChannelManager);
        BoundedBlockingSubpartition subpartition = new BoundedBlockingSubpartition(0, resultPartition, (BoundedData)new FailingBoundedData(), !sslEnabled && type == BoundedBlockingSubpartitionType.FILE);
        BufferConsumer consumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(100);
        try {
            subpartition.add(consumer);
            Assertions.assertThatThrownBy(() -> subpartition.createReadView((BufferAvailabilityListener)new NoOpBufferAvailablityListener()));
            Assertions.assertThat((boolean)consumer.isRecycled()).isFalse();
            Assertions.assertThat((Object)subpartition.getCurrentBuffer()).isNotNull();
            Assertions.assertThat((boolean)subpartition.getCurrentBuffer().isRecycled()).isFalse();
        }
        finally {
            subpartition.release();
            Assertions.assertThat((boolean)consumer.isRecycled()).isTrue();
            Assertions.assertThat((Object)subpartition.getCurrentBuffer()).isNull();
        }
    }

    @Override
    ResultSubpartition createSubpartition() throws Exception {
        ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING, fileChannelManager);
        return this.type.create(0, resultPartition, new File(this.tmpFolder.toFile(), "subpartition"), 32768, this.sslEnabled);
    }

    @Override
    ResultSubpartition createFailingWritesSubpartition() throws Exception {
        ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING, fileChannelManager);
        return new BoundedBlockingSubpartition(0, resultPartition, (BoundedData)new FailingBoundedData(), !this.sslEnabled && this.type == BoundedBlockingSubpartitionType.FILE);
    }

    private static class TestingBoundedDataReader
    implements BoundedData.Reader {
        boolean closed;

        private TestingBoundedDataReader() {
        }

        @Nullable
        public Buffer nextBuffer() throws IOException {
            return null;
        }

        public void close() throws IOException {
            this.closed = true;
        }
    }

    private static class TestingBoundedData
    implements BoundedData {
        private BoundedData.Reader reader;

        private TestingBoundedData(BoundedData.Reader reader) {
            this.reader = (BoundedData.Reader)Preconditions.checkNotNull((Object)reader);
        }

        public void writeBuffer(Buffer buffer) throws IOException {
        }

        public void finishWrite() throws IOException {
        }

        public BoundedData.Reader createReader(ResultSubpartitionView ignored) throws IOException {
            return this.reader;
        }

        public long getSize() {
            throw new UnsupportedOperationException();
        }

        public Path getFilePath() {
            throw new UnsupportedOperationException();
        }

        public void close() {
        }
    }

    private static class FailingBoundedData
    implements BoundedData {
        private FailingBoundedData() {
        }

        public void writeBuffer(Buffer buffer) throws IOException {
            throw new IOException("test");
        }

        public void finishWrite() throws IOException {
            throw new UnsupportedOperationException();
        }

        public BoundedData.Reader createReader(ResultSubpartitionView subpartitionView) throws IOException {
            throw new UnsupportedOperationException();
        }

        public long getSize() {
            throw new UnsupportedOperationException();
        }

        public Path getFilePath() {
            throw new UnsupportedOperationException();
        }

        public void close() {
        }
    }
}

