/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.sort.MergeIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.Match;
import org.apache.flink.runtime.operators.testutils.MatchRemovingJoiner;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.util.MutableObjectIterator;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Integration test for {@link ReusingMergeInnerJoinIterator}.
 *
 * <p>Each test first consumes both inputs once to build the full set of expected matches per key,
 * resets the inputs, and then drives the join iterator with a {@link MatchRemovingJoiner} that is
 * handed that expected-match map. The test passes when every per-key collection in the map is
 * empty after the join has finished. (The removal itself is implemented by {@code
 * MatchRemovingJoiner}, which lives outside this file.)
 */
class ReusingSortMergeInnerJoinIteratorITCase {
    /** Amount of memory, in bytes, given to the memory manager (16 MiB). */
    private static final int MEMORY_SIZE = 16 * 1024 * 1024;

    /** Page count handed to the join iterator's constructor. */
    private static final int PAGES_FOR_BNLJN = 2;

    /** Number of records produced for the first and second input in {@link #testMerge()}. */
    private static final int INPUT_1_SIZE = 20000;

    private static final int INPUT_2_SIZE = 1000;

    /** Seeds for the data generators of the first and second input. */
    private static final long SEED1 = 561349061987311L;

    private static final long SEED2 = 231434613412342L;

    /** Task object passed to the join iterator as its owning task. */
    private final AbstractInvokable parentTask = new DummyInvokable();

    private IOManager ioManager;
    private MemoryManager memoryManager;
    private TypeSerializer<Tuple2<Integer, String>> serializer1;
    private TypeSerializer<Tuple2<Integer, String>> serializer2;
    private TypeComparator<Tuple2<Integer, String>> comparator1;
    private TypeComparator<Tuple2<Integer, String>> comparator2;
    private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator;

    /** Creates fresh serializers, comparators, memory manager and I/O manager for every test. */
    @SuppressWarnings("unchecked") // Tuple2.class carries no type arguments; the cast is unavoidable.
    @BeforeEach
    void beforeTest() {
        Class<Tuple2<Integer, String>> tupleClass =
                (Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class;

        this.serializer1 = newTupleSerializer(tupleClass);
        this.serializer2 = newTupleSerializer(tupleClass);
        this.comparator1 = newKeyComparator();
        this.comparator2 = newKeyComparator();
        this.pairComparator = new GenericPairComparator<>(this.comparator1, this.comparator2);
        this.memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(MEMORY_SIZE).build();
        this.ioManager = new IOManagerAsync();
    }

    /**
     * Closes the I/O manager and verifies that all memory was returned to the memory manager.
     *
     * @throws Exception if closing the I/O manager fails
     */
    @AfterEach
    void afterTest() throws Exception {
        if (this.ioManager != null) {
            this.ioManager.close();
            this.ioManager = null;
        }
        if (this.memoryManager != null) {
            try {
                Assertions.assertThat(this.memoryManager.verifyEmpty())
                        .withFailMessage(
                                "Memory Leak: Not all memory has been returned to the memory manager.")
                        .isTrue();
            } finally {
                // Shut down even when the leak check fails, so the manager is never left running.
                this.memoryManager.shutdown();
                this.memoryManager = null;
            }
        }
    }

    /**
     * Joins two generated, key-sorted inputs and checks that every expected match is produced.
     *
     * @throws Exception if generating the data or running the join fails; the exception is
     *     propagated so the test report keeps the full stack trace
     */
    @Test
    void testMerge() throws Exception {
        TestData.TupleGenerator generator1 =
                new TestData.TupleGenerator(
                        SEED1,
                        500,
                        4096,
                        TestData.TupleGenerator.KeyMode.SORTED,
                        TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TestData.TupleGenerator generator2 =
                new TestData.TupleGenerator(
                        SEED2,
                        500,
                        2048,
                        TestData.TupleGenerator.KeyMode.SORTED,
                        TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TestData.TupleGeneratorIterator input1 =
                new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
        TestData.TupleGeneratorIterator input2 =
                new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);

        // First pass over the data: compute the expected matches per key.
        Map<Integer, Collection<Match>> expectedMatchesMap =
                this.matchValues(this.collectData(input1), this.collectData(input2));

        // Reset generators and iterators before the join consumes them a second time.
        generator1.reset();
        generator2.reset();
        input1.reset();
        input2.reset();

        this.joinAndAssertAllMatchesConsumed(input1, input2, expectedMatchesMap);
    }

    /**
     * Joins two inputs that each merge generated records with a large run of records sharing one
     * key, and checks that every expected match is produced.
     *
     * @throws Exception if generating the data or running the join fails; the exception is
     *     propagated so the test report keeps the full stack trace
     */
    @Test
    void testMergeWithHighNumberOfCommonKeys() throws Exception {
        final int input1Size = 200;
        final int input2Size = 100;
        final int input1Duplicates = 10;
        final int input2Duplicates = 4000;
        final int duplicateKey = 13;

        TestData.TupleGenerator generator1 =
                new TestData.TupleGenerator(
                        SEED1,
                        500,
                        4096,
                        TestData.TupleGenerator.KeyMode.SORTED,
                        TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TestData.TupleGenerator generator2 =
                new TestData.TupleGenerator(
                        SEED2,
                        500,
                        2048,
                        TestData.TupleGenerator.KeyMode.SORTED,
                        TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TestData.TupleGeneratorIterator gen1Iter =
                new TestData.TupleGeneratorIterator(generator1, input1Size);
        TestData.TupleGeneratorIterator gen2Iter =
                new TestData.TupleGeneratorIterator(generator2, input2Size);
        TestData.TupleConstantValueIterator const1Iter =
                new TestData.TupleConstantValueIterator(
                        duplicateKey, "LEFT String for Duplicate Keys", input1Duplicates);
        TestData.TupleConstantValueIterator const2Iter =
                new TestData.TupleConstantValueIterator(
                        duplicateKey, "RIGHT String for Duplicate Keys", input2Duplicates);

        // First pass over the data: compute the expected matches per key.
        MergeIterator<Tuple2<Integer, String>> input1 =
                this.mergeInputs(gen1Iter, const1Iter, this.comparator1);
        MergeIterator<Tuple2<Integer, String>> input2 =
                this.mergeInputs(gen2Iter, const2Iter, this.comparator2);
        Map<Integer, Collection<Match>> expectedMatchesMap =
                this.matchValues(this.collectData(input1), this.collectData(input2));

        // Reset every source and build new merge iterators on top of them for the join itself.
        generator1.reset();
        generator2.reset();
        const1Iter.reset();
        const2Iter.reset();
        gen1Iter.reset();
        gen2Iter.reset();
        input1 = this.mergeInputs(gen1Iter, const1Iter, this.comparator1);
        input2 = this.mergeInputs(gen2Iter, const2Iter, this.comparator2);

        this.joinAndAssertAllMatchesConsumed(input1, input2, expectedMatchesMap);
    }

    /** Creates a serializer for {@code Tuple2<Integer, String>} records. */
    private static TypeSerializer<Tuple2<Integer, String>> newTupleSerializer(
            Class<Tuple2<Integer, String>> tupleClass) {
        return new TupleSerializer<>(
                tupleClass,
                new TypeSerializer<?>[] {IntSerializer.INSTANCE, StringSerializer.INSTANCE});
    }

    /** Creates a comparator that compares tuples on field 0 using {@code IntComparator(true)}. */
    private static TypeComparator<Tuple2<Integer, String>> newKeyComparator() {
        return new TupleComparator<>(
                new int[] {0},
                new TypeComparator<?>[] {new IntComparator(true)},
                new TypeSerializer<?>[] {IntSerializer.INSTANCE});
    }

    /**
     * Wraps two record sources in a single {@link MergeIterator}.
     *
     * @param first first source
     * @param second second source
     * @param comparator comparator whose duplicate is handed to the merge iterator
     * @return a merge iterator over both sources
     * @throws Exception if constructing the merge iterator fails
     */
    private MergeIterator<Tuple2<Integer, String>> mergeInputs(
            MutableObjectIterator<Tuple2<Integer, String>> first,
            MutableObjectIterator<Tuple2<Integer, String>> second,
            TypeComparator<Tuple2<Integer, String>> comparator)
            throws Exception {
        ArrayList<MutableObjectIterator<Tuple2<Integer, String>>> sources = new ArrayList<>();
        sources.add(first);
        sources.add(second);
        return new MergeIterator<>(sources, comparator.duplicate());
    }

    /**
     * Runs the inner join over both inputs and asserts that no expected match is left over.
     *
     * @param input1 first join input
     * @param input2 second join input
     * @param expectedMatchesMap expected matches per key; handed to the {@link
     *     MatchRemovingJoiner} and required to contain only empty collections afterwards
     * @throws Exception if opening the iterator or processing a key fails
     */
    private void joinAndAssertAllMatchesConsumed(
            MutableObjectIterator<Tuple2<Integer, String>> input1,
            MutableObjectIterator<Tuple2<Integer, String>> input2,
            Map<Integer, Collection<Match>> expectedMatchesMap)
            throws Exception {
        MatchRemovingJoiner joinFunction = new MatchRemovingJoiner(expectedMatchesMap);
        DiscardingOutputCollector<Tuple2<Integer, String>> collector =
                new DiscardingOutputCollector<>();

        ReusingMergeInnerJoinIterator<
                        Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>
                iterator =
                        new ReusingMergeInnerJoinIterator<>(
                                input1,
                                input2,
                                this.serializer1,
                                this.comparator1,
                                this.serializer2,
                                this.comparator2,
                                this.pairComparator,
                                this.memoryManager,
                                this.ioManager,
                                PAGES_FOR_BNLJN,
                                this.parentTask);
        try {
            iterator.open();
            while (iterator.callWithNextKey(joinFunction, collector)) {
                // All work happens inside callWithNextKey; loop until it reports no more keys.
            }
        } finally {
            // Always close, so a failure inside the join is not followed by a spurious
            // memory-leak failure from afterTest().
            iterator.close();
        }

        for (Map.Entry<Integer, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
            Assertions.assertThat(entry.getValue())
                    .withFailMessage("Collection for key %d is not empty", entry.getKey())
                    .isEmpty();
        }
    }

    /**
     * Builds, for every key present in both maps, the cross product of the left and right values.
     *
     * @param leftMap values of the first input grouped by key
     * @param rightMap values of the second input grouped by key
     * @return map from key to all {@link Match} pairs for that key; keys missing from {@code
     *     rightMap} are omitted
     */
    private Map<Integer, Collection<Match>> matchValues(
            Map<Integer, Collection<String>> leftMap, Map<Integer, Collection<String>> rightMap) {
        Map<Integer, Collection<Match>> map = new HashMap<>();
        for (Map.Entry<Integer, Collection<String>> leftEntry : leftMap.entrySet()) {
            Collection<String> rightValues = rightMap.get(leftEntry.getKey());
            if (rightValues == null) {
                continue;
            }
            Collection<Match> matchedValues =
                    map.computeIfAbsent(leftEntry.getKey(), k -> new ArrayList<>());
            for (String leftValue : leftEntry.getValue()) {
                for (String rightValue : rightValues) {
                    matchedValues.add(new Match(leftValue, rightValue));
                }
            }
        }
        return map;
    }

    /**
     * Drains the iterator and groups the second tuple field by the first tuple field.
     *
     * @param iter iterator to consume until it returns {@code null}
     * @return map from key (field 0) to all values (field 1) seen for that key, in iteration order
     * @throws Exception if reading from the iterator fails
     */
    private Map<Integer, Collection<String>> collectData(
            MutableObjectIterator<Tuple2<Integer, String>> iter) throws Exception {
        Map<Integer, Collection<String>> map = new HashMap<>();
        Tuple2<Integer, String> pair = new Tuple2<>();
        while ((pair = iter.next(pair)) != null) {
            map.computeIfAbsent(pair.f0, k -> new ArrayList<>()).add(pair.f1);
        }
        return map;
    }
}

