/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.MultipleRecordWriters;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterTest;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class RecordWriterDelegateTest {
    private static final int recordSize = 8;
    private static final int numberOfBuffers = 10;
    private static final int memorySegmentSize = 128;
    private NetworkBufferPool globalPool;

    RecordWriterDelegateTest() {
    }

    @BeforeEach
    void setup() {
        ((AbstractIntegerAssert)Assertions.assertThat((int)0).as("Illegal memory segment size", new Object[0])).isZero();
        this.globalPool = new NetworkBufferPool(10, 128);
    }

    @AfterEach
    void teardown() {
        this.globalPool.destroyAllBufferPools();
        this.globalPool.destroy();
    }

    @Test
    void testSingleRecordWriterAvailability() throws Exception {
        RecordWriter recordWriter = RecordWriterDelegateTest.createRecordWriter(this.globalPool);
        SingleRecordWriter writerDelegate = new SingleRecordWriter(recordWriter);
        Assertions.assertThat((Object)writerDelegate.getRecordWriter(0)).isEqualTo((Object)recordWriter);
        RecordWriterDelegateTest.verifyAvailability((RecordWriterDelegate)writerDelegate);
    }

    @Test
    void testMultipleRecordWritersAvailability() throws Exception {
        int numRecordWriters = 2;
        ArrayList<RecordWriter> recordWriters = new ArrayList<RecordWriter>(2);
        for (int i = 0; i < 2; ++i) {
            recordWriters.add(RecordWriterDelegateTest.createRecordWriter(this.globalPool));
        }
        MultipleRecordWriters writerDelegate = new MultipleRecordWriters(recordWriters);
        for (int i = 0; i < 2; ++i) {
            Assertions.assertThat((Object)writerDelegate.getRecordWriter(i)).isEqualTo(recordWriters.get(i));
        }
        RecordWriterDelegateTest.verifyAvailability((RecordWriterDelegate)writerDelegate);
    }

    @Test
    void testSingleRecordWriterBroadcastEvent() throws Exception {
        ResultPartition partition = RecordWriterTest.createResultPartition(128, 2);
        RecordWriter recordWriter = new RecordWriterBuilder().build((ResultPartitionWriter)partition);
        SingleRecordWriter writerDelegate = new SingleRecordWriter(recordWriter);
        RecordWriterDelegateTest.verifyBroadcastEvent((RecordWriterDelegate)writerDelegate, Collections.singletonList(partition));
    }

    @Test
    void testMultipleRecordWritersBroadcastEvent() throws Exception {
        int numRecordWriters = 2;
        ArrayList<RecordWriter> recordWriters = new ArrayList<RecordWriter>(2);
        ArrayList<ResultPartition> partitions = new ArrayList<ResultPartition>(2);
        for (int i = 0; i < 2; ++i) {
            ResultPartition partition = RecordWriterTest.createResultPartition(128, 2);
            partitions.add(partition);
            recordWriters.add(new RecordWriterBuilder().build((ResultPartitionWriter)partition));
        }
        MultipleRecordWriters writerDelegate = new MultipleRecordWriters(recordWriters);
        RecordWriterDelegateTest.verifyBroadcastEvent((RecordWriterDelegate)writerDelegate, partitions);
    }

    private static RecordWriter createRecordWriter(NetworkBufferPool globalPool) throws Exception {
        BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE, 0);
        ResultPartition partition = new ResultPartitionBuilder().setBufferPoolFactory((SupplierWithException<BufferPool, IOException>)((SupplierWithException)() -> localPool)).build();
        partition.setup();
        return new RecordWriterBuilder().build((ResultPartitionWriter)partition);
    }

    private static void verifyAvailability(RecordWriterDelegate writerDelegate) throws Exception {
        Assertions.assertThat((boolean)writerDelegate.isAvailable()).isTrue();
        Assertions.assertThat((CompletableFuture)writerDelegate.getAvailableFuture()).isDone();
        RecordWriter recordWriter = writerDelegate.getRecordWriter(0);
        for (int i = 0; i < 16; ++i) {
            recordWriter.emit((IOReadableWritable)new IntValue(i));
        }
        Assertions.assertThat((boolean)writerDelegate.isAvailable()).isFalse();
        CompletableFuture future = writerDelegate.getAvailableFuture();
        Assertions.assertThat((CompletableFuture)future).isNotDone();
        ResultSubpartitionView readView = recordWriter.getTargetPartition().createSubpartitionView(new ResultSubpartitionIndexSet(0), (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
        Buffer buffer = readView.getNextBuffer().buffer();
        buffer.recycleBuffer();
        Assertions.assertThat((CompletableFuture)future).isDone();
        Assertions.assertThat((boolean)writerDelegate.isAvailable()).isTrue();
        Assertions.assertThat((CompletableFuture)writerDelegate.getAvailableFuture()).isDone();
    }

    private static void verifyBroadcastEvent(RecordWriterDelegate writerDelegate, List<ResultPartition> partitions) throws Exception {
        CancelCheckpointMarker message = new CancelCheckpointMarker(1L);
        writerDelegate.broadcastEvent((AbstractEvent)message);
        for (ResultPartition partition : partitions) {
            for (int i = 0; i < partition.getNumberOfSubpartitions(); ++i) {
                Assertions.assertThat((int)partition.getNumberOfQueuedBuffers(i)).isOne();
                ResultSubpartitionView view = partition.createSubpartitionView(new ResultSubpartitionIndexSet(i), (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
                BufferOrEvent boe = RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), i);
                Assertions.assertThat((boolean)boe.isEvent()).isTrue();
                Assertions.assertThat((Object)boe.getEvent()).isEqualTo((Object)message);
            }
        }
    }
}

