package org.apache.flink.runtime.operators.sort;

import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.operators.testutils.types.IntPair;
import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
import org.apache.flink.util.MutableObjectIterator;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.BiConsumer;

/* loaded from: input_file:org/apache/flink/runtime/operators/sort/ExternalSortITCase.class */
/**
 * Integration tests that feed generated records into an {@link ExternalSorter} and check that
 * the sorter hands them back in non-descending key order without losing any record.
 *
 * <p>Every test builds its own sorter on top of the {@link MemoryManager} and {@link IOManager}
 * created in {@link #beforeTest()}. After a successful test, {@link #afterTest()} additionally
 * verifies that all memory has been returned to the memory manager.
 */
class ExternalSortITCase {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalSortITCase.class);
    private static final long SEED = 649180756312423613L;
    private static final int KEY_MAX = Integer.MAX_VALUE;
    private static final int VALUE_LENGTH = 114;
    private static final String VAL = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
    private static final int NUM_PAIRS = 200000;

    /** Size of the managed memory pool in bytes (78 * 1024 * 1024). */
    private static final int MEMORY_SIZE = 81788928;

    /** Number of tuples pushed through the sorter by the tuple intermediate-merge test. */
    private static final int INTERMEDIATE_MERGE_PAIRS = 10_000_000;

    /** Number of {@link IntPair}s pushed through the sorter by the IntPair test. */
    private static final int INT_PAIR_COUNT = 50_000_000;

    // Memory fractions passed to the sorter builder, written as shares of 78 to match the
    // 78 MiB pool above. NOTE(review): presumably the share of the pool the sorter may use.
    private static final double LARGE_MEMORY_FRACTION = 64.0 / 78.0;
    private static final double SMALL_MEMORY_FRACTION = 16.0 / 78.0;

    private final AbstractInvokable parentTask = new DummyInvokable();
    private IOManager ioManager;
    private MemoryManager memoryManager;
    private TypeSerializerFactory<Tuple2<Integer, String>> pactRecordSerializer;
    private TypeComparator<Tuple2<Integer, String>> pactRecordComparator;

    /** Set by a test once it has finished without error; enables the leak check in teardown. */
    private boolean testSuccess;

    /** Creates the memory manager, the I/O manager and the tuple serializer/comparator. */
    @BeforeEach
    void beforeTest() {
        this.memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(MEMORY_SIZE).build();
        this.ioManager = new IOManagerAsync();
        this.pactRecordSerializer = TestData.getIntStringTupleSerializerFactory();
        this.pactRecordComparator = TestData.getIntStringTupleComparator();
    }

    /**
     * Closes the I/O manager and shuts the memory manager down.
     *
     * <p>The memory-leak assertion only runs after a successful test, but the memory manager is
     * shut down in every case so that a failing test does not leave it open.
     *
     * @throws Exception if closing the I/O manager fails
     */
    @AfterEach
    void afterTest() throws Exception {
        try {
            // Null guards cover the case where beforeTest() failed before assigning the field.
            if (this.ioManager != null) {
                this.ioManager.close();
            }
            if (this.memoryManager != null && this.testSuccess) {
                Assertions.assertThat(this.memoryManager.verifyEmpty())
                        .withFailMessage(
                                "Memory leak: not all segments have been returned to the memory manager.")
                        .isTrue();
            }
        } finally {
            if (this.memoryManager != null) {
                this.memoryManager.shutdown();
                this.memoryManager = null;
            }
        }
    }

    /** Sorts {@link #NUM_PAIRS} constant-value tuples with at most 2 file handles. */
    @Test
    void testInMemorySort() {
        try {
            final TestData.TupleGeneratorIterator source = newConstantValueTupleSource();
            LOG.debug("Initializing sortmerger...");
            try (ExternalSorter<Tuple2<Integer, String>> sorter =
                    ExternalSorter.newBuilder(
                                    this.memoryManager,
                                    this.parentTask,
                                    this.pactRecordSerializer.getSerializer(),
                                    this.pactRecordComparator)
                            .maxNumFileHandles(2)
                            .enableSpilling(this.ioManager, 0.9f)
                            .memoryFraction(LARGE_MEMORY_FRACTION)
                            .objectReuse(true)
                            .largeRecords(true)
                            .build(source)) {
                LOG.debug("Reading and sorting data...");
                verifyTupleOrder(sorter.getIterator(), NUM_PAIRS);
            }
            // Only flag success once the sorter has been closed without error.
            this.testSuccess = true;
        } catch (Exception e) {
            Assertions.fail(e.getMessage(), e);
        }
    }

    /** Same as {@link #testInMemorySort()}, but with 10 sort buffers and without object reuse. */
    @Test
    void testInMemorySortUsing10Buffers() {
        try {
            final TestData.TupleGeneratorIterator source = newConstantValueTupleSource();
            LOG.debug("Initializing sortmerger...");
            try (ExternalSorter<Tuple2<Integer, String>> sorter =
                    ExternalSorter.newBuilder(
                                    this.memoryManager,
                                    this.parentTask,
                                    this.pactRecordSerializer.getSerializer(),
                                    this.pactRecordComparator)
                            .maxNumFileHandles(2)
                            .sortBuffers(10)
                            .enableSpilling(this.ioManager, 0.9f)
                            .memoryFraction(LARGE_MEMORY_FRACTION)
                            .objectReuse(false)
                            .largeRecords(true)
                            .build(source)) {
                LOG.debug("Reading and sorting data...");
                verifyTupleOrder(sorter.getIterator(), NUM_PAIRS);
            }
            this.testSuccess = true;
        } catch (Exception e) {
            Assertions.fail(e.getMessage(), e);
        }
    }

    /** Sorts {@link #NUM_PAIRS} constant-value tuples with the small memory fraction. */
    @Test
    void testSpillingSort() {
        try {
            final TestData.TupleGeneratorIterator source = newConstantValueTupleSource();
            LOG.debug("Initializing sortmerger...");
            try (ExternalSorter<Tuple2<Integer, String>> sorter =
                    ExternalSorter.newBuilder(
                                    this.memoryManager,
                                    this.parentTask,
                                    this.pactRecordSerializer.getSerializer(),
                                    this.pactRecordComparator)
                            .maxNumFileHandles(64)
                            .enableSpilling(this.ioManager, 0.7f)
                            .memoryFraction(SMALL_MEMORY_FRACTION)
                            .objectReuse(true)
                            .largeRecords(true)
                            .build(source)) {
                LOG.debug("Reading and sorting data...");
                verifyTupleOrder(sorter.getIterator(), NUM_PAIRS);
            }
            this.testSuccess = true;
        } catch (Exception e) {
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Sorts {@link #INTERMEDIATE_MERGE_PAIRS} fixed-length-value tuples with at most 16 file
     * handles and without object reuse.
     */
    @Test
    void testSpillingSortWithIntermediateMerge() {
        try {
            final TestData.TupleGeneratorIterator source =
                    new TestData.TupleGeneratorIterator(
                            new TestData.TupleGenerator(
                                    SEED,
                                    KEY_MAX,
                                    VALUE_LENGTH,
                                    TestData.TupleGenerator.KeyMode.RANDOM,
                                    TestData.TupleGenerator.ValueMode.FIX_LENGTH),
                            INTERMEDIATE_MERGE_PAIRS);
            LOG.debug("Initializing sortmerger...");
            try (ExternalSorter<Tuple2<Integer, String>> sorter =
                    ExternalSorter.newBuilder(
                                    this.memoryManager,
                                    this.parentTask,
                                    this.pactRecordSerializer.getSerializer(),
                                    this.pactRecordComparator)
                            .maxNumFileHandles(16)
                            .enableSpilling(this.ioManager, 0.7f)
                            .memoryFraction(LARGE_MEMORY_FRACTION)
                            .objectReuse(false)
                            .largeRecords(true)
                            .build(source)) {
                LOG.debug("Emitting data...");
                verifyTupleOrder(sorter.getIterator(), INTERMEDIATE_MERGE_PAIRS);
            }
            this.testSuccess = true;
        } catch (Exception e) {
            Assertions.fail(e.getMessage(), e);
        }
    }

    /**
     * Sorts {@link #INT_PAIR_COUNT} random {@link IntPair}s with at most 4 file handles and
     * checks that their keys come back in non-descending order.
     */
    @Test
    void testSpillingSortWithIntermediateMergeIntPair() {
        try {
            final RandomIntPairGenerator source =
                    new RandomIntPairGenerator(12345678L, INT_PAIR_COUNT);
            final IntPairSerializer.IntPairSerializerFactory serializerFactory =
                    new IntPairSerializer.IntPairSerializerFactory();
            final TestData.IntPairComparator comparator = new TestData.IntPairComparator();
            LOG.debug("Initializing sortmerger...");
            try (ExternalSorter<IntPair> sorter =
                    ExternalSorter.newBuilder(
                                    this.memoryManager,
                                    this.parentTask,
                                    serializerFactory.getSerializer(),
                                    comparator)
                            .maxNumFileHandles(4)
                            .enableSpilling(this.ioManager, 0.7f)
                            .memoryFraction(LARGE_MEMORY_FRACTION)
                            .objectReuse(true)
                            .largeRecords(true)
                            .build(source)) {
                LOG.debug("Emitting data...");
                verifySortedOutput(
                        sorter.getIterator(),
                        new IntPair(),
                        new IntPair(),
                        INT_PAIR_COUNT,
                        (previous, current) ->
                                Assertions.assertThat(previous.getKey())
                                        .isLessThanOrEqualTo(current.getKey()));
            }
            this.testSuccess = true;
        } catch (Exception e) {
            Assertions.fail(e.getMessage(), e);
        }
    }

    /** Creates the input shared by the first three tests: random keys, constant value. */
    private static TestData.TupleGeneratorIterator newConstantValueTupleSource() {
        final TestData.TupleGenerator generator =
                new TestData.TupleGenerator(
                        SEED,
                        KEY_MAX,
                        VALUE_LENGTH,
                        TestData.TupleGenerator.KeyMode.RANDOM,
                        TestData.TupleGenerator.ValueMode.CONSTANT,
                        VAL);
        return new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);
    }

    /**
     * Verifies that the tuples come back with non-descending {@code f0} keys, as judged by an
     * ascending {@link IntComparator}, and that exactly {@code expectedCount} tuples are read.
     */
    private static void verifyTupleOrder(
            MutableObjectIterator<Tuple2<Integer, String>> sorted, int expectedCount)
            throws Exception {
        final IntComparator keyComparator = new IntComparator(true);
        verifySortedOutput(
                sorted,
                new Tuple2<>(),
                new Tuple2<>(),
                expectedCount,
                (previous, current) ->
                        Assertions.assertThat(keyComparator.compare(previous.f0, current.f0))
                                .isLessThanOrEqualTo(0));
    }

    /**
     * Drains {@code sorted}, applies {@code orderAssertion} to every pair of neighbouring
     * records and finally asserts that exactly {@code expectedCount} records were read.
     *
     * <p>The two reuse instances are handed to the iterator alternately, so the instance that
     * may hold the previous record is never the one passed in for the next read.
     *
     * @param sorted iterator over the sorter output; must yield at least one record
     * @param firstReuse reuse instance for the first read
     * @param secondReuse reuse instance for the second read
     * @param expectedCount number of records the iterator is expected to yield
     * @param orderAssertion assertion invoked as {@code (previous, current)} for each neighbour
     *     pair
     * @throws Exception if reading from the iterator fails
     */
    private static <T> void verifySortedOutput(
            MutableObjectIterator<T> sorted,
            T firstReuse,
            T secondReuse,
            int expectedCount,
            BiConsumer<? super T, ? super T> orderAssertion)
            throws Exception {
        LOG.debug("Checking results...");

        T previous = sorted.next(firstReuse);
        Assertions.assertThat(previous).isNotNull();

        int recordsRead = 1;
        T reuse = secondReuse;
        T current;
        while ((current = sorted.next(reuse)) != null) {
            recordsRead++;
            orderAssertion.accept(previous, current);
            // Swap roles: the old "previous" becomes the reuse target for the next read.
            reuse = previous;
            previous = current;
        }

        Assertions.assertThat(recordsRead)
                .as("Not all pairs were read back in.")
                .isEqualTo(expectedCount);
    }
}
