/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.core.failure.TestingFailureEnricher;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResultTest;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.runtime.topology.Vertex;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.IterableUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests for {@link ExecutionFailureHandler}.
 *
 * <p>Every test runs against a fresh handler built in {@link #setUp()} from a single-vertex
 * {@link TestingSchedulingTopology}, a {@link TestFailoverStrategy} whose answer is configurable,
 * a {@link TestRestartBackoffTimeStrategy} and one {@link TestingFailureEnricher}.
 */
class ExecutionFailureHandlerTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    /** Restart delay given to the backoff strategy and expected back from restartable results. */
    private static final long RESTART_DELAY_MS = 1234L;

    private SchedulingTopology schedulingTopology;
    private TestFailoverStrategy failoverStrategy;

    /** Backs the supplier handed to the backoff strategy; tests flip it between failures. */
    private AtomicBoolean isNewAttempt;

    private TestRestartBackoffTimeStrategy backoffTimeStrategy;
    private ExecutionFailureHandler executionFailureHandler;
    private TestingFailureEnricher testingFailureEnricher;

    /** Rebuilds the topology, the collaborating test doubles and the handler under test. */
    @BeforeEach
    void setUp() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        topology.newExecutionVertex();
        schedulingTopology = topology;

        failoverStrategy = new TestFailoverStrategy();
        testingFailureEnricher = new TestingFailureEnricher();
        isNewAttempt = new AtomicBoolean(true);
        // The leading `true` is presumably the initial "can restart" flag (see setCanRestart
        // usage below) - confirm against TestRestartBackoffTimeStrategy.
        backoffTimeStrategy =
                new TestRestartBackoffTimeStrategy(true, RESTART_DELAY_MS, isNewAttempt::get);

        // NOTE(review): the two trailing constructor arguments are deliberately passed as null;
        // their meaning is not visible from this file.
        executionFailureHandler =
                new ExecutionFailureHandler(
                        schedulingTopology,
                        failoverStrategy,
                        backoffTimeStrategy,
                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                        Collections.singleton(testingFailureEnricher),
                        null,
                        null);
    }

    /**
     * A task failure handled while restarting is allowed yields a restartable result that carries
     * the failed execution, the configured delay, the vertices chosen by the failover strategy,
     * the original error, the timestamp and the enricher's labels, and counts as one restart.
     */
    @Test
    void testNormalFailureHandling() throws Exception {
        Set<ExecutionVertexID> tasksToRestart =
                Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
        failoverStrategy.setTasksToRestart(tasksToRestart);

        Execution execution =
                FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
        Throwable cause = new Exception("test failure");
        long timestamp = System.currentTimeMillis();

        FailureHandlingResult result =
                executionFailureHandler.getFailureHandlingResult(execution, cause, timestamp);

        Assertions.assertThat(result.canRestart()).isTrue();
        Assertions.assertThat(result.getFailedExecution()).isPresent();
        Assertions.assertThat(result.getFailedExecution().get()).isSameAs(execution);
        Assertions.assertThat(result.getRestartDelayMS()).isEqualTo(RESTART_DELAY_MS);
        Assertions.assertThat(result.getVerticesToRestart()).isEqualTo(tasksToRestart);
        Assertions.assertThat(result.getError()).isSameAs(cause);
        Assertions.assertThat(result.getTimestamp()).isEqualTo(timestamp);
        Assertions.assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(cause);
        Assertions.assertThat(result.getFailureLabels().get())
                .isEqualTo(testingFailureEnricher.getFailureLabels());
        Assertions.assertThat(executionFailureHandler.getNumberOfRestarts()).isOne();
    }

    /**
     * When the backoff strategy refuses to restart, the result is not restartable, its error has
     * the original failure as cause without being classified as unrecoverable, the restart-only
     * accessors throw {@link IllegalStateException} and the restart counter stays at zero.
     */
    @Test
    void testRestartingSuppressedFailureHandlingResult() throws Exception {
        backoffTimeStrategy.setCanRestart(false);

        Execution execution =
                FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
        Throwable error = new Exception("expected test failure");
        long timestamp = System.currentTimeMillis();

        FailureHandlingResult result =
                executionFailureHandler.getFailureHandlingResult(execution, error, timestamp);

        Assertions.assertThat(result.canRestart()).isFalse();
        Assertions.assertThat(result.getFailedExecution()).isPresent();
        Assertions.assertThat(result.getFailedExecution().get()).isSameAs(execution);
        Assertions.assertThat(result.getError()).hasCause(error);
        Assertions.assertThat(result.getTimestamp()).isEqualTo(timestamp);
        Assertions.assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error);
        Assertions.assertThat(result.getFailureLabels().get())
                .isEqualTo(testingFailureEnricher.getFailureLabels());
        Assertions.assertThat(ExecutionFailureHandler.isUnrecoverableError(result.getError()))
                .isFalse();

        Assertions.assertThatThrownBy(result::getVerticesToRestart)
                .as("getVerticesToRestart is not allowed when restarting is suppressed")
                .isInstanceOf(IllegalStateException.class);
        Assertions.assertThatThrownBy(result::getRestartDelayMS)
                .as("getRestartDelayMS is not allowed when restarting is suppressed")
                .isInstanceOf(IllegalStateException.class);

        Assertions.assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
    }

    /**
     * A failure wrapping a {@link SuppressRestartsException} produces a non-restartable result
     * whose error is classified as unrecoverable and which is reported as a root cause even though
     * the new-attempt flag was switched off beforehand.
     */
    @Test
    void testNonRecoverableFailureHandlingResult() throws Exception {
        Execution execution =
                FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
        Throwable error =
                new Exception(new SuppressRestartsException(new Exception("test failure")));
        long timestamp = System.currentTimeMillis();

        // Report "not a new attempt" so the root-cause assertion below proves the handler
        // overrides the strategy for unrecoverable failures.
        isNewAttempt.set(false);

        FailureHandlingResult result =
                executionFailureHandler.getFailureHandlingResult(execution, error, timestamp);

        Assertions.assertThat(result.canRestart()).isFalse();
        Assertions.assertThat(result.getFailedExecution()).isPresent();
        Assertions.assertThat(result.getFailedExecution().get()).isSameAs(execution);
        Assertions.assertThat(result.getError()).isNotNull();
        Assertions.assertThat(ExecutionFailureHandler.isUnrecoverableError(result.getError()))
                .isTrue();
        Assertions.assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error);
        Assertions.assertThat(result.getFailureLabels().get())
                .isEqualTo(testingFailureEnricher.getFailureLabels());
        Assertions.assertThat(result.getTimestamp()).isEqualTo(timestamp);
        Assertions.assertThat(result.isRootCause())
                .as("A NonRecoverableFailure should be new attempt even if RestartBackoffTimeStrategy consider it's not new attempt.")
                .isTrue();

        Assertions.assertThatThrownBy(result::getVerticesToRestart)
                .as("getVerticesToRestart is not allowed when restarting is suppressed")
                .isInstanceOf(IllegalStateException.class);
        Assertions.assertThatThrownBy(result::getRestartDelayMS)
                .as("getRestartDelayMS is not allowed when restarting is suppressed")
                .isInstanceOf(IllegalStateException.class);

        Assertions.assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
    }

    /**
     * Alternates the new-attempt flag and checks that only failures handled while it is set are
     * root causes and increase the restart counter.
     */
    @Test
    void testNewAttemptAndNumberOfRestarts() throws Exception {
        Set<ExecutionVertexID> tasksToRestart =
                Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
        failoverStrategy.setTasksToRestart(tasksToRestart);

        Execution execution =
                FailureHandlingResultTest.createExecution(EXECUTOR_RESOURCE.getExecutor());
        Throwable error = new Exception("expected test failure");

        // The flag starts out as true (see setUp), so the first failure is a root exception.
        testHandlingRootException(execution, error);

        isNewAttempt.set(false);
        testHandlingConcurrentException(execution, error);
        testHandlingConcurrentException(execution, error);

        isNewAttempt.set(true);
        testHandlingRootException(execution, error);
        testHandlingRootException(execution, error);

        isNewAttempt.set(false);
        testHandlingConcurrentException(execution, error);
        testHandlingConcurrentException(execution, error);
    }

    /** Handles {@code error} and expects a root-cause result plus exactly one more restart. */
    private void testHandlingRootException(Execution execution, Throwable error) {
        long originalNumberOfRestarts = executionFailureHandler.getNumberOfRestarts();

        FailureHandlingResult result =
                executionFailureHandler.getFailureHandlingResult(
                        execution, error, System.currentTimeMillis());

        Assertions.assertThat(result.isRootCause())
                .as("The FailureHandlingResult should be the root cause if exception is new attempt.")
                .isTrue();
        Assertions.assertThat(executionFailureHandler.getNumberOfRestarts())
                .as("The numberOfRestarts should be increased when it's a root exception.")
                .isEqualTo(originalNumberOfRestarts + 1L);
    }

    /** Handles {@code error} and expects a non-root result with an unchanged restart count. */
    private void testHandlingConcurrentException(Execution execution, Throwable error) {
        long originalNumberOfRestarts = executionFailureHandler.getNumberOfRestarts();

        FailureHandlingResult result =
                executionFailureHandler.getFailureHandlingResult(
                        execution, error, System.currentTimeMillis());

        Assertions.assertThat(result.isRootCause())
                .as("The FailureHandlingResult shouldn't be the root cause if exception isn't new attempt.")
                .isFalse();
        Assertions.assertThat(executionFailureHandler.getNumberOfRestarts())
                .as("The numberOfRestarts shouldn't be increased when it isn't a root exception.")
                .isEqualTo(originalNumberOfRestarts);
    }

    /**
     * {@link ExecutionFailureHandler#isUnrecoverableError(Throwable)} is false for a plain
     * exception and true for a {@link SuppressRestartsException}, whether direct or nested.
     */
    @Test
    void testUnrecoverableErrorCheck() {
        // Plain, recoverable error.
        Assertions.assertThat(ExecutionFailureHandler.isUnrecoverableError(new Exception()))
                .isFalse();

        // Directly unrecoverable.
        Assertions.assertThat(
                        ExecutionFailureHandler.isUnrecoverableError(
                                new SuppressRestartsException(new Exception())))
                .isTrue();

        // Unrecoverable error nested one level down the cause chain.
        Assertions.assertThat(
                        ExecutionFailureHandler.isUnrecoverableError(
                                new Exception(new SuppressRestartsException(new Exception()))))
                .isTrue();
    }

    /**
     * A global failure yields a result that restarts every vertex of the scheduling topology and
     * carries the original error, the timestamp and the enricher's labels.
     */
    @Test
    void testGlobalFailureHandling() throws ExecutionException, InterruptedException {
        Throwable error = new Exception("Expected test failure");
        long timestamp = System.currentTimeMillis();

        FailureHandlingResult result =
                executionFailureHandler.getGlobalFailureHandlingResult(error, timestamp);

        // No raw-type cast here: the element type must survive so that getId() resolves.
        Set<ExecutionVertexID> allVertexIds =
                IterableUtils.toStream(schedulingTopology.getVertices())
                        .map(vertex -> vertex.getId())
                        .collect(Collectors.toSet());

        Assertions.assertThat(result.getVerticesToRestart()).isEqualTo(allVertexIds);
        Assertions.assertThat(result.getError()).isSameAs(error);
        Assertions.assertThat(result.getTimestamp()).isEqualTo(timestamp);
        Assertions.assertThat(testingFailureEnricher.getSeenThrowables()).containsExactly(error);
        Assertions.assertThat(result.getFailureLabels().get())
                .isEqualTo(testingFailureEnricher.getFailureLabels());
    }

    /**
     * A {@link FailoverStrategy} that returns whatever set was last passed to {@link
     * #setTasksToRestart(Set)}, ignoring the failed vertex and the cause. Returns {@code null}
     * until a set has been configured.
     */
    private static class TestFailoverStrategy implements FailoverStrategy {
        private Set<ExecutionVertexID> tasksToRestart;

        /** Sets the vertices that every later {@code getTasksNeedingRestart} call returns. */
        public void setTasksToRestart(Set<ExecutionVertexID> tasksToRestart) {
            this.tasksToRestart = tasksToRestart;
        }

        @Override
        public Set<ExecutionVertexID> getTasksNeedingRestart(
                ExecutionVertexID executionVertexId, Throwable cause) {
            return tasksToRestart;
        }
    }
}

