/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTracker;
import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTrackerBuilder;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinator;
import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoStats;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava30.com.google.common.cache.RemovalListener;
import org.apache.flink.shaded.guava30.com.google.common.cache.RemovalNotification;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.OptionalAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link JobVertexThreadInfoTracker}.
 *
 * <p>Every tracker under test is wired to a {@link TestingThreadInfoRequestCoordinator}, which
 * answers each thread info request immediately with a pre-built {@link VertexThreadInfoStats}
 * instead of sampling real task executors.
 */
public class JobVertexThreadInfoTrackerTest extends TestLogger {

    /** Request id of the default sample; follow-up samples use {@code REQUEST_ID + 1}. */
    private static final int REQUEST_ID = 0;

    /** Job vertex (parallelism 10, all subtasks switched to running) shared by all tests. */
    private static final ExecutionJobVertex EXECUTION_JOB_VERTEX = createExecutionJobVertex();

    private static final ExecutionVertex[] TASK_VERTICES = EXECUTION_JOB_VERTEX.getTaskVertices();

    /** Attempt ids of the current executions of all subtasks of {@link #EXECUTION_JOB_VERTEX}. */
    private static final Set<ExecutionAttemptID> ATTEMPT_IDS =
            Arrays.stream(TASK_VERTICES)
                    .map(vertex -> vertex.getCurrentExecutionAttempt().getAttemptId())
                    .collect(Collectors.toSet());

    private static final JobID JOB_ID = new JobID();

    /** Sample returned by the testing coordinator by default; created in {@link #setUp()}. */
    private static VertexThreadInfoStats threadInfoStatsDefaultSample;

    private static final Duration CLEAN_UP_INTERVAL = Duration.ofSeconds(60L);
    private static final Duration STATS_REFRESH_INTERVAL = Duration.ofSeconds(60L);
    private static final Duration TIME_GAP = Duration.ofSeconds(60L);
    private static final Duration SMALL_TIME_GAP = Duration.ofMillis(1L);
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(10L);
    private static final int NUMBER_OF_SAMPLES = 1;
    private static final int MAX_STACK_TRACE_DEPTH = 100;
    private static final Duration DELAY_BETWEEN_SAMPLES = Duration.ofMillis(50L);

    /** Single-threaded scheduled executor handed to every tracker built by this class. */
    private static ScheduledExecutorService executor;

    /** Creates the default sample and the executor shared by all tests. */
    @BeforeAll
    public static void setUp() {
        threadInfoStatsDefaultSample = createThreadInfoStats(REQUEST_ID, SMALL_TIME_GAP);
        executor = Executors.newScheduledThreadPool(1);
    }

    /** Stops the shared executor, if it was created. */
    @AfterAll
    public static void tearDown() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    /** The first query yields nothing; once the result future completes the stats are served. */
    @Test
    public void testGetThreadInfoStats() throws Exception {
        doInitialRequestAndVerifyResult(createThreadInfoTracker());
    }

    /** A second query within the (60 s) refresh interval still returns the first sample. */
    @Test
    public void testCachedStatsNotUpdatedWithinRefreshInterval() throws Exception {
        // Second sample the coordinator could hand out; it must not show up in the result.
        VertexThreadInfoStats unusedThreadInfoStats =
                createThreadInfoStats(REQUEST_ID + 1, TIME_GAP);
        JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker =
                createThreadInfoTracker(
                        STATS_REFRESH_INTERVAL,
                        threadInfoStatsDefaultSample,
                        unusedThreadInfoStats);
        doInitialRequestAndVerifyResult(tracker);

        Optional<VertexThreadInfoStats> result =
                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
        AssertionsForClassTypes.assertThat(result)
                .isPresent()
                .hasValue(threadInfoStatsDefaultSample);
    }

    /** With a 1 ms refresh interval, a query on stale stats leads to the second sample. */
    @Test
    public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
        Duration shortRefreshInterval = Duration.ofMillis(1L);

        // The first sample ends roughly 10 s in the past (start = now - 10 s, duration 5 ms).
        VertexThreadInfoStats initialThreadInfoStats =
                createThreadInfoStats(
                        Instant.now().minus(10L, ChronoUnit.SECONDS),
                        REQUEST_ID,
                        Duration.ofMillis(5L));
        VertexThreadInfoStats threadInfoStatsAfterRefresh =
                createThreadInfoStats(REQUEST_ID + 1, TIME_GAP);

        // The latch is released by the cache's removal listener.
        // NOTE(review): presumably replacing the cached entry triggers that notification.
        CountDownLatch cacheRefreshed = new CountDownLatch(1);
        Cache<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> vertexStatsCache =
                createCache(
                        CLEAN_UP_INTERVAL,
                        new LatchRemovalListener<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats>(
                                cacheRefreshed));
        JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker =
                createThreadInfoTracker(
                        CLEAN_UP_INTERVAL,
                        shortRefreshInterval,
                        vertexStatsCache,
                        initialThreadInfoStats,
                        threadInfoStatsAfterRefresh);

        // No stats are available on the very first query.
        AssertionsForClassTypes.assertThat(tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX))
                .isNotPresent();
        tracker.getResultAvailableFuture().get();

        // The first sample is served now.
        assertExpectedEqualsReceived(
                initialThreadInfoStats, tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX));

        // Wait for the removal notification, then expect the second sample.
        cacheRefreshed.await();
        Optional<VertexThreadInfoStats> result =
                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
        assertExpectedEqualsReceived(threadInfoStatsAfterRefresh, result);
    }

    /** With a 1 ms clean-up interval, no stats are served after the cache reports a removal. */
    @Test
    public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
        Duration shortCleanUpInterval = Duration.ofMillis(1L);
        CountDownLatch cacheExpired = new CountDownLatch(1);
        Cache<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> vertexStatsCache =
                createCache(
                        shortCleanUpInterval,
                        new LatchRemovalListener<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats>(
                                cacheExpired));
        JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker =
                createThreadInfoTracker(
                        shortCleanUpInterval,
                        STATS_REFRESH_INTERVAL,
                        vertexStatsCache,
                        threadInfoStatsDefaultSample);

        // No stats are available on the very first query.
        AssertionsForClassTypes.assertThat(tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX))
                .isNotPresent();

        // Wait until the removal listener has fired, then expect an empty result again.
        cacheExpired.await();
        AssertionsForClassTypes.assertThat(tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX))
                .isNotPresent();
    }

    /** An explicit clean-up within the (60 s) clean-up interval keeps the cached sample. */
    @Test
    public void testCachedStatsNotCleanedWithinCleanupInterval() throws Exception {
        JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker = createThreadInfoTracker();
        doInitialRequestAndVerifyResult(tracker);

        tracker.cleanUpVertexStatsCache();
        assertExpectedEqualsReceived(
                threadInfoStatsDefaultSample,
                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX));
    }

    /** After {@code shutDown()} the tracker no longer returns stats. */
    @Test
    public void testShutDown() throws Exception {
        JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker = createThreadInfoTracker();
        doInitialRequestAndVerifyResult(tracker);

        tracker.shutDown();

        // The previously cached result must no longer be served.
        AssertionsForClassTypes.assertThat(tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX))
                .isNotPresent();
        // The repeated query is deliberate: it must come back empty as well.
        AssertionsForClassTypes.assertThat(tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX))
                .isNotPresent();
    }

    /**
     * Builds a cache whose entries expire {@code cleanUpInterval} after their last access and
     * that notifies {@code removalListener} about removed entries.
     */
    private Cache<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> createCache(
            Duration cleanUpInterval,
            RemovalListener<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> removalListener) {
        return CacheBuilder.newBuilder()
                .concurrencyLevel(1)
                .expireAfterAccess(cleanUpInterval.toMillis(), TimeUnit.MILLISECONDS)
                .removalListener(removalListener)
                .build();
    }

    /**
     * Issues the first query (expected to be empty), waits for the tracker's result future and
     * verifies that the default sample is served afterwards.
     */
    private void doInitialRequestAndVerifyResult(
            JobVertexThreadInfoTracker<VertexThreadInfoStats> tracker)
            throws InterruptedException, ExecutionException {
        AssertionsForClassTypes.assertThat(tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX))
                .isNotPresent();
        tracker.getResultAvailableFuture().get();
        assertExpectedEqualsReceived(
                threadInfoStatsDefaultSample,
                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX));
    }

    /**
     * Asserts that {@code receivedOptional} is present and matches {@code expected} in request
     * id and end time, covers one entry per subtask, and has no empty sample collection.
     */
    private static void assertExpectedEqualsReceived(
            VertexThreadInfoStats expected, Optional<VertexThreadInfoStats> receivedOptional) {
        AssertionsForClassTypes.assertThat(receivedOptional).isPresent();
        VertexThreadInfoStats received = receivedOptional.get();

        // AssertJ convention: the value under test goes into assertThat(...), the reference
        // value into isEqualTo(...), so failure messages label both sides correctly.
        AssertionsForClassTypes.assertThat(received.getRequestId())
                .isEqualTo(expected.getRequestId());
        AssertionsForClassTypes.assertThat(received.getEndTime())
                .isEqualTo(expected.getEndTime());
        AssertionsForClassTypes.assertThat(received.getNumberOfSubtasks())
                .isEqualTo(TASK_VERTICES.length);

        for (Collection<?> samples : received.getSamplesBySubtask().values()) {
            AssertionsForClassTypes.assertThat(samples.isEmpty()).isFalse();
        }
    }

    /** Creates a tracker with the default refresh interval that serves the default sample. */
    private JobVertexThreadInfoTracker<VertexThreadInfoStats> createThreadInfoTracker() {
        return createThreadInfoTracker(STATS_REFRESH_INTERVAL, threadInfoStatsDefaultSample);
    }

    /**
     * Creates a tracker with the default clean-up interval and no explicit cache ({@code null}
     * is handed to the builder).
     */
    private JobVertexThreadInfoTracker<VertexThreadInfoStats> createThreadInfoTracker(
            Duration statsRefreshInterval, VertexThreadInfoStats... stats) {
        return createThreadInfoTracker(CLEAN_UP_INTERVAL, statsRefreshInterval, null, stats);
    }

    /**
     * Creates a tracker backed by a {@link TestingThreadInfoRequestCoordinator} that hands out
     * {@code stats} in order, one per request, starting over after the last element.
     *
     * @param cleanUpInterval clean-up interval passed to the builder
     * @param statsRefreshInterval refresh interval passed to the builder
     * @param vertexStatsCache cache passed to the builder; may be {@code null}
     * @param stats results the testing coordinator returns
     */
    private JobVertexThreadInfoTracker<VertexThreadInfoStats> createThreadInfoTracker(
            Duration cleanUpInterval,
            Duration statsRefreshInterval,
            Cache<JobVertexThreadInfoTracker.Key, VertexThreadInfoStats> vertexStatsCache,
            VertexThreadInfoStats... stats) {
        // Runnable::run executes coordinator tasks directly on the calling thread.
        TestingThreadInfoRequestCoordinator coordinator =
                new TestingThreadInfoRequestCoordinator(Runnable::run, REQUEST_TIMEOUT, stats);

        return JobVertexThreadInfoTrackerBuilder.newBuilder(
                        JobVertexThreadInfoTrackerTest::createMockResourceManagerGateway,
                        Function.identity(),
                        executor,
                        TestingUtils.TIMEOUT)
                .setCoordinator(coordinator)
                .setCleanUpInterval(cleanUpInterval)
                .setNumSamples(NUMBER_OF_SAMPLES)
                .setStatsRefreshInterval(statsRefreshInterval)
                .setDelayBetweenSamples(DELAY_BETWEEN_SAMPLES)
                .setMaxThreadInfoDepth(MAX_STACK_TRACE_DEPTH)
                .setVertexStatsCache(vertexStatsCache)
                .build();
    }

    /** Creates stats that start now and end {@code timeGap} later. */
    private static VertexThreadInfoStats createThreadInfoStats(int requestId, Duration timeGap) {
        return createThreadInfoStats(Instant.now(), requestId, timeGap);
    }

    /**
     * Creates stats spanning {@code [startTime, startTime + timeGap]} with one sample of the
     * calling thread for every subtask in {@link #TASK_VERTICES}.
     */
    // Raw types are kept on purpose: the sample element type is not imported in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static VertexThreadInfoStats createThreadInfoStats(
            Instant startTime, int requestId, Duration timeGap) {
        Instant endTime = startTime.plus(timeGap);
        Map samples = new HashMap();
        for (ExecutionVertex vertex : TASK_VERTICES) {
            Optional threadInfoSample =
                    JvmUtils.createThreadInfoSample(
                            Thread.currentThread().getId(), MAX_STACK_TRACE_DEPTH);
            // The message now matches the condition: a sample of the live calling thread is
            // required to be present.
            Preconditions.checkState(
                    threadInfoSample.isPresent(), "The threadInfoSample should be present.");
            samples.put(
                    vertex.getCurrentExecutionAttempt().getAttemptId(),
                    Collections.singletonList(threadInfoSample.get()));
        }
        return new VertexThreadInfoStats(
                requestId, startTime.toEpochMilli(), endTime.toEpochMilli(), samples);
    }

    /**
     * Schedules a single-vertex streaming job (parallelism 10), switches all vertices to
     * running and returns the resulting {@link ExecutionJobVertex}.
     *
     * @throws RuntimeException wrapping any failure during set-up
     */
    private static ExecutionJobVertex createExecutionJobVertex() {
        try {
            JobVertex jobVertex = new JobVertex("testVertex");
            jobVertex.setParallelism(10);
            jobVertex.setInvokableClass(AbstractInvokable.class);

            DefaultScheduler scheduler =
                    SchedulerTestingUtils.createScheduler(
                            JobGraphTestUtils.streamingJobGraph(jobVertex),
                            ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                            new DirectScheduledExecutorService());
            ExecutionGraph executionGraph = scheduler.getExecutionGraph();
            scheduler.startScheduling();
            ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
            return scheduler.getExecutionJobVertex(jobVertex.getID());
        } catch (Exception e) {
            // Keep the cause: this runs during class initialization, where a bare message
            // would hide why the fixture could not be built.
            throw new RuntimeException("Failed to create ExecutionJobVertex.", e);
        }
    }

    /**
     * Returns an already completed future holding a {@link TestingResourceManagerGateway} whose
     * task executor gateway lookup completes with {@code null}.
     */
    private static CompletableFuture<ResourceManagerGateway> createMockResourceManagerGateway() {
        Function<ResourceID, CompletableFuture<TaskExecutorThreadInfoGateway>> gatewayLookup =
                resourceID -> CompletableFuture.completedFuture(null);
        TestingResourceManagerGateway testingResourceManagerGateway =
                new TestingResourceManagerGateway();
        testingResourceManagerGateway.setRequestTaskExecutorGatewayFunction(gatewayLookup);
        return CompletableFuture.completedFuture(testingResourceManagerGateway);
    }

    /** Removal listener that counts down a latch on every removal notification. */
    private static class LatchRemovalListener<K, V> implements RemovalListener<K, V> {
        private final CountDownLatch latch;

        private LatchRemovalListener(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void onRemoval(@Nonnull RemovalNotification<K, V> removalNotification) {
            latch.countDown();
        }
    }

    /**
     * Coordinator that never contacts a task executor: each request is checked against the
     * expected attempt ids and answered with the next pre-built result, cycling through the
     * configured results.
     */
    private static class TestingThreadInfoRequestCoordinator extends ThreadInfoRequestCoordinator {
        private final VertexThreadInfoStats[] vertexThreadInfoStats;

        /** Number of requests answered so far; selects the next result. */
        private int counter = 0;

        TestingThreadInfoRequestCoordinator(
                Executor executor,
                Duration requestTimeout,
                VertexThreadInfoStats... vertexThreadInfoStats) {
            super(executor, requestTimeout);
            this.vertexThreadInfoStats = vertexThreadInfoStats;
        }

        @Override
        public CompletableFuture<VertexThreadInfoStats> triggerThreadInfoRequest(
                Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                        executionsWithGateways,
                int ignored2,
                Duration ignored3,
                int ignored4) {
            // Exactly one group of executions is expected, holding all attempt ids.
            AssertionsForClassTypes.assertThat(executionsWithGateways.size()).isEqualTo(1);
            AssertionsForClassTypes.assertThat(executionsWithGateways.keySet().iterator().next())
                    .isEqualTo(ATTEMPT_IDS);

            return CompletableFuture.completedFuture(
                    vertexThreadInfoStats[counter++ % vertexThreadInfoStats.length]);
        }
    }
}

