package org.apache.flink.runtime.checkpoint;

import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.class */
/**
 * Tests for {@link ZooKeeperCheckpointIDCounter} that run against the ZooKeeper client provided by
 * {@link ZooKeeperExtension}.
 *
 * <p>Each test creates a counter through {@link #mo46createCheckpointIdCounter()}, starts it, and
 * then checks whether the counter's node is present in ZooKeeper after
 * {@code shutdown(JobStatus)} completes or fails.
 */
class ZooKeeperCheckpointIDCounterITCase extends CheckpointIDCounterTestBase {
    /** ZooKeeper extension that hands out the client used by the tests. */
    private final ZooKeeperExtension zooKeeperExtension = new ZooKeeperExtension();

    /** Registers {@link #zooKeeperExtension} with JUnit through an {@link EachCallbackWrapper}. */
    @RegisterExtension
    final EachCallbackWrapper<ZooKeeperExtension> zooKeeperResource =
            new EachCallbackWrapper<>(zooKeeperExtension);

    /** Supplies the fatal error handler that is passed along when requesting a client. */
    @RegisterExtension
    final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource =
            new TestingFatalErrorHandlerExtension();

    ZooKeeperCheckpointIDCounterITCase() {
    }

    /** Returns the client of the ZooKeeper extension, wired to the testing fatal error handler. */
    private CuratorFramework getZooKeeperClient() {
        return zooKeeperExtension.getZooKeeperClient(
                testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
    }

    /** Creates a counter via {@link #mo46createCheckpointIdCounter()} and starts it. */
    private ZooKeeperCheckpointIDCounter createStartedCounter() throws Exception {
        final ZooKeeperCheckpointIDCounter counter = mo46createCheckpointIdCounter();
        counter.start();
        return counter;
    }

    /** The counter node exists after start and is gone after a shutdown with FINISHED. */
    @Test
    void testShutdownRemovesState() throws Exception {
        final ZooKeeperCheckpointIDCounter counter = createStartedCounter();
        final CuratorFramework client = getZooKeeperClient();

        Assertions.assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();

        counter.shutdown(JobStatus.FINISHED).join();

        Assertions.assertThat(client.checkExists().forPath(counter.getPath())).isNull();
    }

    /** A second shutdown with FINISHED completes as well and the node stays absent. */
    @Test
    void testIdempotentShutdown() throws Exception {
        final ZooKeeperCheckpointIDCounter counter = createStartedCounter();
        final CuratorFramework client = getZooKeeperClient();

        counter.shutdown(JobStatus.FINISHED).join();
        // The repeated call must complete too, although the first call already ran.
        counter.shutdown(JobStatus.FINISHED).join();

        Assertions.assertThat(client.checkExists().forPath(counter.getPath())).isNull();
    }

    /**
     * After the ZooKeeper extension has been closed, the shutdown future is expected to fail with
     * an {@link ExecutionException} caused by an {@link IllegalStateException}.
     */
    @Test
    void testShutdownWithFailureDueToMissingConnection() throws Exception {
        final ZooKeeperCheckpointIDCounter counter = createStartedCounter();

        zooKeeperExtension.close();

        FlinkAssertions.assertThatFuture(counter.shutdown(JobStatus.FINISHED))
                .as("The shutdown should fail because of the client connection being dropped.")
                .eventuallyFailsWith(ExecutionException.class)
                .withCauseInstanceOf(IllegalStateException.class);
    }

    /**
     * A child node below the counter node makes the shutdown fail with a NOTEMPTY
     * {@link KeeperException}; once the child node is deleted, a further shutdown removes the
     * counter node.
     */
    @Test
    void testShutdownWithFailureDueToExistingChildNodes() throws Exception {
        final ZooKeeperCheckpointIDCounter counter = createStartedCounter();

        final CuratorFramework client =
                ZooKeeperUtils.useNamespaceAndEnsurePath(getZooKeeperClient(), "/");
        final String counterNodePath =
                ZooKeeperUtils.generateZookeeperPath(new String[] {counter.getPath()});
        final String childNodePath =
                ZooKeeperUtils.generateZookeeperPath(
                        new String[] {counterNodePath, "unexpected-child-node-causing-a-failure"});
        client.create().forPath(childNodePath);

        // The expected error refers to the counter node path prefixed with the client's namespace.
        final String namespacedCounterNodePath =
                ZooKeeperUtils.generateZookeeperPath(
                        new String[] {client.getNamespace(), counterNodePath});
        final KeeperException expectedRootCause =
                KeeperException.create(KeeperException.Code.NOTEMPTY, namespacedCounterNodePath);

        FlinkAssertions.assertThatFuture(counter.shutdown(JobStatus.FINISHED))
                .as("The shutdown should fail because of a child node being present and the shutdown not performing an explicit recursive deletion.")
                .eventuallyFailsWith(ExecutionException.class)
                .havingCause()
                .withCause(expectedRootCause);

        // Remove the obstacle and try again.
        client.delete().forPath(childNodePath);
        counter.shutdown(JobStatus.FINISHED).join();

        Assertions.assertThat(client.checkExists().forPath(counterNodePath))
                .as("A retry of the shutdown should have worked now after the root cause was resolved.")
                .isNull();
    }

    /** The counter node exists after start and still exists after a shutdown with SUSPENDED. */
    @Test
    void testSuspendKeepsState() throws Exception {
        final ZooKeeperCheckpointIDCounter counter = createStartedCounter();
        final CuratorFramework client = getZooKeeperClient();

        Assertions.assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();

        counter.shutdown(JobStatus.SUSPENDED).join();

        Assertions.assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();
    }

    /**
     * Builds a new {@link ZooKeeperCheckpointIDCounter} on top of the test's ZooKeeper client,
     * namespaced at {@code "/"}, with a fresh {@link DefaultLastStateConnectionStateListener}.
     *
     * @return a new counter instance; this method does not start it
     * @throws Exception declared by the overridden method's signature
     */
    @Override
    public ZooKeeperCheckpointIDCounter mo46createCheckpointIdCounter() throws Exception {
        final CuratorFramework namespacedClient =
                ZooKeeperUtils.useNamespaceAndEnsurePath(getZooKeeperClient(), "/");
        return new ZooKeeperCheckpointIDCounter(
                namespacedClient, new DefaultLastStateConnectionStateListener());
    }
}
