/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.ThreadInfoSample;
import org.apache.flink.runtime.taskexecutor.IdleTestTask;
import org.apache.flink.runtime.taskexecutor.SampleableTask;
import org.apache.flink.runtime.taskexecutor.ThreadInfoSampleService;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link ThreadInfoSampleService}.
 *
 * <p>Every test runs against a fresh service instance backed by a single-threaded scheduled
 * executor; the service is closed again once the test has finished.
 */
class ThreadInfoSampleServiceTest {

    /** Number of samples requested by the default request. */
    private static final int NUMBER_OF_SAMPLES = 10;

    /** Pause between two consecutive samples in every request issued by these tests. */
    private static final Duration DELAY_BETWEEN_SAMPLES = Duration.ofMillis(10L);

    /** Stack trace depth limit used by the default request. */
    private static final int MAX_STACK_TRACE_DEPTH = 10;

    /**
     * Default request shared by the tests. The leading {@code 1} is presumably the request id
     * (TODO confirm against {@link ThreadInfoSamplesRequest}); it is never inspected here.
     */
    private static final ThreadInfoSamplesRequest requestParams =
            new ThreadInfoSamplesRequest(
                    1, NUMBER_OF_SAMPLES, DELAY_BETWEEN_SAMPLES, MAX_STACK_TRACE_DEPTH);

    private ThreadInfoSampleService threadInfoSampleService;

    @BeforeEach
    void setUp() throws Exception {
        this.threadInfoSampleService =
                new ThreadInfoSampleService(Executors.newSingleThreadScheduledExecutor());
    }

    @AfterEach
    void tearDown() throws Exception {
        // Guard against setUp() having failed before the service was assigned.
        if (this.threadInfoSampleService != null) {
            this.threadInfoSampleService.close();
        }
    }

    /**
     * Samples two idle tasks with the default request and checks that the total number of samples
     * is {@code NUMBER_OF_SAMPLES} per task and that no stack trace exceeds the requested depth.
     */
    @Test
    void testSampleTaskThreadInfo() throws Exception {
        Set<IdleTestTask> tasks = new HashSet<>();
        IdleTestTask.executeWithTerminationGuarantee(
                () -> {
                    tasks.add(new IdleTestTask());
                    tasks.add(new IdleTestTask());
                    // NOTE(review): presumably gives the task threads time to reach their idle
                    // state before sampling starts - confirm against IdleTestTask.
                    Thread.sleep(2000L);

                    Map<Long, ExecutionAttemptID> threads = collectExecutionAttempts(tasks);
                    Map<ExecutionAttemptID, Collection<ThreadInfoSample>> threadInfoSamples =
                            this.threadInfoSampleService
                                    .requestThreadInfoSamples(threads, requestParams)
                                    .get();

                    int count = 0;
                    for (Collection<ThreadInfoSample> samples : threadInfoSamples.values()) {
                        for (ThreadInfoSample sample : samples) {
                            ++count;
                            Assertions.assertThat(sample.getStackTrace())
                                    .hasSizeLessThanOrEqualTo(MAX_STACK_TRACE_DEPTH);
                        }
                    }
                    // Two tasks were registered above, each sampled NUMBER_OF_SAMPLES times.
                    Assertions.assertThat(count).isEqualTo(NUMBER_OF_SAMPLES * 2);
                },
                tasks);
    }

    /**
     * Samples the same idle task twice, once with the default depth limit and once with a smaller
     * one, and checks that the stack traces of the second request have exactly the smaller depth.
     */
    @Test
    void testTruncateStackTraceIfLimitIsSpecified() throws Exception {
        final int truncatedDepth = 4;
        Set<IdleTestTask> tasks = new HashSet<>();
        IdleTestTask.executeWithTerminationGuarantee(
                () -> {
                    tasks.add(new IdleTestTask());
                    Map<Long, ExecutionAttemptID> threads = collectExecutionAttempts(tasks);

                    // Both requests are issued before any assertion runs.
                    Map<ExecutionAttemptID, Collection<ThreadInfoSample>> threadInfoSamples1 =
                            this.threadInfoSampleService
                                    .requestThreadInfoSamples(threads, requestParams)
                                    .get();
                    Map<ExecutionAttemptID, Collection<ThreadInfoSample>> threadInfoSamples2 =
                            this.threadInfoSampleService
                                    .requestThreadInfoSamples(
                                            threads,
                                            new ThreadInfoSamplesRequest(
                                                    1,
                                                    NUMBER_OF_SAMPLES,
                                                    DELAY_BETWEEN_SAMPLES,
                                                    truncatedDepth))
                                    .get();

                    for (Collection<ThreadInfoSample> samples : threadInfoSamples1.values()) {
                        for (ThreadInfoSample sample : samples) {
                            Assertions.assertThat(sample.getStackTrace())
                                    .hasSizeLessThanOrEqualTo(MAX_STACK_TRACE_DEPTH);
                        }
                    }
                    for (Collection<ThreadInfoSample> samples : threadInfoSamples2.values()) {
                        for (ThreadInfoSample sample : samples) {
                            Assertions.assertThat(sample.getStackTrace()).hasSize(truncatedDepth);
                        }
                    }
                },
                tasks);
    }

    /**
     * Building and submitting a request with a negative sample count must fail with an {@link
     * IllegalArgumentException} carrying the message {@code "numSamples must be positive"}. The
     * returned future is deliberately not awaited: the failure is expected to surface
     * synchronously.
     */
    @Test
    void testThrowExceptionIfNumSamplesIsNegative() {
        Set<IdleTestTask> tasks = new HashSet<>();
        Assertions.assertThatThrownBy(
                        () ->
                                IdleTestTask.executeWithTerminationGuarantee(
                                        () -> {
                                            tasks.add(new IdleTestTask());
                                            Map<Long, ExecutionAttemptID> threads =
                                                    collectExecutionAttempts(tasks);
                                            this.threadInfoSampleService.requestThreadInfoSamples(
                                                    threads,
                                                    new ThreadInfoSamplesRequest(
                                                            1,
                                                            -1,
                                                            DELAY_BETWEEN_SAMPLES,
                                                            MAX_STACK_TRACE_DEPTH));
                                        },
                                        tasks))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage("numSamples must be positive");
    }

    /**
     * Sampling a task whose executing thread was never started must complete the returned future
     * exceptionally with an {@link IllegalStateException}.
     */
    @Test
    void testShouldThrowExceptionIfTaskIsNotRunningBeforeSampling()
            throws ExecutionException, InterruptedException {
        Set<NotRunningTask> tasks = new HashSet<>();
        tasks.add(new NotRunningTask());
        Map<Long, ExecutionAttemptID> threads = collectExecutionAttempts(tasks);

        CompletableFuture<Map<ExecutionAttemptID, Collection<ThreadInfoSample>>> sampleFuture =
                this.threadInfoSampleService.requestThreadInfoSamples(threads, requestParams);

        FlinkAssertions.assertThatFuture(sampleFuture).eventuallyFails();
        // handle() turns the failure into a regular result so the cause can be inspected
        // without get() throwing.
        Assertions.assertThat(sampleFuture.handle((ignored, e) -> e).get())
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Builds the sampling input for the given tasks: a map from the id of each task's executing
     * thread to that task's execution attempt id.
     *
     * @param tasks tasks to be sampled
     * @return thread id to execution attempt id, one entry per task
     * @throws IllegalStateException if two tasks report the same thread id (raised by {@link
     *     Collectors#toMap(java.util.function.Function, java.util.function.Function)})
     */
    private static Map<Long, ExecutionAttemptID> collectExecutionAttempts(
            Set<? extends SampleableTask> tasks) {
        return tasks.stream()
                .collect(
                        Collectors.toMap(
                                task -> task.getExecutingThread().getId(),
                                SampleableTask::getExecutionId));
    }

    /** A {@link SampleableTask} whose executing thread is created but never started. */
    private static class NotRunningTask implements SampleableTask {

        private final ExecutionAttemptID executionId = ExecutionAttemptID.randomId();

        /** Returns a brand-new, unstarted thread on every call. */
        @Override
        public Thread getExecutingThread() {
            return new Thread();
        }

        @Override
        public ExecutionAttemptID getExecutionId() {
            return this.executionId;
        }
    }
}

