package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.class */
/**
 * Tests for {@link CheckpointStatsTracker}: checkpoint counting, history handling, snapshot
 * re-creation, and the gauges the tracker registers with its metric group.
 */
public class CheckpointStatsTrackerTest {

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorResource();

    /**
     * Number of subtask acknowledgements reported per checkpoint in the graph-based tests. The
     * same value is passed as the second argument of {@code addJobVertex} (presumably the vertex
     * parallelism -- confirm against the builder).
     */
    private static final int SUBTASK_COUNT = 3;

    /** Names of all gauges the tracker is expected to register, each exactly once. */
    private static final String[] EXPECTED_GAUGE_NAMES = {
        "totalNumberOfCheckpoints",
        "numberOfInProgressCheckpoints",
        "numberOfCompletedCheckpoints",
        "numberOfFailedCheckpoints",
        "lastCheckpointRestoreTimestamp",
        "lastCheckpointSize",
        "lastCheckpointFullSize",
        "lastCheckpointDuration",
        "lastCheckpointProcessedData",
        "lastCheckpointPersistedData",
        "lastCheckpointExternalPath",
        "lastCompletedCheckpointId"
    };

    /**
     * With a history size of 0 the history iterator must be empty, while counts, summary stats
     * and the latest completed checkpoint are still tracked.
     */
    @Test
    public void testTrackerWithoutHistory() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        Map<JobVertexID, Integer> vertexToParallelism = buildVertexToParallelism(jobVertexID);

        CheckpointStatsTracker tracker =
                new CheckpointStatsTracker(0, new UnregisteredMetricsGroup());
        PendingCheckpointStats pending =
                tracker.reportPendingCheckpoint(
                        0L, 1L, neverRetainedCheckpointProperties(), vertexToParallelism);
        reportAllSubtasks(pending, jobVertexID);
        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats((String) null));

        CheckpointStatsSnapshot snapshot = tracker.createSnapshot();

        // History is disabled, so nothing may be listed ...
        Assert.assertFalse(snapshot.getHistory().getCheckpoints().iterator().hasNext());

        // ... but the counters and the summary still see the completed checkpoint.
        CheckpointStatsCounts counts = snapshot.getCounts();
        Assert.assertEquals(1L, counts.getNumberOfCompletedCheckpoints());
        Assert.assertEquals(1L, counts.getTotalNumberOfCheckpoints());

        CompletedCheckpointStatsSummarySnapshot summary = snapshot.getSummaryStats();
        Assert.assertEquals(1L, summary.getStateSizeStats().getCount());
        Assert.assertEquals(1L, summary.getEndToEndDurationStats().getCount());

        Assert.assertNotNull(snapshot.getHistory().getLatestCompletedCheckpoint());
        Assert.assertEquals(
                0L, snapshot.getHistory().getLatestCompletedCheckpoint().getCheckpointId());
    }

    /**
     * Reports a completed checkpoint, a failed checkpoint, a completed savepoint, an in-progress
     * checkpoint and a restore, then verifies counts, summary, history order and the "latest"
     * accessors of the resulting snapshot.
     */
    @Test
    public void testCheckpointTracking() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        Map<JobVertexID, Integer> vertexToParallelism = buildVertexToParallelism(jobVertexID);
        CheckpointStatsTracker tracker =
                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());

        // Checkpoint 0: completed.
        PendingCheckpointStats completed =
                tracker.reportPendingCheckpoint(
                        0L, 1L, neverRetainedCheckpointProperties(), vertexToParallelism);
        reportAllSubtasks(completed, jobVertexID);
        tracker.reportCompletedCheckpoint(completed.toCompletedCheckpointStats((String) null));

        // Checkpoint 1: failed.
        PendingCheckpointStats failed =
                tracker.reportPendingCheckpoint(
                        1L, 1L, neverRetainedCheckpointProperties(), vertexToParallelism);
        tracker.reportFailedCheckpoint(failed.toFailedCheckpoint(12L, (Throwable) null));

        // Checkpoint 2: completed savepoint.
        PendingCheckpointStats savepoint =
                tracker.reportPendingCheckpoint(
                        2L,
                        1L,
                        CheckpointProperties.forSavepoint(true, SavepointFormatType.CANONICAL),
                        vertexToParallelism);
        reportAllSubtasks(savepoint, jobVertexID);
        tracker.reportCompletedCheckpoint(savepoint.toCompletedCheckpointStats((String) null));

        // Checkpoint 3: left in progress.
        tracker.reportPendingCheckpoint(
                3L, 1L, neverRetainedCheckpointProperties(), vertexToParallelism);

        RestoredCheckpointStats restored =
                new RestoredCheckpointStats(
                        81L, neverRetainedCheckpointProperties(), 123L, (String) null);
        tracker.reportRestoredCheckpoint(restored);

        CheckpointStatsSnapshot snapshot = tracker.createSnapshot();

        CheckpointStatsCounts counts = snapshot.getCounts();
        Assert.assertEquals(4L, counts.getTotalNumberOfCheckpoints());
        Assert.assertEquals(1L, counts.getNumberOfInProgressCheckpoints());
        Assert.assertEquals(2L, counts.getNumberOfCompletedCheckpoints());
        Assert.assertEquals(1L, counts.getNumberOfFailedCheckpoints());

        // A failure reported without an in-progress checkpoint bumps total and failed only.
        tracker.reportFailedCheckpointsWithoutInProgress();
        CheckpointStatsCounts countsAfterFailure = tracker.createSnapshot().getCounts();
        Assert.assertEquals(5L, countsAfterFailure.getTotalNumberOfCheckpoints());
        Assert.assertEquals(1L, countsAfterFailure.getNumberOfInProgressCheckpoints());
        Assert.assertEquals(2L, countsAfterFailure.getNumberOfCompletedCheckpoints());
        Assert.assertEquals(2L, countsAfterFailure.getNumberOfFailedCheckpoints());

        // The remaining checks deliberately use the snapshot taken before that extra failure.
        CompletedCheckpointStatsSummarySnapshot summary = snapshot.getSummaryStats();
        Assert.assertEquals(2L, summary.getStateSizeStats().getCount());
        Assert.assertEquals(2L, summary.getEndToEndDurationStats().getCount());

        // History lists the checkpoints newest first.
        Iterator<? extends AbstractCheckpointStats> history =
                snapshot.getHistory().getCheckpoints().iterator();

        AbstractCheckpointStats newest = nextCheckpoint(history);
        Assert.assertEquals(3L, newest.getCheckpointId());
        Assert.assertTrue(newest.getStatus().isInProgress());

        AbstractCheckpointStats savepointEntry = nextCheckpoint(history);
        Assert.assertEquals(2L, savepointEntry.getCheckpointId());
        Assert.assertTrue(savepointEntry.getStatus().isCompleted());

        AbstractCheckpointStats failedEntry = nextCheckpoint(history);
        Assert.assertEquals(1L, failedEntry.getCheckpointId());
        Assert.assertTrue(failedEntry.getStatus().isFailed());

        AbstractCheckpointStats oldest = nextCheckpoint(history);
        Assert.assertEquals(0L, oldest.getCheckpointId());
        Assert.assertTrue(oldest.getStatus().isCompleted());

        Assert.assertFalse(history.hasNext());

        // "Latest" accessors: the savepoint is expected under getLatestSavepoint(), not as the
        // latest completed checkpoint.
        Assert.assertEquals(
                completed.getCheckpointId(),
                snapshot.getHistory().getLatestCompletedCheckpoint().getCheckpointId());
        Assert.assertEquals(
                savepoint.getCheckpointId(),
                snapshot.getHistory().getLatestSavepoint().getCheckpointId());
        Assert.assertEquals(
                failed.getCheckpointId(),
                snapshot.getHistory().getLatestFailedCheckpoint().getCheckpointId());
        Assert.assertEquals(restored, snapshot.getLatestRestoredCheckpoint());
    }

    /**
     * Verifies that a new (non-equal) snapshot is produced after each reported change and that
     * repeated calls without intermediate changes yield equal snapshots.
     */
    @Test
    public void testCreateSnapshot() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        CheckpointStatsTracker tracker =
                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());

        CheckpointStatsSnapshot initialSnapshot = tracker.createSnapshot();

        PendingCheckpointStats pending =
                tracker.reportPendingCheckpoint(
                        0L,
                        1L,
                        neverRetainedCheckpointProperties(),
                        Collections.singletonMap(jobVertexID, 1));
        pending.reportSubtaskStats(jobVertexID, createSubtaskStats(0));

        CheckpointStatsSnapshot pendingSnapshot = tracker.createSnapshot();
        Assert.assertNotEquals(initialSnapshot, pendingSnapshot);
        // No change in between: the snapshot must stay equal.
        Assert.assertEquals(pendingSnapshot, tracker.createSnapshot());

        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats((String) null));
        CheckpointStatsSnapshot completedSnapshot = tracker.createSnapshot();
        Assert.assertNotEquals(pendingSnapshot, completedSnapshot);

        tracker.reportRestoredCheckpoint(
                new RestoredCheckpointStats(
                        12L, neverRetainedCheckpointProperties(), 12L, (String) null));
        CheckpointStatsSnapshot restoredSnapshot = tracker.createSnapshot();
        Assert.assertNotEquals(completedSnapshot, restoredSnapshot);
        Assert.assertEquals(restoredSnapshot, tracker.createSnapshot());
    }

    /**
     * Verifies that constructing the tracker registers exactly the expected gauges (all names
     * present, no additional or duplicate registrations).
     */
    @Test
    public void testMetricsRegistration() throws Exception {
        // A list (not a set) so that duplicate registrations would show up in the size check.
        final ArrayList<String> registeredGaugeNames = new ArrayList<>();

        new CheckpointStatsTracker(
                0,
                new UnregisteredMetricsGroup() {
                    @Override
                    public <T, G extends Gauge<T>> G gauge(String name, G metric) {
                        if (metric != null) {
                            registeredGaugeNames.add(name);
                        }
                        return metric;
                    }
                });

        Assert.assertTrue(registeredGaugeNames.containsAll(Arrays.asList(EXPECTED_GAUGE_NAMES)));
        Assert.assertEquals(EXPECTED_GAUGE_NAMES.length, registeredGaugeNames.size());
    }

    /**
     * Captures the gauges registered by the tracker and verifies their values initially, after a
     * completed checkpoint, after a failed checkpoint, after a restore, and after a second
     * completed checkpoint without an external path.
     */
    @Test
    public void testMetricsAreUpdated() throws Exception {
        final Map<String, Gauge<?>> registeredGauges = new HashMap<>();
        UnregisteredMetricsGroup metricGroup =
                new UnregisteredMetricsGroup() {
                    @Override
                    public <T, G extends Gauge<T>> G gauge(String name, G metric) {
                        registeredGauges.put(name, metric);
                        return metric;
                    }
                };

        JobVertexID jobVertexID = new JobVertexID();
        // NOTE(review): the built graph/vertex is never used below; the call is kept as-is
        // because side effects of build() cannot be ruled out from this file.
        new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(jobVertexID)
                .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor())
                .getJobVertex(jobVertexID);
        Map<JobVertexID, Integer> vertexToParallelism = Collections.singletonMap(jobVertexID, 1);

        CheckpointStatsTracker tracker = new CheckpointStatsTracker(0, metricGroup);
        Assert.assertEquals(EXPECTED_GAUGE_NAMES.length, registeredGauges.size());

        // Gauge<?> keeps getValue() typed as Object, so each assertEquals below compares the
        // boxed expected value (Long, Integer or String) against the gauge value via equals().
        Gauge<?> totalCheckpoints = registeredGauges.get("totalNumberOfCheckpoints");
        Gauge<?> inProgressCheckpoints = registeredGauges.get("numberOfInProgressCheckpoints");
        Gauge<?> completedCheckpoints = registeredGauges.get("numberOfCompletedCheckpoints");
        Gauge<?> failedCheckpoints = registeredGauges.get("numberOfFailedCheckpoints");
        Gauge<?> restoreTimestamp = registeredGauges.get("lastCheckpointRestoreTimestamp");
        Gauge<?> lastSize = registeredGauges.get("lastCheckpointSize");
        Gauge<?> lastFullSize = registeredGauges.get("lastCheckpointFullSize");
        Gauge<?> lastDuration = registeredGauges.get("lastCheckpointDuration");
        Gauge<?> lastProcessedData = registeredGauges.get("lastCheckpointProcessedData");
        Gauge<?> lastPersistedData = registeredGauges.get("lastCheckpointPersistedData");
        Gauge<?> lastExternalPath = registeredGauges.get("lastCheckpointExternalPath");
        Gauge<?> lastCompletedId = registeredGauges.get("lastCompletedCheckpointId");

        // Initial values: zero counts, -1 / "n/a" for everything that needs a checkpoint.
        Assert.assertEquals(0L, totalCheckpoints.getValue());
        Assert.assertEquals(0, inProgressCheckpoints.getValue());
        Assert.assertEquals(0L, completedCheckpoints.getValue());
        Assert.assertEquals(0L, failedCheckpoints.getValue());
        Assert.assertEquals(-1L, restoreTimestamp.getValue());
        Assert.assertEquals(-1L, lastSize.getValue());
        Assert.assertEquals(-1L, lastFullSize.getValue());
        Assert.assertEquals(-1L, lastDuration.getValue());
        Assert.assertEquals(-1L, lastProcessedData.getValue());
        Assert.assertEquals(-1L, lastPersistedData.getValue());
        Assert.assertEquals("n/a", lastExternalPath.getValue());
        Assert.assertEquals(-1L, lastCompletedId.getValue());

        // Checkpoint 0 becomes pending.
        PendingCheckpointStats pending =
                tracker.reportPendingCheckpoint(
                        0L, 0L, neverRetainedCheckpointProperties(), vertexToParallelism);
        Assert.assertEquals(1L, totalCheckpoints.getValue());
        Assert.assertEquals(1, inProgressCheckpoints.getValue());
        Assert.assertEquals(0L, completedCheckpoints.getValue());
        Assert.assertEquals(0L, failedCheckpoints.getValue());

        // Values are named after the gauge in which each one is asserted to surface below
        // (the checkpoint was triggered at timestamp 0).
        long expectedDuration = 11231230L;
        long expectedSize = 123L;
        long expectedFullSize = 12381238L;
        long expectedProcessedData = 4242L;
        long expectedPersistedData = 4444L;
        String externalPath = "myexternalpath";
        SubtaskStateStats subtaskStats =
                new SubtaskStateStats(
                        0,
                        expectedDuration,
                        expectedSize,
                        expectedFullSize,
                        0L,
                        0L,
                        expectedProcessedData,
                        expectedPersistedData,
                        0L,
                        0L,
                        false,
                        true);
        Assert.assertTrue(pending.reportSubtaskStats(jobVertexID, subtaskStats));

        // Checkpoint 0 completes.
        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath));
        Assert.assertEquals(1L, totalCheckpoints.getValue());
        Assert.assertEquals(0, inProgressCheckpoints.getValue());
        Assert.assertEquals(1L, completedCheckpoints.getValue());
        Assert.assertEquals(0L, failedCheckpoints.getValue());
        Assert.assertEquals(-1L, restoreTimestamp.getValue());
        Assert.assertEquals(expectedSize, lastSize.getValue());
        Assert.assertEquals(expectedFullSize, lastFullSize.getValue());
        Assert.assertEquals(expectedProcessedData, lastProcessedData.getValue());
        Assert.assertEquals(expectedPersistedData, lastPersistedData.getValue());
        Assert.assertEquals(expectedDuration, lastDuration.getValue());
        Assert.assertEquals(externalPath, lastExternalPath.getValue());
        Assert.assertEquals(0L, lastCompletedId.getValue());

        // Checkpoint 1 fails; the last completed id must not move.
        PendingCheckpointStats failing =
                tracker.reportPendingCheckpoint(
                        1L, 11L, neverRetainedCheckpointProperties(), vertexToParallelism);
        tracker.reportFailedCheckpoint(failing.toFailedCheckpoint(1230123L, (Throwable) null));
        Assert.assertEquals(2L, totalCheckpoints.getValue());
        Assert.assertEquals(0, inProgressCheckpoints.getValue());
        Assert.assertEquals(1L, completedCheckpoints.getValue());
        Assert.assertEquals(1L, failedCheckpoints.getValue());
        Assert.assertEquals(0L, lastCompletedId.getValue());

        // A restore only updates the restore timestamp gauge.
        long expectedRestoreTimestamp = 183419283L;
        tracker.reportRestoredCheckpoint(
                new RestoredCheckpointStats(
                        1L,
                        neverRetainedCheckpointProperties(),
                        expectedRestoreTimestamp,
                        (String) null));
        Assert.assertEquals(2L, totalCheckpoints.getValue());
        Assert.assertEquals(0, inProgressCheckpoints.getValue());
        Assert.assertEquals(1L, completedCheckpoints.getValue());
        Assert.assertEquals(1L, failedCheckpoints.getValue());
        Assert.assertEquals(0L, lastCompletedId.getValue());
        Assert.assertEquals(expectedRestoreTimestamp, restoreTimestamp.getValue());

        // Checkpoint 2 completes without an external path: id advances, path falls back to n/a.
        PendingCheckpointStats second =
                tracker.reportPendingCheckpoint(
                        2L, 5000L, neverRetainedCheckpointProperties(), vertexToParallelism);
        second.reportSubtaskStats(jobVertexID, subtaskStats);
        tracker.reportCompletedCheckpoint(second.toCompletedCheckpointStats((String) null));
        Assert.assertEquals(2L, lastCompletedId.getValue());
        Assert.assertEquals("n/a", lastExternalPath.getValue());
    }

    /**
     * Builds an execution graph containing a single vertex with the given id and returns a
     * one-entry map from that id to the parallelism reported by the built vertex.
     */
    private static Map<JobVertexID, Integer> buildVertexToParallelism(JobVertexID jobVertexID)
            throws Exception {
        ExecutionJobVertex jobVertex =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID, SUBTASK_COUNT, 256)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor())
                        .getJobVertex(jobVertexID);
        return Collections.singletonMap(jobVertexID, jobVertex.getParallelism());
    }

    /** Creates the checkpoint properties used for all regular (non-savepoint) checkpoints. */
    private static CheckpointProperties neverRetainedCheckpointProperties() {
        return CheckpointProperties.forCheckpoint(
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
    }

    /** Reports stats for subtasks {@code 0 .. SUBTASK_COUNT - 1} of the given vertex, in order. */
    private static void reportAllSubtasks(PendingCheckpointStats pending, JobVertexID jobVertexID) {
        for (int subtaskIndex = 0; subtaskIndex < SUBTASK_COUNT; subtaskIndex++) {
            pending.reportSubtaskStats(jobVertexID, createSubtaskStats(subtaskIndex));
        }
    }

    /** Asserts that the history has another entry and returns it. */
    private static AbstractCheckpointStats nextCheckpoint(
            Iterator<? extends AbstractCheckpointStats> history) {
        Assert.assertTrue(history.hasNext());
        return history.next();
    }

    /**
     * Creates subtask stats whose first constructor argument is {@code i} and whose remaining
     * numeric values are all zero.
     */
    private static SubtaskStateStats createSubtaskStats(int i) {
        return new SubtaskStateStats(i, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, false, true);
    }
}
