/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingBufferAccumulator;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTierProducerAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartitionFactory;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerClient;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.IgnoreShutdownRejectedExecutionHandler;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/** Tests for {@link TieredResultPartition}. */
class TieredResultPartitionTest {
    /** Core pool size of the executor handed to the partition factory. */
    private static final int NUM_THREADS = 4;

    /** Segment size, in bytes, shared by every buffer pool and compressor created here. */
    private static final int NETWORK_BUFFER_SIZE = 1024;

    /** Number of memory segments owned by the global {@link NetworkBufferPool}. */
    private static final int NUM_TOTAL_BUFFERS = 1000;

    /** Total number of bytes given to the {@link BatchShuffleReadBufferPool}. */
    private static final int NUM_TOTAL_BYTES_IN_READ_POOL = 32 * 1024 * 1024;

    private FileChannelManager fileChannelManager;
    private NetworkBufferPool globalPool;
    private BatchShuffleReadBufferPool readBufferPool;
    private ScheduledExecutorService readIOExecutor;

    /** Metric group of the most recently created partition; read by {@link #verifySubpartitionBytes}. */
    private TaskIOMetricGroup taskIOMetricGroup;

    @TempDir
    public Path tempDataPath;

    /** Creates the file channel manager, buffer pools and IO executor used by each test. */
    @BeforeEach
    void before() {
        fileChannelManager =
                new FileChannelManagerImpl(new String[] {tempDataPath.toString()}, "testing");
        globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, NETWORK_BUFFER_SIZE);
        readBufferPool =
                new BatchShuffleReadBufferPool(NUM_TOTAL_BYTES_IN_READ_POOL, NETWORK_BUFFER_SIZE);

        ThreadFactory threadFactory = new ExecutorThreadFactory("test-io-scheduler-thread");
        RejectedExecutionHandler rejectionHandler = new IgnoreShutdownRejectedExecutionHandler();
        readIOExecutor =
                new ScheduledThreadPoolExecutor(NUM_THREADS, threadFactory, rejectionHandler);
    }

    /**
     * Releases everything created in {@link #before()}.
     *
     * <p>The steps are chained with {@code finally} so that a failure while releasing one
     * resource does not prevent the remaining ones from being released.
     */
    @AfterEach
    void after() throws Exception {
        try {
            fileChannelManager.close();
        } finally {
            try {
                globalPool.destroy();
            } finally {
                try {
                    readBufferPool.destroy();
                } finally {
                    readIOExecutor.shutdown();
                }
            }
        }
    }

    /** Closing the partition must destroy the buffer pool it was given. */
    @Test
    void testClose() throws Exception {
        int numBuffers = 1;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        TieredResultPartition partition = createTieredStoreResultPartition(1, bufferPool, false);

        partition.close();

        Assertions.assertThat(bufferPool.isDestroyed()).isTrue();
    }

    /**
     * After close and release, the data directory must be empty and every memory segment must be
     * available in the global pool again.
     */
    @Test
    void testRelease() throws Exception {
        int numSubpartitions = 2;
        int numBuffers = 10;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        TieredResultPartition partition =
                createTieredStoreResultPartition(numSubpartitions, bufferPool, false);

        partition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE * 5), 1);
        partition.close();
        Assertions.assertThat(bufferPool.isDestroyed()).isTrue();

        partition.release();

        // Poll until the first data directory is empty (presumably file removal may lag behind
        // release() -- confirm). There is deliberately no timeout here, so a partition that never
        // cleans up shows as a hanging test rather than an assertion failure.
        File[] remainingFiles =
                Preconditions.checkNotNull(fileChannelManager.getPaths()[0].listFiles());
        while (remainingFiles.length != 0) {
            Thread.sleep(10L);
            remainingFiles =
                    Preconditions.checkNotNull(fileChannelManager.getPaths()[0].listFiles());
        }

        Assertions.assertThat(globalPool.getNumberOfAvailableMemorySegments())
                .isEqualTo(NUM_TOTAL_BUFFERS);
    }

    /**
     * With the arguments used here, the minimum number of network buffers must equal the tiered
     * storage exclusive buffer number and the maximum must be {@link Integer#MAX_VALUE}.
     */
    @Test
    void testMinMaxNetworkBuffersTieredResultPartition() {
        int numSubpartitions = 105;
        int tieredStorageTotalExclusiveBufferNum = 103;

        Pair<Integer, Integer> minMaxNetworkBuffers =
                NettyShuffleUtils.getMinMaxNetworkBuffersPerResultPartition(
                        100,
                        5,
                        100,
                        10,
                        numSubpartitions,
                        true,
                        tieredStorageTotalExclusiveBufferNum,
                        ResultPartitionType.HYBRID_SELECTIVE);

        Assertions.assertThat(minMaxNetworkBuffers.getLeft())
                .isEqualTo(tieredStorageTotalExclusiveBufferNum);
        Assertions.assertThat(minMaxNetworkBuffers.getRight()).isEqualTo(Integer.MAX_VALUE);
    }

    /** Creating a subpartition view on a released partition must fail. */
    @Test
    void testCreateSubpartitionViewAfterRelease() throws Exception {
        int numBuffers = 10;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        TieredResultPartition resultPartition =
                createTieredStoreResultPartition(2, bufferPool, false);
        BufferAvailabilityListener listener = new NoOpBufferAvailablityListener();

        resultPartition.release();

        Assertions.assertThatThrownBy(
                        () ->
                                resultPartition.createSubpartitionView(
                                        new ResultSubpartitionIndexSet(0), listener))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * One record emitted to subpartition 0 plus one broadcast record must be reported as two
     * buffers' worth of bytes for subpartition 0 and one for subpartition 1.
     */
    @Test
    void testEmitRecords() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(3, 3);
        int bufferSize = NETWORK_BUFFER_SIZE;
        try (TieredResultPartition partition =
                createTieredStoreResultPartition(2, bufferPool, false)) {
            partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
            partition.broadcastRecord(ByteBuffer.allocate(bufferSize));
            verifySubpartitionBytes(2L * bufferSize, bufferSize);
        }
    }

    /**
     * A broadcast record on a broadcast-only partition must be reported once for each of the two
     * subpartitions.
     */
    @Test
    void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(3, 3);
        int bufferSize = NETWORK_BUFFER_SIZE;
        try (TieredResultPartition partition =
                createTieredStoreResultPartition(2, bufferPool, true)) {
            partition.broadcastRecord(ByteBuffer.allocate(bufferSize));
            verifySubpartitionBytes(bufferSize, bufferSize);
        }
    }

    /** Records can still be emitted, and are still counted, after the pool is shrunk to one buffer. */
    @Test
    void testRequestBuffersAfterPoolSizeDecreased() throws IOException {
        int numBuffers = 20;
        int numRecords = 10;
        BufferPool bufferPool = globalPool.createBufferPool(1, numBuffers);
        TieredResultPartition resultPartition =
                createTieredStoreResultPartitionWithStorageManager(1, bufferPool, false);
        BufferAvailabilityListener listener = new NoOpBufferAvailablityListener();
        ResultSubpartitionView subpartitionView =
                resultPartition.createSubpartitionView(0, listener);

        for (int i = 0; i < numRecords; ++i) {
            resultPartition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE), 0);
        }
        verifySubpartitionBytes((long) NETWORK_BUFFER_SIZE * numRecords);

        // Shrink the pool to a single buffer, then make sure one more record still goes through.
        bufferPool.setNumBuffers(1);
        resultPartition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE), 0);
        verifySubpartitionBytes((long) NETWORK_BUFFER_SIZE * (numRecords + 1));

        subpartitionView.releaseAllResources();
        resultPartition.release();
    }

    /**
     * Builds a {@link TieredResultPartition} directly from testing doubles (accumulator, tier
     * producer agent and memory manager), then sets it up with a fresh metric group.
     *
     * @param numSubpartitions number of subpartitions of the partition
     * @param bufferPool pool returned by the partition's buffer pool factory
     * @param isBroadcastOnly forwarded to the {@link TieredStorageProducerClient}
     * @return the partition, already set up and wired to {@link #taskIOMetricGroup}
     */
    private TieredResultPartition createTieredStoreResultPartition(
            int numSubpartitions, BufferPool bufferPool, boolean isBroadcastOnly)
            throws IOException {
        TestingTierProducerAgent tierProducerAgent = new TestingTierProducerAgent.Builder().build();
        TieredStorageResourceRegistry tieredStorageResourceRegistry =
                new TieredStorageResourceRegistry();
        BufferAccumulator bufferAccumulator = new TestingBufferAccumulator();
        TieredStorageMemoryManager memoryManager =
                new TestingTieredStorageMemoryManager.Builder().build();

        TieredResultPartition tieredResultPartition =
                new TieredResultPartition(
                        "TieredStoreResultPartitionTest",
                        0,
                        new ResultPartitionID(),
                        ResultPartitionType.HYBRID_SELECTIVE,
                        numSubpartitions,
                        numSubpartitions,
                        new ResultPartitionManager(),
                        new BufferCompressor(NETWORK_BUFFER_SIZE, "LZ4"),
                        () -> bufferPool,
                        new TieredStorageProducerClient(
                                numSubpartitions,
                                isBroadcastOnly,
                                bufferAccumulator,
                                null,
                                Collections.singletonList(tierProducerAgent)),
                        tieredStorageResourceRegistry,
                        new TieredStorageNettyServiceImpl(tieredStorageResourceRegistry),
                        Collections.emptyList(),
                        memoryManager);
        return setUpWithFreshMetricGroup(tieredResultPartition);
    }

    /**
     * Builds a {@link TieredResultPartition} through {@link TieredResultPartitionFactory}, using
     * this test's file channel manager, read buffer pool and IO executor, then sets it up with a
     * fresh metric group.
     *
     * @param numSubpartitions number of subpartitions of the partition
     * @param bufferPool pool returned by the partition's buffer pool factory
     * @param isBroadcastOnly forwarded to the factory
     * @return the partition, already set up and wired to {@link #taskIOMetricGroup}
     */
    private TieredResultPartition createTieredStoreResultPartitionWithStorageManager(
            int numSubpartitions, BufferPool bufferPool, boolean isBroadcastOnly)
            throws IOException {
        TieredStorageConfiguration tieredStorageConfiguration =
                TieredStorageConfiguration.builder(null)
                        .setMemoryTierSubpartitionMaxQueuedBuffers(10)
                        .build();
        TieredStorageResourceRegistry tieredStorageResourceRegistry =
                new TieredStorageResourceRegistry();
        TieredStorageNettyServiceImpl tieredStorageNettyService =
                new TieredStorageNettyServiceImpl(tieredStorageResourceRegistry);
        TieredResultPartitionFactory tieredResultPartitionFactory =
                new TieredResultPartitionFactory(
                        tieredStorageConfiguration,
                        tieredStorageNettyService,
                        tieredStorageResourceRegistry);

        TieredResultPartition resultPartition =
                tieredResultPartitionFactory.createTieredResultPartition(
                        "TieredStoreResultPartitionTest",
                        0,
                        new ResultPartitionID(),
                        ResultPartitionType.HYBRID_SELECTIVE,
                        numSubpartitions,
                        numSubpartitions,
                        isBroadcastOnly,
                        new ResultPartitionManager(),
                        new BufferCompressor(NETWORK_BUFFER_SIZE, "LZ4"),
                        () -> bufferPool,
                        fileChannelManager,
                        readBufferPool,
                        readIOExecutor,
                        false);
        return setUpWithFreshMetricGroup(resultPartition);
    }

    /**
     * Replaces {@link #taskIOMetricGroup} with a new unregistered IO metric group, sets up the
     * partition and attaches the metric group to it.
     */
    private TieredResultPartition setUpWithFreshMetricGroup(TieredResultPartition partition)
            throws IOException {
        taskIOMetricGroup =
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup();
        partition.setup();
        partition.setMetricGroup(taskIOMetricGroup);
        return partition;
    }

    /**
     * Asserts that the current metric snapshot holds exactly one result partition and that its
     * per-subpartition byte counts match the expected values, in order.
     *
     * @param expectedNumBytes expected byte count for each subpartition, by subpartition index
     */
    private void verifySubpartitionBytes(long... expectedNumBytes) {
        IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot();
        Map<?, ResultPartitionBytes> bytesByPartition = ioMetrics.getResultPartitionBytes();
        Assertions.assertThat(bytesByPartition).hasSize(1);

        ResultPartitionBytes partitionBytes = bytesByPartition.values().iterator().next();
        Assertions.assertThat(partitionBytes.getSubpartitionBytes())
                .containsExactly(expectedNumBytes);
    }
}

