/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.operators.testutils.types.IntPair;
import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ExternalSortITCase {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalSortITCase.class);
    private static final long SEED = 649180756312423613L;
    private static final int KEY_MAX = Integer.MAX_VALUE;
    private static final int VALUE_LENGTH = 114;
    private static final String VAL = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
    private static final int NUM_PAIRS = 200000;
    private static final int MEMORY_SIZE = 0x4E00000;
    private final AbstractInvokable parentTask = new DummyInvokable();
    private IOManager ioManager;
    private MemoryManager memoryManager;
    private TypeSerializerFactory<Tuple2<Integer, String>> pactRecordSerializer;
    private TypeComparator<Tuple2<Integer, String>> pactRecordComparator;
    private boolean testSuccess;

    ExternalSortITCase() {
    }

    @BeforeEach
    void beforeTest() {
        this.memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x4E00000L).build();
        this.ioManager = new IOManagerAsync();
        this.pactRecordSerializer = TestData.getIntStringTupleSerializerFactory();
        this.pactRecordComparator = TestData.getIntStringTupleComparator();
    }

    @AfterEach
    void afterTest() throws Exception {
        this.ioManager.close();
        if (this.memoryManager != null && this.testSuccess) {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.memoryManager.verifyEmpty()).withFailMessage("Memory leak: not all segments have been returned to the memory manager.", new Object[0])).isTrue();
            this.memoryManager.shutdown();
            this.memoryManager = null;
        }
    }

    @Test
    void testInMemorySort() {
        try {
            IntComparator keyComparator = new IntComparator(true);
            TestData.TupleGenerator generator = new TestData.TupleGenerator(649180756312423613L, Integer.MAX_VALUE, 114, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.CONSTANT, VAL);
            TestData.TupleGeneratorIterator source = new TestData.TupleGeneratorIterator(generator, 200000);
            LOG.debug("Initializing sortmerger...");
            ExternalSorter merger = ExternalSorter.newBuilder((MemoryManager)this.memoryManager, (AbstractInvokable)this.parentTask, (TypeSerializer)this.pactRecordSerializer.getSerializer(), this.pactRecordComparator).maxNumFileHandles(2).enableSpilling(this.ioManager, (double)0.9f).memoryFraction(0.8205128205128205).objectReuse(true).largeRecords(true).build((MutableObjectIterator)source);
            LOG.debug("Reading and sorting data...");
            MutableObjectIterator iterator = merger.getIterator();
            LOG.debug("Checking results...");
            int pairsEmitted = 1;
            Tuple2 rec1 = new Tuple2();
            Tuple2 rec2 = new Tuple2();
            rec1 = (Tuple2)iterator.next((Object)rec1);
            Assertions.assertThat((Object)rec1).isNotNull();
            while ((rec2 = (Tuple2)iterator.next((Object)rec2)) != null) {
                ++pairsEmitted;
                Assertions.assertThat((int)keyComparator.compare(rec1.f0, rec2.f0)).isLessThanOrEqualTo(0);
                Tuple2 tmp = rec1;
                rec1 = rec2;
                rec2 = tmp;
            }
            Assertions.assertThat((int)pairsEmitted).isEqualTo(200000);
            merger.close();
            this.testSuccess = true;
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    @Test
    void testInMemorySortUsing10Buffers() {
        try {
            IntComparator keyComparator = new IntComparator(true);
            TestData.TupleGenerator generator = new TestData.TupleGenerator(649180756312423613L, Integer.MAX_VALUE, 114, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.CONSTANT, VAL);
            TestData.TupleGeneratorIterator source = new TestData.TupleGeneratorIterator(generator, 200000);
            LOG.debug("Initializing sortmerger...");
            ExternalSorter merger = ExternalSorter.newBuilder((MemoryManager)this.memoryManager, (AbstractInvokable)this.parentTask, (TypeSerializer)this.pactRecordSerializer.getSerializer(), this.pactRecordComparator).maxNumFileHandles(2).sortBuffers(10).enableSpilling(this.ioManager, (double)0.9f).memoryFraction(0.8205128205128205).objectReuse(false).largeRecords(true).build((MutableObjectIterator)source);
            LOG.debug("Reading and sorting data...");
            MutableObjectIterator iterator = merger.getIterator();
            LOG.debug("Checking results...");
            int pairsEmitted = 1;
            Tuple2 rec1 = new Tuple2();
            Tuple2 rec2 = new Tuple2();
            rec1 = (Tuple2)iterator.next((Object)rec1);
            Assertions.assertThat((Object)rec1).isNotNull();
            while ((rec2 = (Tuple2)iterator.next((Object)rec2)) != null) {
                ++pairsEmitted;
                Assertions.assertThat((int)keyComparator.compare(rec1.f0, rec2.f0)).isLessThanOrEqualTo(0);
                Tuple2 tmp = rec1;
                rec1 = rec2;
                rec2 = tmp;
            }
            Assertions.assertThat((int)pairsEmitted).isEqualTo(200000);
            merger.close();
            this.testSuccess = true;
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    @Test
    void testSpillingSort() {
        try {
            IntComparator keyComparator = new IntComparator(true);
            TestData.TupleGenerator generator = new TestData.TupleGenerator(649180756312423613L, Integer.MAX_VALUE, 114, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.CONSTANT, VAL);
            TestData.TupleGeneratorIterator source = new TestData.TupleGeneratorIterator(generator, 200000);
            LOG.debug("Initializing sortmerger...");
            ExternalSorter merger = ExternalSorter.newBuilder((MemoryManager)this.memoryManager, (AbstractInvokable)this.parentTask, (TypeSerializer)this.pactRecordSerializer.getSerializer(), this.pactRecordComparator).maxNumFileHandles(64).enableSpilling(this.ioManager, (double)0.7f).memoryFraction(0.20512820512820512).objectReuse(true).largeRecords(true).build((MutableObjectIterator)source);
            LOG.debug("Reading and sorting data...");
            MutableObjectIterator iterator = merger.getIterator();
            LOG.debug("Checking results...");
            int pairsEmitted = 1;
            Tuple2 rec1 = new Tuple2();
            Tuple2 rec2 = new Tuple2();
            rec1 = (Tuple2)iterator.next((Object)rec1);
            Assertions.assertThat((Object)rec1).isNotNull();
            while ((rec2 = (Tuple2)iterator.next((Object)rec2)) != null) {
                ++pairsEmitted;
                Assertions.assertThat((int)keyComparator.compare(rec1.f0, rec2.f0)).isLessThanOrEqualTo(0);
                Tuple2 tmp = rec1;
                rec1 = rec2;
                rec2 = tmp;
            }
            Assertions.assertThat((int)pairsEmitted).isEqualTo(200000);
            merger.close();
            this.testSuccess = true;
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    @Test
    void testSpillingSortWithIntermediateMerge() {
        try {
            int PAIRS = 10000000;
            IntComparator keyComparator = new IntComparator(true);
            TestData.TupleGenerator generator = new TestData.TupleGenerator(649180756312423613L, Integer.MAX_VALUE, 114, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            TestData.TupleGeneratorIterator source = new TestData.TupleGeneratorIterator(generator, 10000000);
            LOG.debug("Initializing sortmerger...");
            ExternalSorter merger = ExternalSorter.newBuilder((MemoryManager)this.memoryManager, (AbstractInvokable)this.parentTask, (TypeSerializer)this.pactRecordSerializer.getSerializer(), this.pactRecordComparator).maxNumFileHandles(16).enableSpilling(this.ioManager, (double)0.7f).memoryFraction(0.8205128205128205).objectReuse(false).largeRecords(true).build((MutableObjectIterator)source);
            LOG.debug("Emitting data...");
            MutableObjectIterator iterator = merger.getIterator();
            LOG.debug("Checking results...");
            int pairsRead = 1;
            int nextStep = 500000;
            Tuple2 rec1 = new Tuple2();
            Tuple2 rec2 = new Tuple2();
            rec1 = (Tuple2)iterator.next((Object)rec1);
            Assertions.assertThat((Object)rec1).isNotNull();
            while ((rec2 = (Tuple2)iterator.next((Object)rec2)) != null) {
                ++pairsRead;
                Assertions.assertThat((int)keyComparator.compare(rec1.f0, rec2.f0)).isLessThanOrEqualTo(0);
                Tuple2 tmp = rec1;
                rec1 = rec2;
                rec2 = tmp;
                if (pairsRead != nextStep) continue;
                nextStep += 500000;
            }
            ((AbstractIntegerAssert)Assertions.assertThat((int)pairsRead).withFailMessage("Not all pairs were read back in.", new Object[0])).isEqualTo(10000000);
            merger.close();
            this.testSuccess = true;
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    @Test
    void testSpillingSortWithIntermediateMergeIntPair() {
        try {
            int PAIRS = 50000000;
            RandomIntPairGenerator generator = new RandomIntPairGenerator(12345678L, 50000000L);
            IntPairSerializer.IntPairSerializerFactory serializerFactory = new IntPairSerializer.IntPairSerializerFactory();
            TestData.IntPairComparator comparator = new TestData.IntPairComparator();
            LOG.debug("Initializing sortmerger...");
            ExternalSorter merger = ExternalSorter.newBuilder((MemoryManager)this.memoryManager, (AbstractInvokable)this.parentTask, (TypeSerializer)serializerFactory.getSerializer(), (TypeComparator)comparator).maxNumFileHandles(4).enableSpilling(this.ioManager, (double)0.7f).memoryFraction(0.8205128205128205).objectReuse(true).largeRecords(true).build((MutableObjectIterator)generator);
            LOG.debug("Emitting data...");
            MutableObjectIterator iterator = merger.getIterator();
            LOG.debug("Checking results...");
            int pairsRead = 1;
            int nextStep = 2500000;
            IntPair rec1 = new IntPair();
            IntPair rec2 = new IntPair();
            rec1 = (IntPair)iterator.next((Object)rec1);
            Assertions.assertThat((Object)rec1).isNotNull();
            while ((rec2 = (IntPair)iterator.next((Object)rec2)) != null) {
                int k1 = rec1.getKey();
                int k2 = rec2.getKey();
                ++pairsRead;
                Assertions.assertThat((int)k1).isLessThanOrEqualTo(k2);
                IntPair tmp = rec1;
                rec1 = rec2;
                rec2 = tmp;
                if (pairsRead != nextStep) continue;
                nextStep += 2500000;
            }
            ((AbstractIntegerAssert)Assertions.assertThat((int)pairsRead).withFailMessage("Not all pairs were read back in.", new Object[0])).isEqualTo(50000000);
            merger.close();
            this.testSuccess = true;
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }
}

