/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummarySnapshot;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.InitializationStatus;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.SpanBuilder;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class DefaultCheckpointStatsTrackerTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    DefaultCheckpointStatsTrackerTest() {
    }

    @Test
    void testTrackerWithoutHistory() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID, 3, 256).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
        DefaultCheckpointStatsTracker tracker = new DefaultCheckpointStatsTracker(0, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
        PendingCheckpointStats pending = tracker.reportPendingCheckpoint(0L, 1L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), Collections.singletonMap(jobVertexID, jobVertex.getParallelism()));
        pending.reportSubtaskStats(jobVertexID, this.createSubtaskStats(0));
        pending.reportSubtaskStats(jobVertexID, this.createSubtaskStats(1));
        pending.reportSubtaskStats(jobVertexID, this.createSubtaskStats(2));
        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
        CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
        Assertions.assertThat(snapshot.getHistory().getCheckpoints().iterator()).isExhausted();
        CheckpointStatsCounts counts = snapshot.getCounts();
        Assertions.assertThat((long)counts.getNumberOfCompletedCheckpoints()).isOne();
        Assertions.assertThat((long)counts.getTotalNumberOfCheckpoints()).isOne();
        CompletedCheckpointStatsSummarySnapshot summary = snapshot.getSummaryStats();
        Assertions.assertThat((long)summary.getStateSizeStats().getCount()).isOne();
        Assertions.assertThat((long)summary.getEndToEndDurationStats().getCount()).isOne();
        Assertions.assertThat((Object)snapshot.getHistory().getLatestCompletedCheckpoint()).isNotNull();
        Assertions.assertThat((long)snapshot.getHistory().getLatestCompletedCheckpoint().getCheckpointId()).isZero();
    }

    @Test
    void testCheckpointTracking() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID, 3, 256).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
        Map<JobVertexID, Integer> vertexToDop = Collections.singletonMap(jobVertexID, jobVertex.getParallelism());
        DefaultCheckpointStatsTracker tracker = new DefaultCheckpointStatsTracker(10, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
        PendingCheckpointStats completed1 = tracker.reportPendingCheckpoint(0L, 1L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), vertexToDop);
        completed1.reportSubtaskStats(jobVertexID, this.createSubtaskStats(0));
        completed1.reportSubtaskStats(jobVertexID, this.createSubtaskStats(1));
        completed1.reportSubtaskStats(jobVertexID, this.createSubtaskStats(2));
        tracker.reportCompletedCheckpoint(completed1.toCompletedCheckpointStats(null));
        PendingCheckpointStats failed = tracker.reportPendingCheckpoint(1L, 1L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), vertexToDop);
        tracker.reportFailedCheckpoint(failed.toFailedCheckpoint(12L, null));
        PendingCheckpointStats savepoint = tracker.reportPendingCheckpoint(2L, 1L, CheckpointProperties.forSavepoint((boolean)true, (SavepointFormatType)SavepointFormatType.CANONICAL), vertexToDop);
        savepoint.reportSubtaskStats(jobVertexID, this.createSubtaskStats(0));
        savepoint.reportSubtaskStats(jobVertexID, this.createSubtaskStats(1));
        savepoint.reportSubtaskStats(jobVertexID, this.createSubtaskStats(2));
        tracker.reportCompletedCheckpoint(savepoint.toCompletedCheckpointStats(null));
        PendingCheckpointStats inProgress = tracker.reportPendingCheckpoint(3L, 1L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), vertexToDop);
        RestoredCheckpointStats restored = new RestoredCheckpointStats(81L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), 123L, null, 42L);
        tracker.reportInitializationStarted(Collections.emptySet(), 123L);
        this.reportRestoredCheckpoint((CheckpointStatsTracker)tracker, restored);
        CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
        CheckpointStatsCounts counts = snapshot.getCounts();
        Assertions.assertThat((long)counts.getTotalNumberOfCheckpoints()).isEqualTo(4L);
        Assertions.assertThat((int)counts.getNumberOfInProgressCheckpoints()).isOne();
        Assertions.assertThat((long)counts.getNumberOfCompletedCheckpoints()).isEqualTo(2L);
        Assertions.assertThat((long)counts.getNumberOfFailedCheckpoints()).isOne();
        tracker.reportFailedCheckpointsWithoutInProgress();
        CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
        counts = snapshot1.getCounts();
        Assertions.assertThat((long)counts.getTotalNumberOfCheckpoints()).isEqualTo(5L);
        Assertions.assertThat((int)counts.getNumberOfInProgressCheckpoints()).isOne();
        Assertions.assertThat((long)counts.getNumberOfCompletedCheckpoints()).isEqualTo(2L);
        Assertions.assertThat((long)counts.getNumberOfFailedCheckpoints()).isEqualTo(2L);
        CompletedCheckpointStatsSummarySnapshot summary = snapshot.getSummaryStats();
        Assertions.assertThat((long)summary.getStateSizeStats().getCount()).isEqualTo(2L);
        Assertions.assertThat((long)summary.getEndToEndDurationStats().getCount()).isEqualTo(2L);
        CheckpointStatsHistory history = snapshot.getHistory();
        Iterator it = history.getCheckpoints().iterator();
        Assertions.assertThat(it).hasNext();
        AbstractCheckpointStats stats = (AbstractCheckpointStats)it.next();
        Assertions.assertThat((long)stats.getCheckpointId()).isEqualTo(3L);
        Assertions.assertThat((boolean)stats.getStatus().isInProgress()).isTrue();
        Assertions.assertThat(it).hasNext();
        stats = (AbstractCheckpointStats)it.next();
        Assertions.assertThat((long)stats.getCheckpointId()).isEqualTo(2L);
        Assertions.assertThat((boolean)stats.getStatus().isCompleted()).isTrue();
        Assertions.assertThat(it).hasNext();
        stats = (AbstractCheckpointStats)it.next();
        Assertions.assertThat((long)stats.getCheckpointId()).isOne();
        Assertions.assertThat((boolean)stats.getStatus().isFailed()).isTrue();
        Assertions.assertThat(it).hasNext();
        stats = (AbstractCheckpointStats)it.next();
        Assertions.assertThat((long)stats.getCheckpointId()).isZero();
        Assertions.assertThat((boolean)stats.getStatus().isCompleted()).isTrue();
        Assertions.assertThat(it).isExhausted();
        Assertions.assertThat((long)snapshot.getHistory().getLatestCompletedCheckpoint().getCheckpointId()).isEqualTo(completed1.getCheckpointId());
        Assertions.assertThat((long)snapshot.getHistory().getLatestSavepoint().getCheckpointId()).isEqualTo(savepoint.getCheckpointId());
        Assertions.assertThat((long)snapshot.getHistory().getLatestFailedCheckpoint().getCheckpointId()).isEqualTo(failed.getCheckpointId());
        Assertions.assertThat((Object)snapshot.getLatestRestoredCheckpoint()).isEqualTo((Object)restored);
    }

    @Test
    void testCheckpointStatsListenerOnCompletedCheckpoint() {
        this.testCheckpointStatsListener((checkpointStatsTracker, pendingCheckpointStats) -> checkpointStatsTracker.reportCompletedCheckpoint(pendingCheckpointStats.toCompletedCheckpointStats("random-external-pointer")), 1, 0);
    }

    @Test
    void testCheckpointStatsListenerOnFailedCheckpoint() {
        this.testCheckpointStatsListener((checkpointStatsTracker, pendingCheckpointStats) -> checkpointStatsTracker.reportFailedCheckpoint(pendingCheckpointStats.toFailedCheckpoint(System.currentTimeMillis(), null)), 0, 1);
    }

    private void testCheckpointStatsListener(BiConsumer<CheckpointStatsTracker, PendingCheckpointStats> testCodeCallback, int expectedOnCompletedCheckpointCount, int expectedOnFailedCheckpointCount) {
        final AtomicInteger onCompletedCheckpointCount = new AtomicInteger();
        final AtomicInteger onFailedCheckpointCount = new AtomicInteger();
        CheckpointStatsListener listener = new CheckpointStatsListener(){

            public void onCompletedCheckpoint() {
                onCompletedCheckpointCount.incrementAndGet();
            }

            public void onFailedCheckpoint() {
                onFailedCheckpointCount.incrementAndGet();
            }
        };
        DefaultCheckpointStatsTracker statsTracker = new DefaultCheckpointStatsTracker(10, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(), listener);
        JobVertexID jobVertexID = new JobVertexID();
        PendingCheckpointStats pending = statsTracker.reportPendingCheckpoint(0L, 1L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), Collections.singletonMap(jobVertexID, 1));
        pending.reportSubtaskStats(jobVertexID, this.createSubtaskStats(0));
        testCodeCallback.accept((CheckpointStatsTracker)statsTracker, pending);
        Assertions.assertThat((AtomicInteger)onCompletedCheckpointCount).hasValue(expectedOnCompletedCheckpointCount);
        Assertions.assertThat((AtomicInteger)onFailedCheckpointCount).hasValue(expectedOnFailedCheckpointCount);
    }

    @Test
    void testCreateSnapshot() {
        JobVertexID jobVertexID = new JobVertexID();
        DefaultCheckpointStatsTracker tracker = new DefaultCheckpointStatsTracker(10, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
        CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
        PendingCheckpointStats pending = tracker.reportPendingCheckpoint(0L, 1L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), Collections.singletonMap(jobVertexID, 1));
        pending.reportSubtaskStats(jobVertexID, this.createSubtaskStats(0));
        CheckpointStatsSnapshot snapshot2 = tracker.createSnapshot();
        Assertions.assertThat((Object)snapshot2).isNotEqualTo((Object)snapshot1);
        Assertions.assertThat((Object)tracker.createSnapshot()).isEqualTo((Object)snapshot2);
        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
        CheckpointStatsSnapshot snapshot3 = tracker.createSnapshot();
        Assertions.assertThat((Object)snapshot3).isNotEqualTo((Object)snapshot2);
        tracker.reportInitializationStarted(Collections.emptySet(), 0L);
        this.reportRestoredCheckpoint((CheckpointStatsTracker)tracker, new RestoredCheckpointStats(12L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), 12L, null, 42L));
        CheckpointStatsSnapshot snapshot4 = tracker.createSnapshot();
        Assertions.assertThat((Object)snapshot4).isNotEqualTo((Object)snapshot3);
        Assertions.assertThat((Object)tracker.createSnapshot()).isEqualTo((Object)snapshot4);
    }

    @Test
    public void testSpanCreation() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        final ArrayList reportedSpans = new ArrayList();
        UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup metricGroup = new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup(){

            public void addSpan(SpanBuilder spanBuilder) {
                reportedSpans.add(spanBuilder.build());
            }
        };
        DefaultCheckpointStatsTracker tracker = new DefaultCheckpointStatsTracker(10, (JobManagerJobMetricGroup)metricGroup);
        PendingCheckpointStats pending = tracker.reportPendingCheckpoint(42L, 1L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), Collections.singletonMap(jobVertexID, 1));
        pending.reportSubtaskStats(jobVertexID, this.createSubtaskStats(0, false));
        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
        Assertions.assertThat((int)reportedSpans.size()).isEqualTo(1);
        Span reportedSpan = (Span)Iterables.getOnlyElement(reportedSpans);
        Assertions.assertThat(reportedSpan.getAttributes().get("checkpointId")).isEqualTo((Object)42L);
        Assertions.assertThat(reportedSpan.getAttributes().get("checkpointType")).isEqualTo((Object)"Checkpoint");
        Assertions.assertThat(reportedSpan.getAttributes().get("isUnaligned")).isEqualTo((Object)"false");
        reportedSpans.clear();
        pending = tracker.reportPendingCheckpoint(43L, 1L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), Collections.singletonMap(jobVertexID, 1));
        pending.reportSubtaskStats(jobVertexID, this.createSubtaskStats(0, true));
        tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
        Assertions.assertThat((int)reportedSpans.size()).isEqualTo(1);
        reportedSpan = (Span)Iterables.getOnlyElement(reportedSpans);
        Assertions.assertThat(reportedSpan.getAttributes().get("checkpointId")).isEqualTo((Object)43L);
        Assertions.assertThat(reportedSpan.getAttributes().get("checkpointType")).isEqualTo((Object)"Checkpoint");
        Assertions.assertThat(reportedSpan.getAttributes().get("isUnaligned")).isEqualTo((Object)"true");
    }

    @Test
    public void testInitializationSpanCreation() throws Exception {
        final ArrayList reportedSpans = new ArrayList();
        UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup metricGroup = new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup(){

            public void addSpan(SpanBuilder spanBuilder) {
                reportedSpans.add(spanBuilder.build());
            }
        };
        DefaultCheckpointStatsTracker tracker = new DefaultCheckpointStatsTracker(10, (JobManagerJobMetricGroup)metricGroup);
        ExecutionAttemptID executionAttemptId3 = ExecutionAttemptID.randomId();
        ExecutionAttemptID executionAttemptId2 = ExecutionAttemptID.randomId();
        tracker.reportInitializationStarted(new HashSet<ExecutionAttemptID>(Arrays.asList(executionAttemptId3, executionAttemptId2)), 100L);
        this.reportRestoredCheckpoint((CheckpointStatsTracker)tracker, new RestoredCheckpointStats(42L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.RETAIN_ON_FAILURE), 100L, null, 1024L));
        SubTaskInitializationMetricsBuilder subTaskInitializationMetricsBuilder3 = new SubTaskInitializationMetricsBuilder(110L).setStatus(InitializationStatus.COMPLETED);
        subTaskInitializationMetricsBuilder3.addDurationMetric("MailboxStartDurationMs", 10L);
        subTaskInitializationMetricsBuilder3.addDurationMetric("ReadOutputDataDurationMs", 20L);
        subTaskInitializationMetricsBuilder3.addDurationMetric("InitializeStateDurationMs", 30L);
        subTaskInitializationMetricsBuilder3.addDurationMetric("GateRestoreDurationMs", 40L);
        tracker.reportInitializationMetrics(executionAttemptId3, subTaskInitializationMetricsBuilder3.build(215L));
        Assertions.assertThat(reportedSpans).isEmpty();
        SubTaskInitializationMetricsBuilder subTaskInitializationMetricsBuilder2 = new SubTaskInitializationMetricsBuilder(110L).setStatus(InitializationStatus.COMPLETED);
        subTaskInitializationMetricsBuilder2.addDurationMetric("MailboxStartDurationMs", 10L);
        subTaskInitializationMetricsBuilder2.addDurationMetric("ReadOutputDataDurationMs", 20L);
        subTaskInitializationMetricsBuilder2.addDurationMetric("InitializeStateDurationMs", 30L);
        subTaskInitializationMetricsBuilder2.addDurationMetric("GateRestoreDurationMs", 40L);
        tracker.reportInitializationMetrics(executionAttemptId2, subTaskInitializationMetricsBuilder2.build(215L));
        Assertions.assertThat((int)reportedSpans.size()).isEqualTo(1);
        Span reportedSpan = (Span)Iterables.getOnlyElement(reportedSpans);
        Assertions.assertThat((long)reportedSpan.getStartTsMillis()).isEqualTo(100L);
        Assertions.assertThat((long)reportedSpan.getEndTsMillis()).isEqualTo(215L);
        Assertions.assertThat(reportedSpan.getAttributes().get("checkpointId")).isEqualTo((Object)42L);
        Assertions.assertThat(reportedSpan.getAttributes().get("fullSize")).isEqualTo((Object)1024L);
        reportedSpans.clear();
        ExecutionAttemptID executionAttemptId1 = ExecutionAttemptID.randomId();
        ExecutionAttemptID executionAttemptId = ExecutionAttemptID.randomId();
        tracker.reportInitializationStarted(new HashSet<ExecutionAttemptID>(Arrays.asList(executionAttemptId1, executionAttemptId)), 100L);
        this.reportRestoredCheckpoint((CheckpointStatsTracker)tracker, new RestoredCheckpointStats(44L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.RETAIN_ON_FAILURE), 100L, null, 1024L));
        SubTaskInitializationMetricsBuilder subTaskInitializationMetricsBuilder1 = new SubTaskInitializationMetricsBuilder(110L).setStatus(InitializationStatus.COMPLETED);
        subTaskInitializationMetricsBuilder1.addDurationMetric("MailboxStartDurationMs", 10L);
        subTaskInitializationMetricsBuilder1.addDurationMetric("ReadOutputDataDurationMs", 20L);
        subTaskInitializationMetricsBuilder1.addDurationMetric("InitializeStateDurationMs", 30L);
        subTaskInitializationMetricsBuilder1.addDurationMetric("GateRestoreDurationMs", 40L);
        tracker.reportInitializationMetrics(executionAttemptId1, subTaskInitializationMetricsBuilder1.build(215L));
        Assertions.assertThat(reportedSpans).isEmpty();
        SubTaskInitializationMetricsBuilder subTaskInitializationMetricsBuilder = new SubTaskInitializationMetricsBuilder(110L).setStatus(InitializationStatus.COMPLETED);
        subTaskInitializationMetricsBuilder.addDurationMetric("MailboxStartDurationMs", 10L);
        subTaskInitializationMetricsBuilder.addDurationMetric("ReadOutputDataDurationMs", 20L);
        subTaskInitializationMetricsBuilder.addDurationMetric("InitializeStateDurationMs", 30L);
        subTaskInitializationMetricsBuilder.addDurationMetric("GateRestoreDurationMs", 40L);
        tracker.reportInitializationMetrics(executionAttemptId, subTaskInitializationMetricsBuilder.build(215L));
        Assertions.assertThat((int)reportedSpans.size()).isEqualTo(1);
        reportedSpan = (Span)Iterables.getOnlyElement(reportedSpans);
        Assertions.assertThat(reportedSpan.getAttributes().get("checkpointId")).isEqualTo((Object)44L);
    }

    @Test
    void testMetricsRegistration() {
        final ArrayList registeredGaugeNames = new ArrayList();
        UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup metricGroup = new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup(){

            public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
                if (gauge != null) {
                    registeredGaugeNames.add(name);
                }
                return gauge;
            }
        };
        new DefaultCheckpointStatsTracker(0, (JobManagerJobMetricGroup)metricGroup);
        Assertions.assertThat(registeredGaugeNames).containsAll(Arrays.asList("totalNumberOfCheckpoints", "numberOfInProgressCheckpoints", "numberOfCompletedCheckpoints", "numberOfFailedCheckpoints", "lastCheckpointRestoreTimestamp", "lastCheckpointSize", "lastCheckpointFullSize", "lastCheckpointDuration", "lastCheckpointProcessedData", "lastCheckpointPersistedData", "lastCheckpointExternalPath", "lastCompletedCheckpointId", "lastCheckpointCompletedTimestamp"));
        Assertions.assertThat(registeredGaugeNames).hasSize(13);
    }

    @Test
    void testMetricsAreUpdated() throws Exception {
        final HashMap registeredGauges = new HashMap();
        UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup metricGroup = new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup(){

            public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
                registeredGauges.put(name, gauge);
                return gauge;
            }
        };
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        DefaultCheckpointStatsTracker stats = new DefaultCheckpointStatsTracker(0, (JobManagerJobMetricGroup)metricGroup);
        Assertions.assertThat(registeredGauges).hasSize(13);
        Gauge numCheckpoints = (Gauge)registeredGauges.get("totalNumberOfCheckpoints");
        Gauge numInProgressCheckpoints = (Gauge)registeredGauges.get("numberOfInProgressCheckpoints");
        Gauge numCompletedCheckpoints = (Gauge)registeredGauges.get("numberOfCompletedCheckpoints");
        Gauge numFailedCheckpoints = (Gauge)registeredGauges.get("numberOfFailedCheckpoints");
        Gauge latestRestoreTimestamp = (Gauge)registeredGauges.get("lastCheckpointRestoreTimestamp");
        Gauge latestCompletedSize = (Gauge)registeredGauges.get("lastCheckpointSize");
        Gauge latestCompletedFullSize = (Gauge)registeredGauges.get("lastCheckpointFullSize");
        Gauge latestCompletedDuration = (Gauge)registeredGauges.get("lastCheckpointDuration");
        Gauge latestProcessedData = (Gauge)registeredGauges.get("lastCheckpointProcessedData");
        Gauge latestPersistedData = (Gauge)registeredGauges.get("lastCheckpointPersistedData");
        Gauge latestCompletedExternalPath = (Gauge)registeredGauges.get("lastCheckpointExternalPath");
        Gauge latestCompletedId = (Gauge)registeredGauges.get("lastCompletedCheckpointId");
        Gauge latestCompletedTimestamp = (Gauge)registeredGauges.get("lastCheckpointCompletedTimestamp");
        Assertions.assertThat((Long)((Long)numCheckpoints.getValue())).isZero();
        Assertions.assertThat((Integer)((Integer)numInProgressCheckpoints.getValue())).isZero();
        Assertions.assertThat((Long)((Long)numCompletedCheckpoints.getValue())).isZero();
        Assertions.assertThat((Long)((Long)numFailedCheckpoints.getValue())).isZero();
        Assertions.assertThat((Long)((Long)latestRestoreTimestamp.getValue())).isEqualTo(-1L);
        Assertions.assertThat((Long)((Long)latestCompletedSize.getValue())).isEqualTo(-1L);
        Assertions.assertThat((Long)((Long)latestCompletedFullSize.getValue())).isEqualTo(-1L);
        Assertions.assertThat((Long)((Long)latestCompletedDuration.getValue())).isEqualTo(-1L);
        Assertions.assertThat((Long)((Long)latestProcessedData.getValue())).isEqualTo(-1L);
        Assertions.assertThat((Long)((Long)latestPersistedData.getValue())).isEqualTo(-1L);
        Assertions.assertThat((String)((String)latestCompletedExternalPath.getValue())).isEqualTo("n/a");
        Assertions.assertThat((Long)((Long)latestCompletedId.getValue())).isEqualTo(-1L);
        Assertions.assertThat((Long)((Long)latestCompletedTimestamp.getValue())).isEqualTo(-1L);
        PendingCheckpointStats pending = stats.reportPendingCheckpoint(0L, 0L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), Collections.singletonMap(jobVertexID, 1));
        Assertions.assertThat((Long)((Long)numCheckpoints.getValue())).isOne();
        Assertions.assertThat((Integer)((Integer)numInProgressCheckpoints.getValue())).isOne();
        Assertions.assertThat((Long)((Long)numCompletedCheckpoints.getValue())).isZero();
        Assertions.assertThat((Long)((Long)numFailedCheckpoints.getValue())).isZero();
        long ackTimestamp = 11231230L;
        long checkpointedSize = 123L;
        long fullCheckpointSize = 12381238L;
        long processedData = 4242L;
        long persistedData = 4444L;
        long ignored = 0L;
        String externalPath = "myexternalpath";
        SubtaskStateStats subtaskStats = new SubtaskStateStats(0, ackTimestamp, checkpointedSize, fullCheckpointSize, ignored, ignored, processedData, persistedData, ignored, ignored, false, true);
        Assertions.assertThat((boolean)pending.reportSubtaskStats(jobVertexID, subtaskStats)).isTrue();
        stats.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath));
        Assertions.assertThat((Long)((Long)numCheckpoints.getValue())).isOne();
        Assertions.assertThat((Integer)((Integer)numInProgressCheckpoints.getValue())).isZero();
        Assertions.assertThat((Long)((Long)numCompletedCheckpoints.getValue())).isOne();
        Assertions.assertThat((Long)((Long)numFailedCheckpoints.getValue())).isZero();
        Assertions.assertThat((Long)((Long)latestRestoreTimestamp.getValue())).isEqualTo(-1L);
        Assertions.assertThat((Long)((Long)latestCompletedSize.getValue())).isEqualTo(checkpointedSize);
        Assertions.assertThat((Long)((Long)latestCompletedFullSize.getValue())).isEqualTo(fullCheckpointSize);
        Assertions.assertThat((Long)((Long)latestProcessedData.getValue())).isEqualTo(processedData);
        Assertions.assertThat((Long)((Long)latestPersistedData.getValue())).isEqualTo(persistedData);
        Assertions.assertThat((Long)((Long)latestCompletedDuration.getValue())).isEqualTo(ackTimestamp);
        Assertions.assertThat((String)((String)latestCompletedExternalPath.getValue())).isEqualTo(externalPath);
        Assertions.assertThat((Long)((Long)latestCompletedId.getValue())).isZero();
        Assertions.assertThat((Long)((Long)latestCompletedTimestamp.getValue())).isEqualTo(ackTimestamp);
        PendingCheckpointStats nextPending = stats.reportPendingCheckpoint(1L, 11L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), Collections.singletonMap(jobVertexID, 1));
        long failureTimestamp = 1230123L;
        stats.reportFailedCheckpoint(nextPending.toFailedCheckpoint(failureTimestamp, null));
        Assertions.assertThat((Long)((Long)numCheckpoints.getValue())).isEqualTo(2L);
        Assertions.assertThat((Integer)((Integer)numInProgressCheckpoints.getValue())).isZero();
        Assertions.assertThat((Long)((Long)numCompletedCheckpoints.getValue())).isOne();
        Assertions.assertThat((Long)((Long)numFailedCheckpoints.getValue())).isOne();
        Assertions.assertThat((Long)((Long)latestCompletedId.getValue())).isZero();
        long restoreTimestamp = 183419283L;
        RestoredCheckpointStats restored = new RestoredCheckpointStats(1L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), restoreTimestamp, null, 42L);
        stats.reportInitializationStarted(Collections.emptySet(), restoreTimestamp);
        this.reportRestoredCheckpoint((CheckpointStatsTracker)stats, restored);
        Assertions.assertThat((Long)((Long)numCheckpoints.getValue())).isEqualTo(2L);
        Assertions.assertThat((Integer)((Integer)numInProgressCheckpoints.getValue())).isZero();
        Assertions.assertThat((Long)((Long)numCompletedCheckpoints.getValue())).isOne();
        Assertions.assertThat((Long)((Long)numFailedCheckpoints.getValue())).isOne();
        Assertions.assertThat((Long)((Long)latestCompletedId.getValue())).isZero();
        Assertions.assertThat((Long)((Long)latestRestoreTimestamp.getValue())).isEqualTo(restoreTimestamp);
        PendingCheckpointStats thirdPending = stats.reportPendingCheckpoint(2L, 5000L, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), Collections.singletonMap(jobVertexID, 1));
        thirdPending.reportSubtaskStats(jobVertexID, subtaskStats);
        stats.reportCompletedCheckpoint(thirdPending.toCompletedCheckpointStats(null));
        Assertions.assertThat((Long)((Long)latestCompletedId.getValue())).isEqualTo(2L);
        Assertions.assertThat((String)((String)latestCompletedExternalPath.getValue())).isEqualTo("n/a");
    }

    private SubtaskStateStats createSubtaskStats(int index) {
        return this.createSubtaskStats(index, false);
    }

    private SubtaskStateStats createSubtaskStats(int index, boolean unaligned) {
        return new SubtaskStateStats(index, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, unaligned, true);
    }

    private void reportRestoredCheckpoint(CheckpointStatsTracker tracker, RestoredCheckpointStats restored) {
        tracker.reportRestoredCheckpoint(restored.getCheckpointId(), restored.getProperties(), restored.getExternalPath(), restored.getStateSize());
    }
}

