/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRequestDecider;
import org.apache.flink.runtime.checkpoint.CheckpointRequestDeciderTest;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStoreUtil;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ZooKeeperCompletedCheckpointStoreITCase
extends CompletedCheckpointStoreTest {
    private final ZooKeeperExtension zooKeeperExtension = new ZooKeeperExtension();
    @RegisterExtension
    final EachCallbackWrapper<ZooKeeperExtension> zooKeeperResource = new EachCallbackWrapper((CustomExtension)this.zooKeeperExtension);
    @RegisterExtension
    final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource = new TestingFatalErrorHandlerExtension();
    private static final String CHECKPOINT_PATH = "/checkpoints";
    private static final ZooKeeperCheckpointStoreUtil checkpointStoreUtil = ZooKeeperCheckpointStoreUtil.INSTANCE;

    ZooKeeperCompletedCheckpointStoreITCase() {
    }

    private CuratorFramework getZooKeeperClient() {
        return this.zooKeeperExtension.getZooKeeperClient(this.testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
    }

    @Override
    protected CompletedCheckpointStore createRecoveredCompletedCheckpointStore(int maxNumberOfCheckpointsToRetain, Executor executor) throws Exception {
        ZooKeeperStateHandleStore checkpointsInZooKeeper = ZooKeeperUtils.createZooKeeperStateHandleStore((CuratorFramework)this.getZooKeeperClient(), (String)CHECKPOINT_PATH, new TestingRetrievableStateStorageHelper());
        return new DefaultCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, (StateHandleStore)checkpointsInZooKeeper, (CheckpointStoreUtil)checkpointStoreUtil, DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints((StateHandleStore)checkpointsInZooKeeper, (CheckpointStoreUtil)checkpointStoreUtil), SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), Collections.emptyList(), RecoveryClaimMode.DEFAULT), executor);
    }

    @Test
    void testRecover() throws Exception {
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore checkpoints = this.createRecoveredCompletedCheckpointStore(3);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint[] expected = new CompletedCheckpointStoreTest.TestCompletedCheckpoint[]{ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(0L, (SharedStateRegistry)sharedStateRegistry), ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(1L, (SharedStateRegistry)sharedStateRegistry), ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(2L, (SharedStateRegistry)sharedStateRegistry)};
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)expected[0], new CheckpointsCleaner(), () -> {});
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)expected[1], new CheckpointsCleaner(), () -> {});
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)expected[2], new CheckpointsCleaner(), () -> {});
        this.verifyCheckpointRegistered(expected[0].getOperatorStates().values());
        this.verifyCheckpointRegistered(expected[1].getOperatorStates().values());
        this.verifyCheckpointRegistered(expected[2].getOperatorStates().values());
        Assertions.assertThat((List)((List)this.getZooKeeperClient().getChildren().forPath(CHECKPOINT_PATH))).hasSize(3);
        Assertions.assertThat((int)checkpoints.getNumberOfRetainedCheckpoints()).isEqualTo(3);
        sharedStateRegistry.close();
        sharedStateRegistry = new SharedStateRegistryImpl();
        Assertions.assertThat((List)((List)this.getZooKeeperClient().getChildren().forPath(CHECKPOINT_PATH))).hasSize(3);
        Assertions.assertThat((int)checkpoints.getNumberOfRetainedCheckpoints()).isEqualTo(3);
        Assertions.assertThat((Object)checkpoints.getLatestCheckpoint()).isEqualTo((Object)expected[2]);
        ArrayList<CompletedCheckpointStoreTest.TestCompletedCheckpoint> expectedCheckpoints = new ArrayList<CompletedCheckpointStoreTest.TestCompletedCheckpoint>(3);
        expectedCheckpoints.add(expected[1]);
        expectedCheckpoints.add(expected[2]);
        expectedCheckpoints.add(ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(3L, (SharedStateRegistry)sharedStateRegistry));
        checkpoints.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)expectedCheckpoints.get(2), new CheckpointsCleaner(), () -> {});
        List actualCheckpoints = checkpoints.getAllCheckpoints();
        Assertions.assertThat((List)actualCheckpoints).isEqualTo(expectedCheckpoints);
        for (CompletedCheckpoint actualCheckpoint : actualCheckpoints) {
            this.verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values());
        }
    }

    @Test
    void testShutdownDiscardsCheckpoints() throws Exception {
        CuratorFramework client = this.getZooKeeperClient();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore store = this.createRecoveredCompletedCheckpointStore(1);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(0L, (SharedStateRegistry)sharedStateRegistry);
        store.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)checkpoint, new CheckpointsCleaner(), () -> {});
        Assertions.assertThat((int)store.getNumberOfRetainedCheckpoints()).isOne();
        Assertions.assertThat((Object)((Stat)client.checkExists().forPath(CHECKPOINT_PATH + checkpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID())))).isNotNull();
        store.shutdown(JobStatus.FINISHED, new CheckpointsCleaner());
        Assertions.assertThat((int)store.getNumberOfRetainedCheckpoints()).isZero();
        Assertions.assertThat((Object)((Stat)client.checkExists().forPath(CHECKPOINT_PATH + checkpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID())))).isNull();
        sharedStateRegistry.close();
        Assertions.assertThat((int)this.createRecoveredCompletedCheckpointStore(1).getNumberOfRetainedCheckpoints()).isZero();
    }

    @Test
    void testSuspendKeepsCheckpoints() throws Exception {
        CuratorFramework client = this.getZooKeeperClient();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore store = this.createRecoveredCompletedCheckpointStore(1);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(0L, (SharedStateRegistry)sharedStateRegistry);
        store.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)checkpoint, new CheckpointsCleaner(), () -> {});
        Assertions.assertThat((int)store.getNumberOfRetainedCheckpoints()).isOne();
        Assertions.assertThat((Object)((Stat)client.checkExists().forPath(CHECKPOINT_PATH + checkpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID())))).isNotNull();
        store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
        Assertions.assertThat((int)store.getNumberOfRetainedCheckpoints()).isZero();
        String checkpointPath = CHECKPOINT_PATH + checkpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID());
        List checkpointPathChildren = (List)client.getChildren().forPath(checkpointPath);
        ((ListAssert)Assertions.assertThat((List)checkpointPathChildren).as("The checkpoint node should not be marked for deletion.", new Object[0])).hasSize(1);
        String locksNodeName = (String)Iterables.getOnlyElement((Iterable)checkpointPathChildren);
        String locksNodePath = ZooKeeperUtils.generateZookeeperPath((String[])new String[]{checkpointPath, locksNodeName});
        Stat locksStat = (Stat)client.checkExists().forPath(locksNodePath);
        ((AbstractIntegerAssert)Assertions.assertThat((int)locksStat.getNumChildren()).as("There shouldn't be any lock node available for the checkpoint", new Object[0])).isZero();
        sharedStateRegistry.close();
        store = this.createRecoveredCompletedCheckpointStore(1);
        CompletedCheckpoint recovered = store.getLatestCheckpoint();
        Assertions.assertThat((Object)recovered).isEqualTo((Object)checkpoint);
    }

    @Test
    void testLatestCheckpointRecovery() throws Exception {
        int numCheckpoints = 3;
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore checkpointStore = this.createRecoveredCompletedCheckpointStore(3);
        ArrayList<CompletedCheckpointStoreTest.TestCompletedCheckpoint> checkpoints = new ArrayList<CompletedCheckpointStoreTest.TestCompletedCheckpoint>(3);
        checkpoints.add(ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(9L, (SharedStateRegistry)sharedStateRegistry));
        checkpoints.add(ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(10L, (SharedStateRegistry)sharedStateRegistry));
        checkpoints.add(ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(11L, (SharedStateRegistry)sharedStateRegistry));
        for (CompletedCheckpoint completedCheckpoint : checkpoints) {
            checkpointStore.addCheckpointAndSubsumeOldestOne(completedCheckpoint, new CheckpointsCleaner(), () -> {});
        }
        sharedStateRegistry.close();
        CompletedCheckpoint latestCheckpoint = this.createRecoveredCompletedCheckpointStore(3).getLatestCheckpoint();
        Assertions.assertThat((Object)latestCheckpoint).isEqualTo(checkpoints.get(checkpoints.size() - 1));
    }

    @Test
    void testConcurrentCheckpointOperations() throws Exception {
        boolean numberOfCheckpoints = true;
        long waitingTimeout = 50L;
        CompletedCheckpointStore zkCheckpointStore1 = this.createRecoveredCompletedCheckpointStore(1);
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStoreTest.TestCompletedCheckpoint completedCheckpoint = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(1L, (SharedStateRegistry)sharedStateRegistry);
        zkCheckpointStore1.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)completedCheckpoint, new CheckpointsCleaner(), () -> {});
        sharedStateRegistry.close();
        sharedStateRegistry = new SharedStateRegistryImpl();
        CompletedCheckpointStore zkCheckpointStore2 = this.createRecoveredCompletedCheckpointStore(1);
        CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint();
        Assertions.assertThat((Object)recoveredCheckpoint).isInstanceOf(CompletedCheckpointStoreTest.TestCompletedCheckpoint.class);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint recoveredTestCheckpoint = (CompletedCheckpointStoreTest.TestCompletedCheckpoint)recoveredCheckpoint;
        Assertions.assertThat((boolean)recoveredTestCheckpoint.isDiscarded()).isFalse();
        CompletedCheckpointStoreTest.TestCompletedCheckpoint completedCheckpoint2 = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(2L, (SharedStateRegistry)sharedStateRegistry);
        zkCheckpointStore1.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)completedCheckpoint2, new CheckpointsCleaner(), () -> {});
        List allCheckpoints = zkCheckpointStore1.getAllCheckpoints();
        Assertions.assertThat((List)allCheckpoints).isEqualTo(Collections.singletonList(completedCheckpoint2));
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)recoveredTestCheckpoint.awaitDiscard(50L)).as("The checkpoint should not have been discarded.", new Object[0])).isFalse();
        Assertions.assertThat((boolean)recoveredTestCheckpoint.isDiscarded()).isFalse();
        CompletedCheckpointStoreTest.TestCompletedCheckpoint completedCheckpoint3 = ZooKeeperCompletedCheckpointStoreITCase.createCheckpoint(3L, (SharedStateRegistry)sharedStateRegistry);
        zkCheckpointStore2.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)completedCheckpoint3, new CheckpointsCleaner(), () -> {});
        recoveredTestCheckpoint.awaitDiscard();
    }

    @Test
    void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception {
        ManualClock clock = new ManualClock();
        clock.advanceTime(1L, TimeUnit.DAYS);
        int maxCleaningCheckpoints = 1;
        CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
        CheckpointRequestDecider checkpointRequestDecider = new CheckpointRequestDecider(maxCleaningCheckpoints, (currentTimeMillis, tillNextMillis) -> {}, (Clock)clock, 1L, new AtomicInteger(0)::get, () -> ((CheckpointsCleaner)checkpointsCleaner).getNumberOfCheckpointsToClean());
        boolean maxCheckpointsToRetain = true;
        ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor();
        CompletedCheckpointStore checkpointStore = this.createRecoveredCompletedCheckpointStore(1, (Executor)executor);
        int nbCheckpointsToInject = 3;
        for (int i = 1; i <= nbCheckpointsToInject; ++i) {
            CompletedCheckpointStoreTest.TestCompletedCheckpoint completedCheckpoint = new CompletedCheckpointStoreTest.TestCompletedCheckpoint(new JobID(), i, i, Collections.emptyMap(), CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.RETAIN_ON_FAILURE));
            checkpointStore.addCheckpointAndSubsumeOldestOne((CompletedCheckpoint)completedCheckpoint, checkpointsCleaner, () -> {});
        }
        int nbCheckpointsSubmittedForCleaning = nbCheckpointsToInject - 1;
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> checkpointsCleaner.getNumberOfCheckpointsToClean() == nbCheckpointsSubmittedForCleaning));
        Assertions.assertThat((int)checkpointsCleaner.getNumberOfCheckpointsToClean()).isEqualTo(nbCheckpointsSubmittedForCleaning);
        Assertions.assertThat((Optional)checkpointRequestDecider.chooseRequestToExecute(CheckpointRequestDeciderTest.regularCheckpoint(), false, 0L)).isNotPresent();
        executor.triggerAll();
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> checkpointsCleaner.getNumberOfCheckpointsToClean() < nbCheckpointsSubmittedForCleaning));
        Assertions.assertThat((int)checkpointsCleaner.getNumberOfCheckpointsToClean()).isLessThan(nbCheckpointsSubmittedForCleaning);
        Assertions.assertThat((Optional)checkpointRequestDecider.chooseRequestToExecute(CheckpointRequestDeciderTest.regularCheckpoint(), false, 0L)).isPresent();
        checkpointStore.shutdown(JobStatus.FINISHED, checkpointsCleaner);
    }
}

