package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.testutils.serialization.types.IntType;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.class */
/**
 * Tests for {@link BroadcastRecordWriter}: mixing targeted {@code emit} calls with {@code
 * broadcastEmit} calls, and the effect of both on buffer usage and recycling.
 */
class BroadcastRecordWriterTest {

    /**
     * Emits every record once to a single channel (chosen round-robin) and once to all channels,
     * then checks the number of buffers used, the number of buffers queued per channel, and that
     * every channel deserializes exactly the records that were written to it.
     *
     * @param path temporary directory handed to the spilling deserializer
     * @throws Exception if emitting or reading back the records fails
     */
    @Test
    void testBroadcastMixedRandomEmitRecord(@TempDir Path path) throws Exception {
        final int numberOfChannels = 8;
        final int numberOfRecords = 8;
        final int bufferSize = 32;

        final ResultPartition partition =
                RecordWriterTest.createResultPartition(bufferSize, numberOfChannels);
        final BroadcastRecordWriter<SerializationTestType> writer =
                new BroadcastRecordWriter<>(partition, -1L, "test");
        final SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType> deserializer =
                new SpillingAdaptiveSpanningRecordDeserializer<>(new String[] {path.toString()});

        final Util.MockRecords<SerializationTestType> records =
                Util.randomRecords(numberOfRecords, SerializationTestTypeFactory.INT);

        // Expected content of every channel, in emission order.
        final Map<Integer, ArrayDeque<SerializationTestType>> expectedPerChannel = new HashMap<>();
        for (int channel = 0; channel < numberOfChannels; channel++) {
            expectedPerChannel.put(channel, new ArrayDeque<>());
        }

        // Each record goes to one channel (round-robin over the channels) and is then broadcast
        // to all of them; the expectation map is updated to mirror both writes.
        int index = 0;
        for (SerializationTestType record : records) {
            final int targetChannel = index++ % numberOfChannels;
            writer.emit(record, targetChannel);
            expectedPerChannel.get(targetChannel).add(record);

            writer.broadcastEmit(record);
            for (int channel = 0; channel < numberOfChannels; channel++) {
                expectedPerChannel.get(channel).add(record);
            }
        }

        // NOTE(review): the expected values below (16 used buffers, 9 queued per channel) are
        // written as 2 * records and records + 1 — confirm that derivation before changing the
        // record or channel count.
        final int expectedUsedBuffers = 2 * numberOfRecords;
        final int expectedQueuedBuffers = numberOfRecords + 1;

        Assertions.assertThat(partition.getBufferPool().bestEffortGetNumOfUsedBuffers())
                .isEqualTo(expectedUsedBuffers);

        for (int channel = 0; channel < numberOfChannels; channel++) {
            Assertions.assertThat(partition.getNumberOfQueuedBuffers(channel))
                    .isEqualTo(expectedQueuedBuffers);

            // Round-robin share of the targeted emits: the first (records % channels) channels
            // receive one record more than the others.
            final int targetedRecords =
                    numberOfRecords / numberOfChannels
                            + (channel < numberOfRecords % numberOfChannels ? 1 : 0);
            // Every channel additionally receives all broadcast records.
            final int expectedRecords = numberOfRecords + targetedRecords;

            RecordWriterTest.verifyDeserializationResults(
                    partition.createSubpartitionView(
                            new ResultSubpartitionIndexSet(channel),
                            new NoOpBufferAvailablityListener()),
                    deserializer,
                    expectedPerChannel.get(channel),
                    expectedQueuedBuffers,
                    expectedRecords);
        }
    }

    /**
     * Tracks the number of available memory segments in the buffer pool while records are
     * broadcast, emitted to a single channel, and consumed from both subpartitions.
     *
     * @throws Exception if emitting or consuming the records fails
     */
    @Test
    void testRandomEmitAndBufferRecycling() throws Exception {
        final int recordSize = 8;
        final int numberOfChannels = 2;
        // A buffer holds exactly two records.
        final int bufferSize = 2 * recordSize;

        final ResultPartition partition =
                RecordWriterTest.createResultPartition(bufferSize, numberOfChannels);
        final BufferPool bufferPool = partition.getBufferPool();
        final BroadcastRecordWriter<SerializationTestType> writer =
                new BroadcastRecordWriter<>(partition, -1L, "test");

        // Request two buffers and hand them straight back; the pool is then expected to report
        // three available segments.
        Arrays.asList(bufferPool.requestBuffer(), bufferPool.requestBuffer())
                .forEach(Buffer::recycleBuffer);
        Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isEqualTo(3);

        // Two broadcast records are expected to occupy a single buffer.
        writer.broadcastEmit(new IntType(1));
        writer.broadcastEmit(new IntType(2));
        Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isEqualTo(2);

        // Consuming the buffer from subpartition 0 alone must not change the available count.
        Assertions.assertThat(partition.getNumberOfQueuedBuffers(0)).isOne();
        closeConsumer(
                partition.createSubpartitionView(
                        new ResultSubpartitionIndexSet(0), new NoOpBufferAvailablityListener()),
                bufferSize);
        Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isEqualTo(2);

        // A targeted emit is expected to take one more segment from the pool.
        writer.emit(new IntType(3), 0);
        Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isOne();

        // Consuming the buffer from subpartition 1 as well is expected to return a segment.
        Assertions.assertThat(partition.getNumberOfQueuedBuffers(1)).isOne();
        closeConsumer(
                partition.createSubpartitionView(
                        new ResultSubpartitionIndexSet(1), new NoOpBufferAvailablityListener()),
                bufferSize);
        Assertions.assertThat(bufferPool.getNumberOfAvailableMemorySegments()).isEqualTo(2);
    }

    /**
     * Polls the next buffer from the given view, asserts its size and recycles it.
     *
     * @param resultSubpartitionView view to read the next buffer from
     * @param i expected size of the polled buffer
     * @throws IOException if reading the next buffer fails
     */
    private static void closeConsumer(ResultSubpartitionView resultSubpartitionView, int i)
            throws IOException {
        final Buffer buffer = resultSubpartitionView.getNextBuffer().buffer();
        try {
            Assertions.assertThat(buffer.getSize()).isEqualTo(i);
        } finally {
            // Recycle even when the size assertion fails so the buffer is not leaked.
            buffer.recycleBuffer();
        }
    }
}
