/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingInputInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.VertexInputInfoComputerTestUtil;
import org.apache.flink.runtime.scheduler.adaptivebatch.util.PointwiseVertexInputInfoComputer;
import org.junit.jupiter.api.Test;

class PointwiseVertexInputInfoComputerTest {
    PointwiseVertexInputInfoComputerTest() {
    }

    @Test
    void testComputeNormalInput() {
        PointwiseVertexInputInfoComputer computer = PointwiseVertexInputInfoComputerTest.createPointwiseVertexInputInfoComputer();
        List<BlockingInputInfo> inputInfos = PointwiseVertexInputInfoComputerTest.createBlockingInputInfos(2, List.of(), false);
        Map vertexInputs = computer.compute(inputInfos, 2, 2, 2, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(vertexInputs, inputInfos.get(0), 2);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{3L, 3L}, inputInfos, vertexInputs);
        Map vertexInputs2 = computer.compute(inputInfos, 3, 3, 3, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(vertexInputs2, inputInfos.get(0), 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{2L, 2L, 2L}, inputInfos, vertexInputs2);
    }

    @Test
    void testComputeSkewedInputsWithDifferentSkewedPartitions() {
        PointwiseVertexInputInfoComputer computer = PointwiseVertexInputInfoComputerTest.createPointwiseVertexInputInfoComputer();
        ArrayList<BlockingInputInfo> inputInfosWithDifferentSkewedPartitions = new ArrayList<BlockingInputInfo>();
        BlockingInputInfo inputInfo1 = PointwiseVertexInputInfoComputerTest.createBlockingInputInfo(3, 3, List.of(Integer.valueOf(0)), false);
        BlockingInputInfo inputInfo2 = PointwiseVertexInputInfoComputerTest.createBlockingInputInfo(3, 3, List.of(Integer.valueOf(1)), false);
        inputInfosWithDifferentSkewedPartitions.add(inputInfo1);
        inputInfosWithDifferentSkewedPartitions.add(inputInfo2);
        Map vertexInputs = computer.compute(inputInfosWithDifferentSkewedPartitions, 3, 3, 3, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(vertexInputs, inputInfo1, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{20L, 11L, 5L}, List.of(inputInfo1), vertexInputs);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(vertexInputs, inputInfo2, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{2L, 11L, 23L}, List.of(inputInfo2), vertexInputs);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{22L, 22L, 28L}, List.of(inputInfo1, inputInfo2), vertexInputs);
    }

    @Test
    void testComputeSkewedInputsWithDifferentNumPartitions() {
        PointwiseVertexInputInfoComputer computer = PointwiseVertexInputInfoComputerTest.createPointwiseVertexInputInfoComputer();
        ArrayList<BlockingInputInfo> inputInfosWithDifferentNumPartitions = new ArrayList<BlockingInputInfo>();
        BlockingInputInfo inputInfo1 = PointwiseVertexInputInfoComputerTest.createBlockingInputInfo(3, 3, List.of(Integer.valueOf(1)), false);
        BlockingInputInfo inputInfo2 = PointwiseVertexInputInfoComputerTest.createBlockingInputInfo(2, 3, List.of(Integer.valueOf(1)), false);
        inputInfosWithDifferentNumPartitions.add(inputInfo1);
        inputInfosWithDifferentNumPartitions.add(inputInfo2);
        Map vertexInputs = computer.compute(inputInfosWithDifferentNumPartitions, 3, 3, 3, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(vertexInputs, inputInfo1, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{13L, 20L, 3L}, List.of(inputInfo1), vertexInputs);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(vertexInputs, inputInfo2, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{3L, 10L, 20L}, List.of(inputInfo2), vertexInputs);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{16L, 30L, 23L}, List.of(inputInfo1, inputInfo2), vertexInputs);
    }

    @Test
    void testComputeSkewedInputsWithDifferentNumSubpartitions() {
        PointwiseVertexInputInfoComputer computer = PointwiseVertexInputInfoComputerTest.createPointwiseVertexInputInfoComputer();
        ArrayList<BlockingInputInfo> inputInfosWithDifferentNumSubpartitions = new ArrayList<BlockingInputInfo>();
        BlockingInputInfo inputInfo1 = PointwiseVertexInputInfoComputerTest.createBlockingInputInfo(3, 3, List.of(Integer.valueOf(1)), false);
        BlockingInputInfo inputInfo2 = PointwiseVertexInputInfoComputerTest.createBlockingInputInfo(3, 5, List.of(Integer.valueOf(1)), false);
        inputInfosWithDifferentNumSubpartitions.add(inputInfo1);
        inputInfosWithDifferentNumSubpartitions.add(inputInfo2);
        Map vertexInputs = computer.compute(inputInfosWithDifferentNumSubpartitions, 3, 3, 3, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(vertexInputs, inputInfo1, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{13L, 10L, 13L}, List.of(inputInfo1), vertexInputs);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(vertexInputs, inputInfo2, 3);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{15L, 20L, 25L}, List.of(inputInfo2), vertexInputs);
        VertexInputInfoComputerTestUtil.checkConsumedDataVolumePerSubtask(new long[]{28L, 30L, 38L}, List.of(inputInfo1, inputInfo2), vertexInputs);
    }

    @Test
    void testComputeInputWithIntraCorrelation() {
        PointwiseVertexInputInfoComputer computer = PointwiseVertexInputInfoComputerTest.createPointwiseVertexInputInfoComputer();
        List<BlockingInputInfo> inputInfos = PointwiseVertexInputInfoComputerTest.createBlockingInputInfos(3, List.of(), true);
        Map vertexInputs = computer.compute(inputInfos, 3, 3, 3, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(vertexInputs, inputInfos.get(0), 3);
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(List.of(Map.of(new IndexRange(0, 0), new IndexRange(0, 2)), Map.of(new IndexRange(1, 1), new IndexRange(0, 2)), Map.of(new IndexRange(2, 2), new IndexRange(0, 2))), inputInfos, vertexInputs);
        Map vertexInputs2 = computer.compute(inputInfos, 2, 2, 2, 10L);
        VertexInputInfoComputerTestUtil.checkCorrectnessForNonCorrelatedInput(vertexInputs2, inputInfos.get(0), 2);
        VertexInputInfoComputerTestUtil.checkConsumedSubpartitionGroups(List.of(Map.of(new IndexRange(0, 1), new IndexRange(0, 2)), Map.of(new IndexRange(2, 2), new IndexRange(0, 2))), inputInfos, vertexInputs2);
    }

    private static List<BlockingInputInfo> createBlockingInputInfos(int numPartitions, List<Integer> skewedPartitionIndex, boolean existIntraInputKeyCorrelation) {
        return List.of(PointwiseVertexInputInfoComputerTest.createBlockingInputInfo(numPartitions, 3, skewedPartitionIndex, existIntraInputKeyCorrelation));
    }

    private static BlockingInputInfo createBlockingInputInfo(int numPartitions, int numSubpartitions, List<Integer> skewedPartitionIndex, boolean existIntraInputKeyCorrelation) {
        return VertexInputInfoComputerTestUtil.createBlockingInputInfos(1, 1, numPartitions, numSubpartitions, existIntraInputKeyCorrelation, false, 1, 10.0, skewedPartitionIndex, List.of(), true).get(0);
    }

    private static PointwiseVertexInputInfoComputer createPointwiseVertexInputInfoComputer() {
        return new PointwiseVertexInputInfoComputer();
    }
}

