/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;

abstract class CompletedCheckpointStoreTest {
    CompletedCheckpointStoreTest() {
    }

    protected abstract CompletedCheckpointStore createRecoveredCompletedCheckpointStore(int var1, Executor var2) throws Exception;

    protected CompletedCheckpointStore createRecoveredCompletedCheckpointStore(int maxNumberOfCheckpointsToRetain) throws Exception {
        return this.createRecoveredCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, Executors.directExecutor());
    }

    @Test
    void testExceptionOnNoRetainedCheckpoints() {
        Assertions.assertThatExceptionOfType(Exception.class).isThrownBy(() -> this.createRecoveredCompletedCheckpointStore(0));
    }

    @Test
    void testAddAndGetLatestCheckpoint() throws Exception {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore checkpoints = this.createRecoveredCompletedCheckpointStore(4);
        Assertions.assertThat((int)checkpoints.getNumberOfRetainedCheckpoints()).isZero();
        Assertions.assertThat((List)checkpoints.getAllCheckpoints()).isEmpty();
        TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[]{CompletedCheckpointStoreTest.createCheckpoint(0L, (SharedStateRegistry)sharedStateRegistry), CompletedCheckpointStoreTest.createCheckpoint(1L, (SharedStateRegistry)sharedStateRegistry)};
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)expected[0], new CheckpointsCleaner(), () -> {});
        Assertions.assertThat((int)checkpoints.getNumberOfRetainedCheckpoints()).isOne();
        this.verifyCheckpoint(expected[0], checkpoints.getLatestCheckpoint());
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)expected[1], new CheckpointsCleaner(), () -> {});
        Assertions.assertThat((int)checkpoints.getNumberOfRetainedCheckpoints()).isEqualTo(2);
        this.verifyCheckpoint(expected[1], checkpoints.getLatestCheckpoint());
    }

    @Test
    void testAddCheckpointMoreThanMaxRetained() throws Exception {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore checkpoints = this.createRecoveredCompletedCheckpointStore(1);
        CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
        TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[]{CompletedCheckpointStoreTest.createCheckpoint(0L, (SharedStateRegistry)sharedStateRegistry), CompletedCheckpointStoreTest.createCheckpoint(1L, (SharedStateRegistry)sharedStateRegistry), CompletedCheckpointStoreTest.createCheckpoint(2L, (SharedStateRegistry)sharedStateRegistry), CompletedCheckpointStoreTest.createCheckpoint(3L, (SharedStateRegistry)sharedStateRegistry)};
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)expected[0], checkpointsCleaner, () -> {});
        Assertions.assertThat((int)checkpoints.getNumberOfRetainedCheckpoints()).isOne();
        for (int i = 1; i < expected.length; ++i) {
            checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)expected[i], checkpointsCleaner, () -> {});
            expected[i - 1].awaitDiscard();
            Assertions.assertThat((boolean)expected[i - 1].isDiscarded()).isTrue();
            Assertions.assertThat((int)checkpoints.getNumberOfRetainedCheckpoints()).isOne();
        }
    }

    @Test
    void testEmptyState() throws Exception {
        CompletedCheckpointStore checkpoints = this.createRecoveredCompletedCheckpointStore(1);
        Assertions.assertThat((Object)checkpoints.getLatestCheckpoint()).isNull();
        Assertions.assertThat((List)checkpoints.getAllCheckpoints()).isEmpty();
        Assertions.assertThat((int)checkpoints.getNumberOfRetainedCheckpoints()).isZero();
    }

    @Test
    void testGetAllCheckpoints() throws Exception {
        TestCompletedCheckpoint[] expected;
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore checkpoints = this.createRecoveredCompletedCheckpointStore(4);
        for (TestCompletedCheckpoint checkpoint : expected = new TestCompletedCheckpoint[]{CompletedCheckpointStoreTest.createCheckpoint(0L, (SharedStateRegistry)sharedStateRegistry), CompletedCheckpointStoreTest.createCheckpoint(1L, (SharedStateRegistry)sharedStateRegistry), CompletedCheckpointStoreTest.createCheckpoint(2L, (SharedStateRegistry)sharedStateRegistry), CompletedCheckpointStoreTest.createCheckpoint(3L, (SharedStateRegistry)sharedStateRegistry)}) {
            checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)checkpoint, new CheckpointsCleaner(), () -> {});
        }
        List actual = checkpoints.getAllCheckpoints();
        ((ListAssert)Assertions.assertThat((List)actual).hasSameSizeAs((Object)expected)).containsExactly((Object[])expected);
    }

    @Test
    void testDiscardAllCheckpoints() throws Exception {
        TestCompletedCheckpoint[] expected;
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore checkpoints = this.createRecoveredCompletedCheckpointStore(4);
        for (TestCompletedCheckpoint checkpoint : expected = new TestCompletedCheckpoint[]{CompletedCheckpointStoreTest.createCheckpoint(0L, (SharedStateRegistry)sharedStateRegistry), CompletedCheckpointStoreTest.createCheckpoint(1L, (SharedStateRegistry)sharedStateRegistry), CompletedCheckpointStoreTest.createCheckpoint(2L, (SharedStateRegistry)sharedStateRegistry), CompletedCheckpointStoreTest.createCheckpoint(3L, (SharedStateRegistry)sharedStateRegistry)}) {
            checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)checkpoint, new CheckpointsCleaner(), () -> {});
        }
        checkpoints.shutdown(JobStatus.FINISHED, new CheckpointsCleaner());
        Assertions.assertThat((Object)checkpoints.getLatestCheckpoint()).isNull();
        Assertions.assertThat((List)checkpoints.getAllCheckpoints()).isEmpty();
        Assertions.assertThat((int)checkpoints.getNumberOfRetainedCheckpoints()).isZero();
        for (TestCompletedCheckpoint checkpoint : expected) {
            checkpoint.awaitDiscard();
            Assertions.assertThat((boolean)checkpoint.isDiscarded()).isTrue();
        }
    }

    @Test
    void testAcquireLatestCompletedCheckpointId() throws Exception {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore checkpoints = this.createRecoveredCompletedCheckpointStore(1);
        Assertions.assertThat((long)checkpoints.getLatestCheckpointId()).isZero();
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)CompletedCheckpointStoreTest.createCheckpoint(2L, (SharedStateRegistry)sharedStateRegistry), new CheckpointsCleaner(), () -> {});
        Assertions.assertThat((long)checkpoints.getLatestCheckpointId()).isEqualTo(2L);
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)CompletedCheckpointStoreTest.createCheckpoint(4L, (SharedStateRegistry)sharedStateRegistry), new CheckpointsCleaner(), () -> {});
        Assertions.assertThat((long)checkpoints.getLatestCheckpointId()).isEqualTo(4L);
    }

    public static TestCompletedCheckpoint createCheckpoint(long id, SharedStateRegistry sharedStateRegistry) {
        return CompletedCheckpointStoreTest.createCheckpoint(id, sharedStateRegistry, CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
    }

    public static TestCompletedCheckpoint createCheckpoint(long id, SharedStateRegistry sharedStateRegistry, CheckpointProperties props) {
        int numberOfStates = 4;
        OperatorID operatorID = new OperatorID();
        HashMap<OperatorID, OperatorState> operatorGroupState = new HashMap<OperatorID, OperatorState>();
        OperatorState operatorState = new OperatorState(null, null, operatorID, numberOfStates, numberOfStates);
        operatorGroupState.put(operatorID, operatorState);
        for (int i = 0; i < numberOfStates; ++i) {
            TestOperatorSubtaskState subtaskState = new TestOperatorSubtaskState();
            operatorState.putState(i, (OperatorSubtaskState)subtaskState);
        }
        operatorState.registerSharedStates(sharedStateRegistry, id);
        return new TestCompletedCheckpoint(new JobID(), id, 0L, operatorGroupState, props);
    }

    protected void verifyCheckpointRegistered(Collection<OperatorState> operatorStates) {
        for (OperatorState operatorState : operatorStates) {
            for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                Assertions.assertThat((boolean)((TestOperatorSubtaskState)subtaskState).registered).isTrue();
            }
        }
    }

    public static void verifyCheckpointDiscarded(TestCompletedCheckpoint completedCheckpoint) {
        Assertions.assertThat((boolean)completedCheckpoint.isDiscarded()).isTrue();
        CompletedCheckpointStoreTest.verifyCheckpointDiscarded(completedCheckpoint.getOperatorStates().values());
    }

    protected static void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
        for (OperatorState operatorState : operatorStates) {
            for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                Assertions.assertThat((boolean)((TestOperatorSubtaskState)subtaskState).discarded).isTrue();
            }
        }
    }

    private void verifyCheckpoint(CompletedCheckpoint expected, CompletedCheckpoint actual) {
        Assertions.assertThat((Object)actual).isEqualTo((Object)expected);
    }

    public static class TestOperatorSubtaskState
    extends OperatorSubtaskState {
        private static final long serialVersionUID = 522580433699164230L;
        boolean registered = false;
        boolean discarded = false;

        public void discardState() {
            super.discardState();
            Assertions.assertThat((boolean)this.discarded).isFalse();
            this.discarded = true;
            this.registered = false;
        }

        public void registerSharedStates(SharedStateRegistry sharedStateRegistry, long checkpointID) {
            super.registerSharedStates(sharedStateRegistry, checkpointID);
            Assertions.assertThat((boolean)this.discarded).isFalse();
            this.registered = true;
        }

        public void reset() {
            this.registered = false;
            this.discarded = false;
        }

        public boolean isRegistered() {
            return this.registered;
        }

        public boolean isDiscarded() {
            return this.discarded;
        }
    }

    protected static class TestCompletedCheckpoint
    extends CompletedCheckpoint {
        private static final long serialVersionUID = 4211419809665983026L;
        private boolean isDiscarded;
        private final transient CountDownLatch discardLatch = new CountDownLatch(1);

        public TestCompletedCheckpoint(JobID jobId, long checkpointId, long timestamp, Map<OperatorID, OperatorState> operatorGroupState, CheckpointProperties props) {
            super(jobId, checkpointId, timestamp, Long.MAX_VALUE, operatorGroupState, null, props, (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation(), null);
        }

        public CompletedCheckpoint.CompletedCheckpointDiscardObject markAsDiscarded() {
            return new TestCompletedCheckpointDiscardObject();
        }

        public boolean isDiscarded() {
            return this.isDiscarded;
        }

        public void awaitDiscard() throws InterruptedException {
            if (this.discardLatch != null) {
                this.discardLatch.await();
            }
        }

        public boolean awaitDiscard(long timeout) throws InterruptedException {
            if (this.discardLatch != null) {
                return this.discardLatch.await(timeout, TimeUnit.MILLISECONDS);
            }
            return false;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || ((Object)((Object)this)).getClass() != o.getClass()) {
                return false;
            }
            TestCompletedCheckpoint that = (TestCompletedCheckpoint)((Object)o);
            return this.getJobId().equals((Object)that.getJobId()) && this.getCheckpointID() == that.getCheckpointID();
        }

        public int hashCode() {
            return this.getJobId().hashCode() + (int)this.getCheckpointID();
        }

        public class TestCompletedCheckpointDiscardObject
        extends CompletedCheckpoint.CompletedCheckpointDiscardObject {
            public TestCompletedCheckpointDiscardObject() {
                super((CompletedCheckpoint)TestCompletedCheckpoint.this);
            }

            public void discard() throws Exception {
                super.discard();
                this.updateDiscards();
            }

            public CompletableFuture<Void> discardAsync(Executor executor) {
                return super.discardAsync(executor).thenRun(this::updateDiscards);
            }

            private void updateDiscards() {
                if (!TestCompletedCheckpoint.this.isDiscarded) {
                    TestCompletedCheckpoint.this.isDiscarded = true;
                    if (TestCompletedCheckpoint.this.discardLatch != null) {
                        TestCompletedCheckpoint.this.discardLatch.countDown();
                    }
                }
            }
        }
    }
}

