package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.DeserializationUtils;
import org.apache.flink.runtime.operators.shipping.OutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.XORShiftRandom;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.class */
class RecordWriterTest {

    /**
     * Test-template parameter; the candidate values are supplied by {@link #parameters()}.
     *
     * <p>When {@code true}, {@code createRecordWriter} configures the writer with an
     * {@link OutputEmitter} using {@link ShipStrategyType#BROADCAST}; when {@code false} the
     * writer is built with the builder's defaults. Several tests assert different buffer counts
     * depending on this flag.
     */
    @Parameter
    public boolean isBroadcastWriter;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/RecordWriterTest$ByteArrayIO.class */
    private static class ByteArrayIO implements IOReadableWritable {
        private final byte[] bytes;

        public ByteArrayIO(byte[] bArr) {
            this.bytes = bArr;
        }

        public void write(DataOutputView dataOutputView) throws IOException {
            dataOutputView.write(this.bytes);
        }

        public void read(DataInputView dataInputView) throws IOException {
            dataInputView.readFully(this.bytes);
        }
    }

    /** Package-private no-argument constructor; performs no initialization. */
    RecordWriterTest() {}

    /**
     * Supplies the values for {@link #isBroadcastWriter}.
     *
     * @return a fixed-size list containing {@code false} followed by {@code true}
     */
    @Parameters(name = "isBroadcastWriter={0}")
    public static List<Boolean> parameters() throws Exception {
        final Boolean[] writerModes = {Boolean.FALSE, Boolean.TRUE};
        return Arrays.asList(writerModes);
    }

    /**
     * Broadcasts a checkpoint barrier without emitting any record beforehand and verifies that
     * no pool buffer is in use, and that each subpartition queues exactly one buffer, which parses
     * back to the same barrier and leaves the view without further available data.
     */
    @TestTemplate
    void testBroadcastEventNoRecords() throws Exception {
        final int bufferSize = 32;
        final int channelCount = 4;

        final ResultPartition partition = createResultPartition(bufferSize, channelCount);
        final RecordWriter writer = createRecordWriter(partition);
        // Both long arguments exceed Integer.MAX_VALUE.
        final CheckpointBarrier barrier =
                new CheckpointBarrier(
                        2148402839L,
                        2166311875L,
                        CheckpointOptions.forCheckpointWithDefaultLocation());

        writer.broadcastEvent(barrier);

        Assertions.assertThat(partition.getBufferPool().bestEffortGetNumOfUsedBuffers()).isZero();

        for (int channel = 0; channel < channelCount; channel++) {
            Assertions.assertThat(partition.getNumberOfQueuedBuffers(channel)).isOne();

            final ResultSubpartitionView view =
                    partition.createSubpartitionView(
                            new ResultSubpartitionIndexSet(channel),
                            new NoOpBufferAvailablityListener());
            final Buffer queued = view.getNextBuffer().buffer();
            final BufferOrEvent parsed = parseBuffer(queued, channel);

            Assertions.assertThat(parsed.isEvent()).isTrue();
            Assertions.assertThat(parsed.getEvent()).isEqualTo(barrier);
            // Nothing else may be queued behind the event.
            Assertions.assertThat(view.getAvailabilityAndBacklog(true).isAvailable()).isFalse();
        }
    }

    /**
     * Emits three records of different sizes, then broadcasts a checkpoint barrier, and verifies
     * that every subpartition delivers its data buffers first and the barrier last.
     *
     * <p>For the broadcast writer every subpartition is expected to hold three data buffers plus
     * the event. For the non-broadcast writer the expected queue sizes are 2, 3, 2 and 1 for
     * subpartitions 0 to 3.
     */
    @TestTemplate
    void testBroadcastEventMixedRecords() throws Exception {
        final int numberOfChannels = 4;
        final int bufferSize = 32;
        // NOTE(review): presumably the size of the length prefix written in front of each record,
        // so that the third record fills one buffer exactly — confirm against the serializer.
        final int lenBytes = 4;

        final XORShiftRandom rand = new XORShiftRandom();
        final ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
        // createRecordWriter returns a raw RecordWriter, hence the unchecked conversion.
        @SuppressWarnings("unchecked")
        final RecordWriter<ByteArrayIO> writer = createRecordWriter(partition);
        // Both long arguments exceed Integer.MAX_VALUE.
        final CheckpointBarrier barrier =
                new CheckpointBarrier(
                        2147484939L,
                        2147483846L,
                        CheckpointOptions.forCheckpointWithDefaultLocation());

        // (i) payload of half the buffer size
        byte[] bytes = new byte[bufferSize / 2];
        rand.nextBytes(bytes);
        writer.emit(new ByteArrayIO(bytes));

        // (ii) payload one byte larger than the buffer size
        bytes = new byte[bufferSize + 1];
        rand.nextBytes(bytes);
        writer.emit(new ByteArrayIO(bytes));

        // (iii) payload of buffer size minus lenBytes
        bytes = new byte[bufferSize - lenBytes];
        rand.nextBytes(bytes);
        writer.emit(new ByteArrayIO(bytes));

        // (iv) the event goes to every subpartition
        writer.broadcastEvent(barrier);

        if (this.isBroadcastWriter) {
            Assertions.assertThat(partition.getBufferPool().bestEffortGetNumOfUsedBuffers())
                    .isEqualTo(3);

            for (int i = 0; i < numberOfChannels; i++) {
                // 3 data buffers + 1 event
                Assertions.assertThat(partition.getNumberOfQueuedBuffers(i)).isEqualTo(4);

                final ResultSubpartitionView view =
                        partition.createSubpartitionView(
                                new ResultSubpartitionIndexSet(i),
                                new NoOpBufferAvailablityListener());
                for (int j = 0; j < 3; j++) {
                    Assertions.assertThat(parseBuffer(view.getNextBuffer().buffer(), 0).isBuffer())
                            .isTrue();
                }

                final BufferOrEvent boe = parseBuffer(view.getNextBuffer().buffer(), i);
                Assertions.assertThat(boe.isEvent()).isTrue();
                Assertions.assertThat(boe.getEvent()).isEqualTo(barrier);
            }
            return;
        }

        Assertions.assertThat(partition.getBufferPool().bestEffortGetNumOfUsedBuffers())
                .isEqualTo(4);

        // Each view is created before it is first read; the decompiled code referenced the
        // array through an undeclared name ahead of its declaration.
        final ResultSubpartitionView[] views = new ResultSubpartitionView[numberOfChannels];

        // subpartition 0: 1 data buffer + 1 event
        Assertions.assertThat(partition.getNumberOfQueuedBuffers(0)).isEqualTo(2);
        views[0] =
                partition.createSubpartitionView(
                        new ResultSubpartitionIndexSet(0), new NoOpBufferAvailablityListener());
        Assertions.assertThat(parseBuffer(views[0].getNextBuffer().buffer(), 0).isBuffer())
                .isTrue();

        // subpartition 1: 2 data buffers + 1 event
        Assertions.assertThat(partition.getNumberOfQueuedBuffers(1)).isEqualTo(3);
        views[1] =
                partition.createSubpartitionView(
                        new ResultSubpartitionIndexSet(1), new NoOpBufferAvailablityListener());
        Assertions.assertThat(parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer())
                .isTrue();
        Assertions.assertThat(parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer())
                .isTrue();

        // subpartition 2: 1 data buffer + 1 event
        Assertions.assertThat(partition.getNumberOfQueuedBuffers(2)).isEqualTo(2);
        views[2] =
                partition.createSubpartitionView(
                        new ResultSubpartitionIndexSet(2), new NoOpBufferAvailablityListener());
        Assertions.assertThat(parseBuffer(views[2].getNextBuffer().buffer(), 2).isBuffer())
                .isTrue();

        // subpartition 3: only the event
        views[3] =
                partition.createSubpartitionView(
                        new ResultSubpartitionIndexSet(3), new NoOpBufferAvailablityListener());
        Assertions.assertThat(partition.getNumberOfQueuedBuffers(3)).isOne();

        // The next element of every queue must now be the broadcast event.
        for (int i = 0; i < numberOfChannels; i++) {
            final BufferOrEvent boe = parseBuffer(views[i].getNextBuffer().buffer(), i);
            Assertions.assertThat(boe.isEvent()).isTrue();
            Assertions.assertThat(boe.getEvent()).isEqualTo(barrier);
        }
    }

    /**
     * Broadcasts {@link EndOfPartitionEvent} to two subpartitions, parses the queued buffer of
     * each one, and verifies afterwards that every such buffer reports itself as recycled.
     */
    @TestTemplate
    void testBroadcastEventBufferReferenceCounting() throws Exception {
        final int bufferSize = 32 * 1024;
        final int subpartitionCount = 2;

        final ResultPartition partition = createResultPartition(bufferSize, subpartitionCount);
        final RecordWriter writer = createRecordWriter(partition);
        writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);

        // Keep a reference to each polled buffer so its state can be inspected afterwards.
        final Buffer[] eventBuffers = new Buffer[subpartitionCount];
        for (int idx = 0; idx < subpartitionCount; idx++) {
            Assertions.assertThat(partition.getNumberOfQueuedBuffers(idx)).isOne();

            final ResultSubpartitionView view =
                    partition.createSubpartitionView(
                            new ResultSubpartitionIndexSet(idx),
                            new NoOpBufferAvailablityListener());
            eventBuffers[idx] = view.getNextBuffer().buffer();

            // parseBuffer recycles the buffer once the event has been deserialized.
            Assertions.assertThat(parseBuffer(eventBuffers[idx], idx).isEvent()).isTrue();
        }

        for (Buffer eventBuffer : eventBuffers) {
            Assertions.assertThat(eventBuffer.isRecycled()).isTrue();
        }
    }

    /**
     * Runs the buffer-independence check for the case where an event is broadcast.
     */
    @TestTemplate
    void testBroadcastEventBufferIndependence() throws Exception {
        final boolean broadcastEvent = true;
        verifyBroadcastBufferOrEventIndependence(broadcastEvent);
    }

    /**
     * Runs the buffer-independence check for the case where a record is emitted via
     * {@code broadcastEmit} instead of an event.
     */
    @TestTemplate
    void testBroadcastEmitBufferIndependence() throws Exception {
        final boolean broadcastEvent = false;
        verifyBroadcastBufferOrEventIndependence(broadcastEvent);
    }

    /**
     * Emits eight random INT test records through {@code broadcastEmit} and verifies that every
     * one of the four subpartitions queues two buffers from which all eight records can be
     * deserialized.
     *
     * <p>The broadcast writer is expected to occupy two pool buffers in total, the non-broadcast
     * writer eight.
     *
     * @param path temporary directory handed to the spilling deserializer
     */
    @TestTemplate
    void testBroadcastEmitRecord(@TempDir Path path) throws Exception {
        final int numberOfChannels = 4;
        final int bufferSize = 32;
        final int numValues = 8;
        final int buffersPerChannel = 2;

        final ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
        // createRecordWriter returns a raw RecordWriter, hence the unchecked conversion.
        @SuppressWarnings("unchecked")
        final RecordWriter<SerializationTestType> writer = createRecordWriter(partition);
        final RecordDeserializer<SerializationTestType> deserializer =
                new SpillingAdaptiveSpanningRecordDeserializer<>(new String[] {path.toString()});

        // Remember what was written so each subpartition can be checked against it.
        final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
        for (SerializationTestType record :
                Util.randomRecords(numValues, SerializationTestTypeFactory.INT)) {
            serializedRecords.add(record);
            writer.broadcastEmit(record);
        }

        // 2 buffers in total for the broadcast writer, 2 per channel (= 8) otherwise.
        final int expectedUsedBuffers =
                this.isBroadcastWriter ? buffersPerChannel : buffersPerChannel * numberOfChannels;
        Assertions.assertThat(partition.getBufferPool().bestEffortGetNumOfUsedBuffers())
                .isEqualTo(expectedUsedBuffers);

        for (int i = 0; i < numberOfChannels; i++) {
            Assertions.assertThat(partition.getNumberOfQueuedBuffers(i))
                    .isEqualTo(buffersPerChannel);

            final ResultSubpartitionView view =
                    partition.createSubpartitionView(
                            new ResultSubpartitionIndexSet(i),
                            new NoOpBufferAvailablityListener());
            // Each channel gets its own copy of the expected queue.
            verifyDeserializationResults(
                    view, deserializer, serializedRecords.clone(), buffersPerChannel, numValues);
        }
    }

    /**
     * Verifies that the writer's availability future follows the local buffer pool: it is done
     * initially, not done while a buffer builder requested from the pool is outstanding, and done
     * again (equal to {@link RecordWriter#AVAILABLE}) once that buffer has been recycled.
     *
     * <p>The buffer builder is closed and both pools are destroyed whether or not an assertion
     * fails.
     */
    @TestTemplate
    void testIsAvailableOrNot() throws Exception {
        final NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
        final BufferPool localPool =
                globalPool.createBufferPool(1, 1, 1, 1, Integer.MAX_VALUE, 0);
        final ResultPartition resultPartition =
                new ResultPartitionBuilder().setBufferPoolFactory(() -> localPool).build();
        resultPartition.setup();

        final RecordWriter<?> recordWriter = createRecordWriter(resultPartition);

        try {
            Assertions.assertThat(recordWriter.getAvailableFuture()).isDone();

            // try-with-resources closes the builder even when an assertion below fails.
            try (BufferBuilder bufferBuilder = localPool.requestBufferBuilder(0)) {
                Assertions.assertThat(bufferBuilder).isNotNull();
                Assertions.assertThat(recordWriter.getAvailableFuture()).isNotDone();

                // Hand the buffer back so the writer can become available again.
                final Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder);
                buffer.recycleBuffer();
            }

            Assertions.assertThat(recordWriter.getAvailableFuture()).isDone();
            Assertions.assertThat(recordWriter.getAvailableFuture())
                    .isEqualTo(RecordWriter.AVAILABLE);
        } finally {
            localPool.lazyDestroy();
            globalPool.destroy();
        }
    }

    /**
     * Broadcasts either an event or a record to two subpartitions and checks that the buffers
     * polled from them do not share a reader index: moving the reader index of the first buffer
     * must leave the second buffer's reader index at zero.
     *
     * @param z {@code true} to broadcast {@link EndOfPartitionEvent}, {@code false} to
     *     {@code broadcastEmit} an {@link IntValue} of 0
     */
    private void verifyBroadcastBufferOrEventIndependence(boolean z) throws Exception {
        final ResultPartition partition = createResultPartition(4096, 2);
        final RecordWriter writer = createRecordWriter(partition);

        if (!z) {
            writer.broadcastEmit(new IntValue(0));
        } else {
            writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
        }

        // Exactly one buffer must be queued on each of the two subpartitions.
        for (int channel = 0; channel < 2; channel++) {
            Assertions.assertThat(partition.getNumberOfQueuedBuffers(channel)).isOne();
        }

        final ResultSubpartitionView firstView =
                partition.createSubpartitionView(
                        new ResultSubpartitionIndexSet(0), new NoOpBufferAvailablityListener());
        final ResultSubpartitionView secondView =
                partition.createSubpartitionView(
                        new ResultSubpartitionIndexSet(1), new NoOpBufferAvailablityListener());

        final Buffer first = firstView.getNextBuffer().buffer();
        final Buffer second = secondView.getNextBuffer().buffer();
        Assertions.assertThat(first.getReaderIndex()).isZero();
        Assertions.assertThat(second.getReaderIndex()).isZero();

        // Advancing one buffer's reader index must not be visible through the other buffer.
        first.setReaderIndex(1);
        Assertions.assertThat(second.getReaderIndex())
                .withFailMessage("Buffer 2 shares the same reader index as buffer 1")
                .isZero();
    }

    /**
     * Polls a fixed number of buffers from the view, feeds each one to the deserializer, and
     * checks the number of records reported by {@link DeserializationUtils#deserializeRecords}.
     *
     * @param resultSubpartitionView view to poll buffers from
     * @param recordDeserializer deserializer that receives each polled buffer
     * @param arrayDeque records handed to {@code DeserializationUtils.deserializeRecords}
     * @param i number of buffers to poll from the view
     * @param i2 expected total of the counts returned by {@code deserializeRecords}
     */
    public static void verifyDeserializationResults(ResultSubpartitionView resultSubpartitionView, RecordDeserializer<SerializationTestType> recordDeserializer, ArrayDeque<SerializationTestType> arrayDeque, int i, int i2) throws Exception {
        int deserializedTotal = 0;
        for (int remaining = i; remaining > 0; remaining--) {
            final Buffer next = resultSubpartitionView.getNextBuffer().buffer();
            recordDeserializer.setNextBuffer(next);
            deserializedTotal += DeserializationUtils.deserializeRecords(arrayDeque, recordDeserializer);
        }

        // After the requested buffers have been consumed the view must have nothing left.
        Assertions.assertThat(resultSubpartitionView.getAvailabilityAndBacklog(true).isAvailable()).isFalse();
        Assertions.assertThat(deserializedTotal).isEqualTo(i2);
    }

    /**
     * Builds the {@link RecordWriter} under test for the given partition writer.
     *
     * <p>If {@link #isBroadcastWriter} is set, the builder is given an {@link OutputEmitter}
     * with {@link ShipStrategyType#BROADCAST} as channel selector; otherwise the builder is used
     * without further configuration.
     *
     * @param resultPartitionWriter partition writer the record writer is built on
     * @return the newly built record writer
     */
    private RecordWriter createRecordWriter(ResultPartitionWriter resultPartitionWriter) {
        if (!this.isBroadcastWriter) {
            return new RecordWriterBuilder().build(resultPartitionWriter);
        }
        return new RecordWriterBuilder()
                .setChannelSelector(new OutputEmitter(ShipStrategyType.BROADCAST, 0))
                .build(resultPartitionWriter);
    }

    /**
     * Creates a {@link ResultPartitionType#PIPELINED} result partition and calls
     * {@code setup()} on it before returning.
     *
     * @param i buffer size passed to the {@link NettyShuffleEnvironmentBuilder}
     * @param i2 third argument of {@code PartitionTestUtils.createPartition}; the tests in this
     *     class use it as the number of subpartitions
     * @return the partition after {@code setup()}
     * @throws IOException declared by this helper; see the called Flink utilities for causes
     */
    public static ResultPartition createResultPartition(int i, int i2) throws IOException {
        final NettyShuffleEnvironmentBuilder environmentBuilder =
                new NettyShuffleEnvironmentBuilder().setBufferSize(i);
        final ResultPartition partition =
                PartitionTestUtils.createPartition(
                        environmentBuilder.build(), ResultPartitionType.PIPELINED, i2);
        partition.setup();
        return partition;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wraps a polled {@link Buffer} into a {@link BufferOrEvent}.
     *
     * <p>A data buffer is wrapped as it is. Otherwise the event is deserialized with
     * {@link EventSerializer#fromBuffer}, the buffer is recycled, and the event is wrapped.
     *
     * @param buffer buffer polled from a subpartition view
     * @param i second argument of the {@link InputChannelInfo} attached to the result (the first
     *     is always 0)
     * @return the buffer or the deserialized event, tagged with the channel info
     * @throws IOException declared by this helper; see {@code EventSerializer.fromBuffer}
     */
    public static BufferOrEvent parseBuffer(Buffer buffer, int i) throws IOException {
        if (!buffer.isBuffer()) {
            final ClassLoader classLoader = RecordWriterTest.class.getClassLoader();
            final AbstractEvent event = EventSerializer.fromBuffer(buffer, classLoader);
            // The serialized form is no longer needed once the event object exists.
            buffer.recycleBuffer();
            return new BufferOrEvent(event, new InputChannelInfo(0, i));
        }
        return new BufferOrEvent(buffer, new InputChannelInfo(0, i));
    }
}
