/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.AbstractCompleteCheckpointStore;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mockito;

class CheckpointCoordinatorFailureTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    CheckpointCoordinatorFailureTest() {
    }

    @Test
    void testFailingCompletedCheckpointStoreAdd() throws Exception {
        JobVertexID jobVertexId = new JobVertexID();
        ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        ExecutionGraph testGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexId).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex = testGraph.getJobVertex(jobVertexId).getTaskVertices()[0];
        CheckpointCoordinator coord = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCompletedCheckpointStore((CompletedCheckpointStore)new FailingCompletedCheckpointStore(new Exception("The failing completed checkpoint store failed again... :-("))).setTimer((ScheduledExecutor)manuallyTriggeredScheduledExecutor).build(testGraph);
        coord.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((int)coord.getNumberOfPendingCheckpoints()).isOne();
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint)coord.getPendingCheckpoints().values().iterator().next();
        Assertions.assertThat((boolean)pendingCheckpoint.isDisposed()).isFalse();
        long checkpointId = (Long)coord.getPendingCheckpoints().keySet().iterator().next();
        KeyedStateHandle managedKeyedHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        Mockito.when((Object)managedKeyedHandle.getStateHandleId()).thenReturn((Object)StateHandleID.randomStateHandleId());
        KeyedStateHandle rawKeyedHandle = (KeyedStateHandle)Mockito.mock(KeyedStateHandle.class);
        Mockito.when((Object)rawKeyedHandle.getStateHandleId()).thenReturn((Object)StateHandleID.randomStateHandleId());
        OperatorStateHandle managedOpHandle = (OperatorStateHandle)Mockito.mock(OperatorStreamStateHandle.class);
        OperatorStateHandle rawOpHandle = (OperatorStateHandle)Mockito.mock(OperatorStreamStateHandle.class);
        InputChannelStateHandle inputChannelStateHandle = new InputChannelStateHandle(new InputChannelInfo(0, 1), (StreamStateHandle)Mockito.mock(StreamStateHandle.class), Collections.singletonList(1L));
        ResultSubpartitionStateHandle resultSubpartitionStateHandle = new ResultSubpartitionStateHandle(new ResultSubpartitionInfo(0, 1), (StreamStateHandle)Mockito.mock(StreamStateHandle.class), Collections.singletonList(1L));
        OperatorSubtaskState operatorSubtaskState = (OperatorSubtaskState)Mockito.spy((Object)OperatorSubtaskState.builder().setManagedOperatorState(managedOpHandle).setRawOperatorState(rawOpHandle).setManagedKeyedState(managedKeyedHandle).setRawKeyedState(rawKeyedHandle).setInputChannelState(StateObjectCollection.singleton((StateObject)inputChannelStateHandle)).setResultSubpartitionState(StateObjectCollection.singleton((StateObject)resultSubpartitionStateHandle)).build());
        TaskStateSnapshot subtaskState = (TaskStateSnapshot)Mockito.spy((Object)new TaskStateSnapshot());
        subtaskState.putSubtaskStateByOperatorID(new OperatorID(), operatorSubtaskState);
        Mockito.when((Object)subtaskState.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID((JobVertexID)vertex.getJobvertexId()))).thenReturn((Object)operatorSubtaskState);
        AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(testGraph.getJobID(), vertex.getCurrentExecutionAttempt().getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskState);
        try {
            coord.receiveAcknowledgeMessage(acknowledgeMessage, "Unknown location");
            Assertions.fail((String)"Expected a checkpoint exception because the completed checkpoint store could not store the completed checkpoint.");
        }
        catch (CheckpointException checkpointException) {
            // empty catch block
        }
        Assertions.assertThat((boolean)pendingCheckpoint.isDisposed()).isTrue();
        ((OperatorStateHandle)Mockito.verify(operatorSubtaskState.getManagedOperatorState().iterator().next())).discardState();
        ((OperatorStateHandle)Mockito.verify(operatorSubtaskState.getRawOperatorState().iterator().next())).discardState();
        ((KeyedStateHandle)Mockito.verify(operatorSubtaskState.getManagedKeyedState().iterator().next())).discardState();
        ((KeyedStateHandle)Mockito.verify(operatorSubtaskState.getRawKeyedState().iterator().next())).discardState();
        ((StreamStateHandle)Mockito.verify((Object)((InputChannelStateHandle)operatorSubtaskState.getInputChannelState().iterator().next()).getDelegate())).discardState();
        ((StreamStateHandle)Mockito.verify((Object)((ResultSubpartitionStateHandle)operatorSubtaskState.getResultSubpartitionState().iterator().next()).getDelegate())).discardState();
    }

    @Test
    void testCleanupForGenericFailure() throws Exception {
        this.testStoringFailureHandling((Exception)new FlinkRuntimeException("Expected exception"), 1);
    }

    @Test
    void testCleanupOmissionForPossibleInconsistentStateException() throws Exception {
        this.testStoringFailureHandling((Exception)new PossibleInconsistentStateException(), 0);
    }

    private void testStoringFailureHandling(Exception failure, int expectedCleanupCalls) throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
        StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
        ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        FailingCompletedCheckpointStore completedCheckpointStore = new FailingCompletedCheckpointStore(failure);
        CheckpointStatsTracker statsTracker = new CheckpointStatsTracker(Integer.MAX_VALUE, (MetricGroup)new UnregisteredMetricsGroup(), new JobID());
        final AtomicInteger cleanupCallCount = new AtomicInteger(0);
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointIDCounter((CheckpointIDCounter)checkpointIDCounter).setCheckpointsCleaner(new CheckpointsCleaner(){
            private static final long serialVersionUID = 2029876992397573325L;

            public void cleanCheckpointOnFailedStoring(CompletedCheckpoint completedCheckpoint, Executor executor) {
                cleanupCallCount.incrementAndGet();
                super.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
            }
        }).setCompletedCheckpointStore((CompletedCheckpointStore)completedCheckpointStore).setTimer((ScheduledExecutor)manuallyTriggeredScheduledExecutor).setCheckpointStatsTracker(statsTracker).build(graph);
        checkpointCoordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        CheckpointMetrics expectedReportedMetrics = new CheckpointMetricsBuilder().setTotalBytesPersisted(18L).setBytesPersistedOfThisCheckpoint(18L).setBytesProcessedDuringAlignment(19L).setAsyncDurationMillis(20L).setAlignmentDurationNanos(123000000L).setCheckpointStartDelayNanos(567000000L).build();
        try {
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptId, checkpointIDCounter.getLast(), expectedReportedMetrics, new TaskStateSnapshot()), "unknown location");
            Assertions.fail((String)"CheckpointException should have been thrown.");
        }
        catch (CheckpointException e) {
            Assertions.assertThat((Comparable)e.getCheckpointFailureReason()).isEqualTo((Object)CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE);
        }
        AbstractCheckpointStats actualStats = statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointIDCounter.getLast());
        Assertions.assertThat((long)actualStats.getCheckpointId()).isEqualTo(checkpointIDCounter.getLast());
        Assertions.assertThat((Comparable)actualStats.getStatus()).isEqualTo((Object)CheckpointStatsStatus.FAILED);
        CheckpointCoordinatorTest.assertStatsMetrics(vertex.getJobvertexId(), 0, expectedReportedMetrics, actualStats);
        Assertions.assertThat((int)cleanupCallCount.get()).isEqualTo(expectedCleanupCalls);
    }

    private static final class FailingCompletedCheckpointStore
    extends AbstractCompleteCheckpointStore {
        private final Exception addCheckpointFailure;

        public FailingCompletedCheckpointStore(Exception addCheckpointFailure) {
            super(SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), Collections.emptyList(), RestoreMode.DEFAULT));
            this.addCheckpointFailure = addCheckpointFailure;
        }

        public CompletedCheckpoint addCheckpointAndSubsumeOldestOne(CompletedCheckpoint checkpoint, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) throws Exception {
            throw this.addCheckpointFailure;
        }

        public CompletedCheckpoint getLatestCheckpoint() throws Exception {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) throws Exception {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
            return Collections.emptyList();
        }

        public int getNumberOfRetainedCheckpoints() {
            return -1;
        }

        public int getMaxNumberOfRetainedCheckpoints() {
            return 1;
        }

        public boolean requiresExternalizedCheckpoints() {
            return false;
        }
    }
}

