package org.apache.flink.runtime.state.changelog.inmemory;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.class */
class StateChangelogStorageLoaderTest {

    /* loaded from: input_file:org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest$TestStateChangelogStorage.class */
    private static class TestStateChangelogStorage implements StateChangelogStorage<ChangelogStateHandle> {
        private TestStateChangelogStorage() {
        }

        public StateChangelogWriter<ChangelogStateHandle> createWriter(String str, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
            return null;
        }

        public StateChangelogHandleReader<ChangelogStateHandle> createReader() {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest$TestStateChangelogStorageFactory.class */
    private static class TestStateChangelogStorageFactory implements StateChangelogStorageFactory {
        private TestStateChangelogStorageFactory() {
        }

        public String getIdentifier() {
            return "memory";
        }

        public StateChangelogStorage<?> createStorage(JobID jobID, Configuration configuration, TaskManagerJobMetricGroup taskManagerJobMetricGroup, LocalRecoveryConfig localRecoveryConfig) {
            return new TestStateChangelogStorage();
        }

        public StateChangelogStorageView<?> createStorageView(Configuration configuration) throws IOException {
            return new TestStateChangelogStorage();
        }
    }

    StateChangelogStorageLoaderTest() {
    }

    @Test
    void testLoadSpiImplementation() throws IOException {
        StateChangelogStorageLoader.initialize(getPluginManager(Collections.emptyIterator()));
        Assertions.assertThat(StateChangelogStorageLoader.load(JobID.generate(), new Configuration(), UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled())).isNotNull();
    }

    @Test
    void testLoadNotExist() throws IOException {
        StateChangelogStorageLoader.initialize(getPluginManager(Collections.emptyIterator()));
        Assertions.assertThat(StateChangelogStorageLoader.load(JobID.generate(), new Configuration().set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "not_exist"), UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled())).isNull();
    }

    @Test
    void testLoadPluginImplementation() throws IOException {
        StateChangelogStorageLoader.initialize(getPluginManager(Collections.singletonList(new TestStateChangelogStorageFactory()).iterator()));
        Assertions.assertThat(StateChangelogStorageLoader.load(JobID.generate(), new Configuration(), UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled())).isInstanceOf(TestStateChangelogStorage.class);
    }

    private PluginManager getPluginManager(final Iterator<? extends StateChangelogStorageFactory> it) {
        return new PluginManager() { // from class: org.apache.flink.runtime.state.changelog.inmemory.StateChangelogStorageLoaderTest.1
            public <P> Iterator<P> load(Class<P> cls) {
                Preconditions.checkArgument(cls.equals(StateChangelogStorageFactory.class));
                return it;
            }
        };
    }
}
