/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.DeserializationUtils;
import org.apache.flink.runtime.operators.shipping.OutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.XORShiftRandom;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={ParameterizedTestExtension.class})
class RecordWriterTest {
    @Parameter
    public boolean isBroadcastWriter;

    RecordWriterTest() {
    }

    @Parameters(name="isBroadcastWriter={0}")
    public static List<Boolean> parameters() throws Exception {
        return Arrays.asList(false, true);
    }

    @TestTemplate
    void testBroadcastEventNoRecords() throws Exception {
        int numberOfSubpartitions = 4;
        int bufferSize = 32;
        ResultPartition partition = RecordWriterTest.createResultPartition(bufferSize, numberOfSubpartitions);
        RecordWriter writer = this.createRecordWriter((ResultPartitionWriter)partition);
        CheckpointBarrier barrier = new CheckpointBarrier(2148402839L, 2166311875L, CheckpointOptions.forCheckpointWithDefaultLocation());
        writer.broadcastEvent((AbstractEvent)barrier);
        Assertions.assertThat((int)partition.getBufferPool().bestEffortGetNumOfUsedBuffers()).isZero();
        for (int i = 0; i < numberOfSubpartitions; ++i) {
            Assertions.assertThat((int)partition.getNumberOfQueuedBuffers(i)).isOne();
            ResultSubpartitionView view = partition.createSubpartitionView(new ResultSubpartitionIndexSet(i), (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            BufferOrEvent boe = RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), i);
            Assertions.assertThat((boolean)boe.isEvent()).isTrue();
            Assertions.assertThat((Object)boe.getEvent()).isEqualTo((Object)barrier);
            Assertions.assertThat((boolean)view.getAvailabilityAndBacklog(true).isAvailable()).isFalse();
        }
    }

    @TestTemplate
    void testBroadcastEventMixedRecords() throws Exception {
        XORShiftRandom rand = new XORShiftRandom();
        int numberOfSubpartitions = 4;
        int bufferSize = 32;
        int lenBytes = 4;
        ResultPartition partition = RecordWriterTest.createResultPartition(bufferSize, numberOfSubpartitions);
        RecordWriter writer = this.createRecordWriter((ResultPartitionWriter)partition);
        CheckpointBarrier barrier = new CheckpointBarrier(2147484939L, 2147483846L, CheckpointOptions.forCheckpointWithDefaultLocation());
        byte[] bytes = new byte[bufferSize / 2];
        rand.nextBytes(bytes);
        writer.emit((IOReadableWritable)new ByteArrayIO(bytes));
        bytes = new byte[bufferSize + 1];
        rand.nextBytes(bytes);
        writer.emit((IOReadableWritable)new ByteArrayIO(bytes));
        bytes = new byte[bufferSize - lenBytes];
        rand.nextBytes(bytes);
        writer.emit((IOReadableWritable)new ByteArrayIO(bytes));
        writer.broadcastEvent((AbstractEvent)barrier);
        if (this.isBroadcastWriter) {
            Assertions.assertThat((int)partition.getBufferPool().bestEffortGetNumOfUsedBuffers()).isEqualTo(3);
            for (int i = 0; i < numberOfSubpartitions; ++i) {
                Assertions.assertThat((int)partition.getNumberOfQueuedBuffers(i)).isEqualTo(4);
                ResultSubpartitionView view = partition.createSubpartitionView(new ResultSubpartitionIndexSet(i), (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
                for (int j = 0; j < 3; ++j) {
                    Assertions.assertThat((boolean)RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), 0).isBuffer()).isTrue();
                }
                BufferOrEvent boe = RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), i);
                Assertions.assertThat((boolean)boe.isEvent()).isTrue();
                Assertions.assertThat((Object)boe.getEvent()).isEqualTo((Object)barrier);
            }
        } else {
            Assertions.assertThat((int)partition.getBufferPool().bestEffortGetNumOfUsedBuffers()).isEqualTo(4);
            ResultSubpartitionView[] views = new ResultSubpartitionView[4];
            Assertions.assertThat((int)partition.getNumberOfQueuedBuffers(0)).isEqualTo(2);
            views[0] = partition.createSubpartitionView(new ResultSubpartitionIndexSet(0), (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            Assertions.assertThat((boolean)RecordWriterTest.parseBuffer(views[0].getNextBuffer().buffer(), 0).isBuffer()).isTrue();
            Assertions.assertThat((int)partition.getNumberOfQueuedBuffers(1)).isEqualTo(3);
            views[1] = partition.createSubpartitionView(new ResultSubpartitionIndexSet(1), (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            Assertions.assertThat((boolean)RecordWriterTest.parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer()).isTrue();
            Assertions.assertThat((boolean)RecordWriterTest.parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer()).isTrue();
            Assertions.assertThat((int)partition.getNumberOfQueuedBuffers(2)).isEqualTo(2);
            views[2] = partition.createSubpartitionView(new ResultSubpartitionIndexSet(2), (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            Assertions.assertThat((boolean)RecordWriterTest.parseBuffer(views[2].getNextBuffer().buffer(), 2).isBuffer()).isTrue();
            views[3] = partition.createSubpartitionView(new ResultSubpartitionIndexSet(3), (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            Assertions.assertThat((int)partition.getNumberOfQueuedBuffers(3)).isOne();
            for (int i = 0; i < numberOfSubpartitions; ++i) {
                BufferOrEvent boe = RecordWriterTest.parseBuffer(views[i].getNextBuffer().buffer(), i);
                Assertions.assertThat((boolean)boe.isEvent()).isTrue();
                Assertions.assertThat((Object)boe.getEvent()).isEqualTo((Object)barrier);
            }
        }
    }

    @TestTemplate
    void testBroadcastEventBufferReferenceCounting() throws Exception {
        int i;
        int bufferSize = 32768;
        int numSubpartitions = 2;
        ResultPartition partition = RecordWriterTest.createResultPartition(bufferSize, numSubpartitions);
        RecordWriter writer = this.createRecordWriter((ResultPartitionWriter)partition);
        writer.broadcastEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE);
        Buffer[] buffers = new Buffer[numSubpartitions];
        for (i = 0; i < numSubpartitions; ++i) {
            Assertions.assertThat((int)partition.getNumberOfQueuedBuffers(i)).isOne();
            ResultSubpartitionView view = partition.createSubpartitionView(new ResultSubpartitionIndexSet(i), (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            buffers[i] = view.getNextBuffer().buffer();
            Assertions.assertThat((boolean)RecordWriterTest.parseBuffer(buffers[i], i).isEvent()).isTrue();
        }
        for (i = 0; i < numSubpartitions; ++i) {
            Assertions.assertThat((boolean)buffers[i].isRecycled()).isTrue();
        }
    }

    @TestTemplate
    void testBroadcastEventBufferIndependence() throws Exception {
        this.verifyBroadcastBufferOrEventIndependence(true);
    }

    @TestTemplate
    void testBroadcastEmitBufferIndependence() throws Exception {
        this.verifyBroadcastBufferOrEventIndependence(false);
    }

    @TestTemplate
    void testBroadcastEmitRecord(@TempDir Path tempPath) throws Exception {
        int numberOfSubpartitions = 4;
        int bufferSize = 32;
        int numValues = 8;
        int serializationLength = 4;
        ResultPartition partition = RecordWriterTest.createResultPartition(32, 4);
        RecordWriter writer = this.createRecordWriter((ResultPartitionWriter)partition);
        SpillingAdaptiveSpanningRecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer(new String[]{tempPath.toString()});
        ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>();
        Util.MockRecords records = Util.randomRecords((int)8, (SerializationTestTypeFactory)SerializationTestTypeFactory.INT);
        for (SerializationTestType record : records) {
            serializedRecords.add(record);
            writer.broadcastEmit((IOReadableWritable)record);
        }
        int numRequiredBuffers = 2;
        if (this.isBroadcastWriter) {
            Assertions.assertThat((int)partition.getBufferPool().bestEffortGetNumOfUsedBuffers()).isEqualTo(2);
        } else {
            Assertions.assertThat((int)partition.getBufferPool().bestEffortGetNumOfUsedBuffers()).isEqualTo(8);
        }
        for (int i = 0; i < 4; ++i) {
            Assertions.assertThat((int)partition.getNumberOfQueuedBuffers(i)).isEqualTo(2);
            ResultSubpartitionView view = partition.createSubpartitionView(new ResultSubpartitionIndexSet(i), (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
            RecordWriterTest.verifyDeserializationResults(view, (RecordDeserializer<SerializationTestType>)deserializer, (ArrayDeque<SerializationTestType>)serializedRecords.clone(), 2, 8);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testIsAvailableOrNot() throws Exception {
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
        BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE, 0);
        ResultPartition resultPartition = new ResultPartitionBuilder().setBufferPoolFactory((SupplierWithException<BufferPool, IOException>)((SupplierWithException)() -> localPool)).build();
        resultPartition.setup();
        RecordWriter recordWriter = this.createRecordWriter((ResultPartitionWriter)resultPartition);
        try {
            Assertions.assertThat((CompletableFuture)recordWriter.getAvailableFuture()).isDone();
            try (BufferBuilder bufferBuilder = localPool.requestBufferBuilder(0);){
                Assertions.assertThat((Object)bufferBuilder).isNotNull();
                Assertions.assertThat((CompletableFuture)recordWriter.getAvailableFuture()).isNotDone();
                Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder);
                buffer.recycleBuffer();
            }
            Assertions.assertThat((CompletableFuture)recordWriter.getAvailableFuture()).isDone();
            Assertions.assertThat((CompletableFuture)recordWriter.getAvailableFuture()).isEqualTo((Object)RecordWriter.AVAILABLE);
        }
        finally {
            localPool.lazyDestroy();
            globalPool.destroy();
        }
    }

    private void verifyBroadcastBufferOrEventIndependence(boolean broadcastEvent) throws Exception {
        ResultPartition partition = RecordWriterTest.createResultPartition(4096, 2);
        RecordWriter writer = this.createRecordWriter((ResultPartitionWriter)partition);
        if (broadcastEvent) {
            writer.broadcastEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE);
        } else {
            writer.broadcastEmit((IOReadableWritable)new IntValue(0));
        }
        Assertions.assertThat((int)partition.getNumberOfQueuedBuffers(0)).isOne();
        Assertions.assertThat((int)partition.getNumberOfQueuedBuffers(1)).isOne();
        ResultSubpartitionView view0 = partition.createSubpartitionView(new ResultSubpartitionIndexSet(0), (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
        ResultSubpartitionView view1 = partition.createSubpartitionView(new ResultSubpartitionIndexSet(1), (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
        Buffer buffer1 = view0.getNextBuffer().buffer();
        Buffer buffer2 = view1.getNextBuffer().buffer();
        Assertions.assertThat((int)buffer1.getReaderIndex()).isZero();
        Assertions.assertThat((int)buffer2.getReaderIndex()).isZero();
        buffer1.setReaderIndex(1);
        ((AbstractIntegerAssert)Assertions.assertThat((int)buffer2.getReaderIndex()).withFailMessage("Buffer 2 shares the same reader index as buffer 1", new Object[0])).isZero();
    }

    public static void verifyDeserializationResults(ResultSubpartitionView view, RecordDeserializer<SerializationTestType> deserializer, ArrayDeque<SerializationTestType> expectedRecords, int numRequiredBuffers, int numValues) throws Exception {
        int assertRecords = 0;
        for (int j = 0; j < numRequiredBuffers; ++j) {
            Buffer buffer = view.getNextBuffer().buffer();
            deserializer.setNextBuffer(buffer);
            assertRecords += DeserializationUtils.deserializeRecords(expectedRecords, deserializer);
        }
        Assertions.assertThat((boolean)view.getAvailabilityAndBacklog(true).isAvailable()).isFalse();
        Assertions.assertThat((int)assertRecords).isEqualTo(numValues);
    }

    private RecordWriter createRecordWriter(ResultPartitionWriter writer) {
        if (this.isBroadcastWriter) {
            return new RecordWriterBuilder().setChannelSelector((ChannelSelector)new OutputEmitter(ShipStrategyType.BROADCAST, 0)).build(writer);
        }
        return new RecordWriterBuilder().build(writer);
    }

    public static ResultPartition createResultPartition(int bufferSize, int numSubpartitions) throws IOException {
        NettyShuffleEnvironment env = new NettyShuffleEnvironmentBuilder().setBufferSize(bufferSize).build();
        ResultPartition partition = PartitionTestUtils.createPartition(env, ResultPartitionType.PIPELINED, numSubpartitions);
        partition.setup();
        return partition;
    }

    static BufferOrEvent parseBuffer(Buffer buffer, int targetChannel) throws IOException {
        if (buffer.isBuffer()) {
            return new BufferOrEvent(buffer, new InputChannelInfo(0, targetChannel));
        }
        AbstractEvent event = EventSerializer.fromBuffer((Buffer)buffer, (ClassLoader)RecordWriterTest.class.getClassLoader());
        buffer.recycleBuffer();
        return new BufferOrEvent(event, new InputChannelInfo(0, targetChannel));
    }

    private static class ByteArrayIO
    implements IOReadableWritable {
        private final byte[] bytes;

        public ByteArrayIO(byte[] bytes) {
            this.bytes = bytes;
        }

        public void write(DataOutputView out) throws IOException {
            out.write(this.bytes);
        }

        public void read(DataInputView in) throws IOException {
            in.readFully(this.bytes);
        }
    }
}

