/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class JobVertexBackPressureHandlerTest {
    private static final JobID TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE = new JobID();
    private static final JobVertexID TEST_JOB_VERTEX_ID = new JobVertexID();
    private static final JobID TEST_JOB_ID_BACK_PRESSURE_STATS_ABSENT = new JobID();
    private TestingRestfulGateway restfulGateway;
    private JobVertexBackPressureHandler jobVertexBackPressureHandler;
    private MetricStore metricStore;

    JobVertexBackPressureHandlerTest() {
    }

    private static Collection<MetricDump> getMetricDumps() {
        ArrayList<MetricDump> dumps = new ArrayList<MetricDump>();
        QueryScopeInfo.TaskQueryScopeInfo task0 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 0, 0);
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task0, "backPressuredTimeMsPerSecond", "1000"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task0, "idleTimeMsPerSecond", "0"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task0, "busyTimeMsPerSecond", "0"));
        QueryScopeInfo.TaskQueryScopeInfo task1 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 1, 0);
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task1, "backPressuredTimeMsPerSecond", "500"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task1, "idleTimeMsPerSecond", "100"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task1, "busyTimeMsPerSecond", "900"));
        QueryScopeInfo.TaskQueryScopeInfo task3 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 3, 0);
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task3, "backPressuredTimeMsPerSecond", "100"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task3, "idleTimeMsPerSecond", "200"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task3, "busyTimeMsPerSecond", "700"));
        return dumps;
    }

    @BeforeEach
    void setUp() {
        this.metricStore = new MetricStore();
        for (MetricDump metricDump : JobVertexBackPressureHandlerTest.getMetricDumps()) {
            this.metricStore.add(metricDump);
        }
        this.jobVertexBackPressureHandler = new JobVertexBackPressureHandler(() -> CompletableFuture.completedFuture(this.restfulGateway), Duration.ofSeconds(10L), Collections.emptyMap(), (MessageHeaders)JobVertexBackPressureHeaders.getInstance(), new MetricFetcher(){
            private long updateCount = 0L;

            public MetricStore getMetricStore() {
                return JobVertexBackPressureHandlerTest.this.metricStore;
            }

            public void update() {
                ++this.updateCount;
            }

            public long getLastUpdateTime() {
                return this.updateCount;
            }
        });
    }

    private static Collection<MetricDump> getMultipleAttemptsMetricDumps() {
        ArrayList<MetricDump> dumps = new ArrayList<MetricDump>();
        QueryScopeInfo.TaskQueryScopeInfo task0 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 0, 0);
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task0, "backPressuredTimeMsPerSecond", "1000"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task0, "idleTimeMsPerSecond", "0"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task0, "busyTimeMsPerSecond", "0"));
        QueryScopeInfo.TaskQueryScopeInfo speculativeTask0 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 0, 1);
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)speculativeTask0, "backPressuredTimeMsPerSecond", "200"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)speculativeTask0, "idleTimeMsPerSecond", "100"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)speculativeTask0, "busyTimeMsPerSecond", "800"));
        QueryScopeInfo.TaskQueryScopeInfo task1 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 1, 0);
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task1, "backPressuredTimeMsPerSecond", "500"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task1, "idleTimeMsPerSecond", "100"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task1, "busyTimeMsPerSecond", "900"));
        QueryScopeInfo.TaskQueryScopeInfo speculativeTask1 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 1, 1);
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)speculativeTask1, "backPressuredTimeMsPerSecond", "900"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)speculativeTask1, "idleTimeMsPerSecond", "0"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)speculativeTask1, "busyTimeMsPerSecond", "100"));
        QueryScopeInfo.TaskQueryScopeInfo task3 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 3, 0);
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task3, "backPressuredTimeMsPerSecond", "100"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task3, "idleTimeMsPerSecond", "200"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task3, "busyTimeMsPerSecond", "700"));
        return dumps;
    }

    @Test
    void testGetBackPressure() throws Exception {
        HashMap<String, String> pathParameters = new HashMap<String, String>();
        pathParameters.put("jobid", TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString());
        pathParameters.put("vertexid", TEST_JOB_VERTEX_ID.toString());
        HandlerRequest request = HandlerRequest.resolveParametersAndCreate((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)new JobVertexMessageParameters(), pathParameters, Collections.emptyMap(), Collections.emptyList());
        CompletableFuture jobVertexBackPressureInfoCompletableFuture = this.jobVertexBackPressureHandler.handleRequest(request, (RestfulGateway)this.restfulGateway);
        JobVertexBackPressureInfo jobVertexBackPressureInfo = (JobVertexBackPressureInfo)jobVertexBackPressureInfoCompletableFuture.get();
        Assertions.assertThat((Comparable)jobVertexBackPressureInfo.getStatus()).isEqualTo((Object)JobVertexBackPressureInfo.VertexBackPressureStatus.OK);
        Assertions.assertThat((Comparable)jobVertexBackPressureInfo.getBackpressureLevel()).isEqualTo((Object)JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH);
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBackPressuredRatio).collect(Collectors.toList())).containsExactly((Object[])new Double[]{1.0, 0.5, 0.1});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getIdleRatio).collect(Collectors.toList())).containsExactly((Object[])new Double[]{0.0, 0.1, 0.2});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBusyRatio).collect(Collectors.toList())).containsExactly((Object[])new Double[]{0.0, 0.9, 0.7});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBackpressureLevel).collect(Collectors.toList())).containsExactly((Object[])new JobVertexBackPressureInfo.VertexBackPressureLevel[]{JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, JobVertexBackPressureInfo.VertexBackPressureLevel.OK});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getSubtask).collect(Collectors.toList())).containsExactly((Object[])new Integer[]{0, 1, 3});
    }

    @Test
    void testAbsentBackPressure() throws Exception {
        HashMap<String, String> pathParameters = new HashMap<String, String>();
        pathParameters.put("jobid", TEST_JOB_ID_BACK_PRESSURE_STATS_ABSENT.toString());
        pathParameters.put("vertexid", new JobVertexID().toString());
        HandlerRequest request = HandlerRequest.resolveParametersAndCreate((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)new JobVertexMessageParameters(), pathParameters, Collections.emptyMap(), Collections.emptyList());
        CompletableFuture jobVertexBackPressureInfoCompletableFuture = this.jobVertexBackPressureHandler.handleRequest(request, (RestfulGateway)this.restfulGateway);
        JobVertexBackPressureInfo jobVertexBackPressureInfo = (JobVertexBackPressureInfo)jobVertexBackPressureInfoCompletableFuture.get();
        Assertions.assertThat((Comparable)jobVertexBackPressureInfo.getStatus()).isEqualTo((Object)JobVertexBackPressureInfo.VertexBackPressureStatus.DEPRECATED);
    }

    @Test
    void testGetBackPressureFromMultipleCurrentAttempts() throws Exception {
        final MetricStore multipleAttemptsMetricStore = new MetricStore();
        for (MetricDump metricDump : JobVertexBackPressureHandlerTest.getMultipleAttemptsMetricDumps()) {
            multipleAttemptsMetricStore.add(metricDump);
        }
        HashMap<Integer, Integer> representativeAttempts = new HashMap<Integer, Integer>();
        representativeAttempts.put(0, 1);
        representativeAttempts.put(1, 0);
        multipleAttemptsMetricStore.getRepresentativeAttempts().put(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), Collections.singletonMap(TEST_JOB_VERTEX_ID.toString(), representativeAttempts));
        JobVertexBackPressureHandler jobVertexBackPressureHandler = new JobVertexBackPressureHandler(() -> CompletableFuture.completedFuture(this.restfulGateway), Duration.ofSeconds(10L), Collections.emptyMap(), (MessageHeaders)JobVertexBackPressureHeaders.getInstance(), new MetricFetcher(){
            private long updateCount = 0L;

            public MetricStore getMetricStore() {
                return multipleAttemptsMetricStore;
            }

            public void update() {
                ++this.updateCount;
            }

            public long getLastUpdateTime() {
                return this.updateCount;
            }
        });
        HashMap<String, String> pathParameters = new HashMap<String, String>();
        pathParameters.put("jobid", TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString());
        pathParameters.put("vertexid", TEST_JOB_VERTEX_ID.toString());
        HandlerRequest request = HandlerRequest.resolveParametersAndCreate((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)new JobVertexMessageParameters(), pathParameters, Collections.emptyMap(), Collections.emptyList());
        CompletableFuture jobVertexBackPressureInfoCompletableFuture = jobVertexBackPressureHandler.handleRequest(request, (RestfulGateway)this.restfulGateway);
        JobVertexBackPressureInfo jobVertexBackPressureInfo = (JobVertexBackPressureInfo)jobVertexBackPressureInfoCompletableFuture.get();
        Assertions.assertThat((Comparable)jobVertexBackPressureInfo.getStatus()).isEqualTo((Object)JobVertexBackPressureInfo.VertexBackPressureStatus.OK);
        Assertions.assertThat((Comparable)jobVertexBackPressureInfo.getBackpressureLevel()).isEqualTo((Object)JobVertexBackPressureInfo.VertexBackPressureLevel.LOW);
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getAttemptNumber).collect(Collectors.toList())).containsExactly((Object[])new Integer[]{1, 0, null});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getOtherConcurrentAttempts).filter(Objects::nonNull).flatMap(Collection::stream).map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getAttemptNumber).collect(Collectors.toList())).containsExactly((Object[])new Integer[]{0, 1});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBackPressuredRatio).collect(Collectors.toList())).containsExactly((Object[])new Double[]{0.2, 0.5, 0.1});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getOtherConcurrentAttempts).filter(Objects::nonNull).flatMap(Collection::stream).map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBackPressuredRatio).collect(Collectors.toList())).containsExactly((Object[])new Double[]{1.0, 0.9});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getIdleRatio).collect(Collectors.toList())).containsExactly((Object[])new Double[]{0.1, 0.1, 0.2});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getOtherConcurrentAttempts).filter(Objects::nonNull).flatMap(Collection::stream).map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getIdleRatio).collect(Collectors.toList())).containsExactly((Object[])new Double[]{0.0, 0.0});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBusyRatio).collect(Collectors.toList())).containsExactly((Object[])new Double[]{0.8, 0.9, 0.7});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getOtherConcurrentAttempts).filter(Objects::nonNull).flatMap(Collection::stream).map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBusyRatio).collect(Collectors.toList())).containsExactly((Object[])new Double[]{0.0, 0.1});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBackpressureLevel).collect(Collectors.toList())).containsExactly((Object[])new JobVertexBackPressureInfo.VertexBackPressureLevel[]{JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, JobVertexBackPressureInfo.VertexBackPressureLevel.OK});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getOtherConcurrentAttempts).filter(Objects::nonNull).flatMap(Collection::stream).map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBackpressureLevel).collect(Collectors.toList())).containsExactly((Object[])new JobVertexBackPressureInfo.VertexBackPressureLevel[]{JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getSubtask).collect(Collectors.toList())).containsExactly((Object[])new Integer[]{0, 1, 3});
        Assertions.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getOtherConcurrentAttempts).filter(Objects::nonNull).flatMap(Collection::stream).map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getSubtask).collect(Collectors.toList())).containsExactly((Object[])new Integer[]{0, 1});
    }
}

