/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Locale;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that resolve which {@link StateBackend} to use, either from an
 * application-supplied instance or from configuration, and that optionally wrap the
 * result in a changelog delegate.
 *
 * <p>This class cannot be instantiated.
 */
public class StateBackendLoader {
    private static final Logger LOG = LoggerFactory.getLogger(StateBackendLoader.class);

    /** Class name of the changelog delegate; it is only ever loaded reflectively by name. */
    private static final String CHANGELOG_STATE_BACKEND = "org.apache.flink.state.changelog.ChangelogStateBackend";

    /**
     * Class name of the delegate used when changelog state handles are present but the
     * original backend is not a changelog backend; loaded reflectively by name.
     */
    private static final String DEACTIVATED_CHANGELOG_STATE_BACKEND = "org.apache.flink.state.changelog.DeactivatedChangelogStateBackend";

    /** Factory class name substituted for the {@code rocksdb} shortcut; loaded reflectively. */
    private static final String ROCKSDB_STATE_BACKEND_FACTORY = "org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory";

    /** Shortcut name that selects the {@link HashMapStateBackend}. */
    public static final String HASHMAP_STATE_BACKEND_NAME = "hashmap";

    /** Shortcut name that selects the {@link MemoryStateBackend}. */
    @Deprecated
    public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";

    /** Shortcut name that is mapped to the {@link HashMapStateBackend} after a warning. */
    @Deprecated
    public static final String FS_STATE_BACKEND_NAME = "filesystem";

    /** Shortcut name that selects the factory named by {@link #ROCKSDB_STATE_BACKEND_FACTORY}. */
    public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";

    /**
     * Creates the state backend named by {@link StateBackendOptions#STATE_BACKEND}.
     *
     * <p>The configured value is first compared, case-insensitively, against the shortcut
     * names declared in this class. Any other value is treated as the fully qualified class
     * name of a {@link StateBackendFactory}, which is loaded through {@code classLoader},
     * instantiated via its no-argument constructor and asked to create the backend.
     *
     * @param config configuration to read the backend name from and to pass to the factory
     * @param classLoader class loader used to resolve the factory class
     * @param logger optional logger for progress and deprecation messages; may be {@code null}
     * @return the backend created by the selected factory
     * @throws DynamicCodeLoadingException if the factory class cannot be found, is not a
     *     {@link StateBackendFactory}, or cannot be instantiated (including the case where its
     *     constructor throws)
     * @throws IllegalConfigurationException declared by this method; not raised directly here
     * @throws IOException declared by this method; not raised directly here
     */
    @Nonnull
    public static StateBackend loadStateBackendFromConfig(ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
        Preconditions.checkNotNull(config, "config");
        Preconditions.checkNotNull(classLoader, "classLoader");

        final String backendName = config.get(StateBackendOptions.STATE_BACKEND);
        // Locale.ROOT keeps shortcut matching independent of the JVM default locale
        // (e.g. a Turkish locale lower-cases 'I' to a dotless 'i').
        final String normalizedName = backendName.toLowerCase(Locale.ROOT);

        // Unless a shortcut says otherwise, the configured value is itself the factory class name.
        String factoryClassName = backendName;

        switch (normalizedName) {
            case MEMORY_STATE_BACKEND_NAME: {
                MemoryStateBackend memoryBackend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
                if (logger != null) {
                    logger.warn("MemoryStateBackend has been deprecated. Please use 'hashmap' state backend instead with JobManagerCheckpointStorage for equivalent functionality");
                    logger.info("State backend is set to job manager {}", memoryBackend);
                }
                return memoryBackend;
            }
            case FS_STATE_BACKEND_NAME:
                if (logger != null) {
                    logger.warn("{} state backend has been deprecated. Please use 'hashmap' state backend instead.", normalizedName);
                }
                // Intentional fall-through: 'filesystem' is served by the hashmap backend.
            case HASHMAP_STATE_BACKEND_NAME: {
                HashMapStateBackend hashMapBackend = new HashMapStateBackendFactory().createFromConfig(config, classLoader);
                if (logger != null) {
                    logger.info("State backend is set to heap memory {}", hashMapBackend);
                }
                return hashMapBackend;
            }
            case ROCKSDB_STATE_BACKEND_NAME:
                // The factory is referenced by name only and resolved reflectively below.
                factoryClassName = ROCKSDB_STATE_BACKEND_FACTORY;
                break;
            default:
                break;
        }

        if (logger != null) {
            logger.info("Loading state backend via factory {}", factoryClassName);
        }

        @SuppressWarnings("rawtypes")
        final StateBackendFactory factory;
        try {
            @SuppressWarnings("rawtypes")
            Class<? extends StateBackendFactory> factoryClass =
                    Class.forName(factoryClassName, false, classLoader).asSubclass(StateBackendFactory.class);
            // Class.newInstance() is deprecated and rethrows constructor exceptions unchecked;
            // going through the Constructor wraps them in InvocationTargetException instead.
            factory = factoryClass.getDeclaredConstructor().newInstance();
        }
        catch (ClassNotFoundException e) {
            throw new DynamicCodeLoadingException("Cannot find configured state backend factory class: " + backendName, e);
        }
        catch (ClassCastException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
            throw new DynamicCodeLoadingException("The class configured under '" + StateBackendOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" + backendName + ')', e);
        }
        return factory.createFromConfig(config, classLoader);
    }

    /**
     * Picks the root state backend, without any changelog wrapping.
     *
     * <p>An application-supplied backend takes precedence; if it implements
     * {@link ConfigurableStateBackend} it is given the chance to configure itself. Otherwise
     * the backend is loaded from the cluster configuration overlaid with the job configuration.
     *
     * @param fromApplication backend set by the application, or {@code null} if none
     * @param jobConfig job-level configuration; its entries override the cluster configuration
     *     when loading from config
     * @param clusterConfig cluster-level configuration
     * @param classLoader class loader used for configuring or loading the backend
     * @param logger optional logger; may be {@code null}
     * @return the selected backend
     */
    private static StateBackend loadFromApplicationOrConfigOrDefaultInternal(@Nullable StateBackend fromApplication, Configuration jobConfig, Configuration clusterConfig, ClassLoader classLoader, @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
        Preconditions.checkNotNull(jobConfig, "jobConfig");
        Preconditions.checkNotNull(clusterConfig, "clusterConfig");
        Preconditions.checkNotNull(classLoader, "classLoader");

        if (fromApplication == null) {
            // Job-level entries are added last so they override cluster-level ones.
            Configuration mergedConfig = new Configuration(clusterConfig);
            mergedConfig.addAll(jobConfig);
            return StateBackendLoader.loadStateBackendFromConfig(mergedConfig, classLoader, logger);
        }

        final StateBackend backend;
        if (fromApplication instanceof ConfigurableStateBackend) {
            if (logger != null) {
                logger.info("Using job/cluster config to configure application-defined state backend: {}", fromApplication);
            }
            // NOTE(review): only the cluster config (not the job/cluster merge) is handed to
            // configure(); presumably intentional for compatibility -- confirm before changing.
            backend = ((ConfigurableStateBackend) fromApplication).configure(clusterConfig, classLoader);
        } else {
            backend = fromApplication;
        }
        if (logger != null) {
            logger.info("Using application-defined state backend: {}", backend);
        }
        return backend;
    }

    /**
     * Resolves the state backend for a job and, if the changelog is enabled, wraps it in the
     * changelog delegate.
     *
     * <p>The changelog flag is read from {@code jobConfig} when explicitly set there, otherwise
     * from {@code clusterConfig}. The final choice is logged through this class's own logger,
     * not through {@code logger}.
     *
     * @param fromApplication backend set by the application, or {@code null} if none
     * @param jobConfig job-level configuration
     * @param clusterConfig cluster-level configuration
     * @param classLoader class loader used for loading backend and delegate classes
     * @param logger optional logger passed on to the backend selection; may be {@code null}
     * @return the root backend, or the changelog delegate wrapping it
     * @throws DynamicCodeLoadingException if a factory or the changelog delegate cannot be loaded
     */
    public static StateBackend fromApplicationOrConfigOrDefault(@Nullable StateBackend fromApplication, Configuration jobConfig, Configuration clusterConfig, ClassLoader classLoader, @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
        StateBackend rootBackend = StateBackendLoader.loadFromApplicationOrConfigOrDefaultInternal(fromApplication, jobConfig, clusterConfig, classLoader, logger);

        boolean enableChangeLog = jobConfig
                .getOptional(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)
                .orElse(clusterConfig.get(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG));

        if (!enableChangeLog) {
            LOG.info("State backend loader loads the state backend as {}", rootBackend.getClass().getSimpleName());
            return rootBackend;
        }

        StateBackend changelogBackend = StateBackendLoader.wrapStateBackend(rootBackend, classLoader, CHANGELOG_STATE_BACKEND);
        LOG.info("State backend loader loads {} to delegate {}", changelogBackend.getClass().getSimpleName(), rootBackend.getClass().getSimpleName());
        return changelogBackend;
    }

    /**
     * Decides whether the effective state backend uses managed memory.
     *
     * <p>A value supplied by the application wins. Otherwise the backend is loaded from
     * {@code config} and asked directly. If loading fails, the failure is logged and
     * {@code true} is returned.
     *
     * @param config configuration to load the backend from when the application gave no answer
     * @param stateBackendFromApplicationUsesManagedMemory the application's answer, if any
     * @param classLoader class loader used when the backend has to be loaded from config
     * @return whether managed memory is used; {@code true} when it cannot be determined
     */
    public static boolean stateBackendFromApplicationOrConfigOrDefaultUseManagedMemory(Configuration config, Optional<Boolean> stateBackendFromApplicationUsesManagedMemory, ClassLoader classLoader) {
        Preconditions.checkNotNull(config, "config");

        if (stateBackendFromApplicationUsesManagedMemory.isPresent()) {
            return stateBackendFromApplicationUsesManagedMemory.get();
        }

        try {
            StateBackend fromConfig = StateBackendLoader.loadStateBackendFromConfig(config, classLoader, LOG);
            return fromConfig.useManagedMemory();
        }
        catch (IOException | IllegalConfigurationException | DynamicCodeLoadingException e) {
            LOG.warn("Cannot decide whether state backend uses managed memory. Will reserve managed memory by default.", e);
            return true;
        }
    }

    /**
     * Wraps {@code originalStateBackend} in the deactivated-changelog delegate when it is not a
     * changelog backend itself but at least one of the given handles is a
     * {@link ChangelogStateBackendHandle}; otherwise returns it unchanged.
     *
     * @param originalStateBackend the backend selected for the job
     * @param classLoader class loader used to load the delegate class
     * @param keyedStateHandles keyed state handles to inspect
     * @return the original backend or a delegate wrapping it
     * @throws DynamicCodeLoadingException if the delegate class cannot be loaded or instantiated
     */
    public static StateBackend loadStateBackendFromKeyedStateHandles(StateBackend originalStateBackend, ClassLoader classLoader, Collection<KeyedStateHandle> keyedStateHandles) throws DynamicCodeLoadingException {
        if (StateBackendLoader.isChangelogStateBackend(originalStateBackend)) {
            return originalStateBackend;
        }
        boolean hasChangelogHandle = keyedStateHandles.stream().anyMatch(ChangelogStateBackendHandle.class::isInstance);
        if (hasChangelogHandle) {
            return StateBackendLoader.wrapStateBackend(originalStateBackend, classLoader, DEACTIVATED_CHANGELOG_STATE_BACKEND);
        }
        return originalStateBackend;
    }

    /**
     * Returns whether the runtime class of {@code backend} is exactly the changelog state
     * backend, compared by class name.
     *
     * @param backend backend to test
     * @return {@code true} if the class name equals the changelog backend class name
     */
    public static boolean isChangelogStateBackend(StateBackend backend) {
        return CHANGELOG_STATE_BACKEND.equals(backend.getClass().getName());
    }

    /**
     * Reflectively instantiates the {@link DelegatingStateBackend} named {@code className},
     * passing {@code backend} to its single-argument {@code (StateBackend)} constructor.
     *
     * @param backend backend to be wrapped
     * @param classLoader class loader used to resolve {@code className}
     * @param className fully qualified name of the delegate class
     * @return the newly created delegate
     * @throws DynamicCodeLoadingException if the class cannot be found or instantiated
     */
    private static StateBackend wrapStateBackend(StateBackend backend, ClassLoader classLoader, String className) throws DynamicCodeLoadingException {
        try {
            Constructor<? extends DelegatingStateBackend> constructor = Class.forName(className, false, classLoader)
                    .asSubclass(DelegatingStateBackend.class)
                    .getDeclaredConstructor(StateBackend.class);
            // The constructor is looked up as declared, so it may not be public.
            constructor.setAccessible(true);
            return constructor.newInstance(backend);
        }
        catch (ClassNotFoundException e) {
            throw new DynamicCodeLoadingException("Cannot find DelegateStateBackend class: " + className, e);
        }
        catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new DynamicCodeLoadingException("Fail to initialize: " + className, e);
        }
    }

    /** Not instantiable: this class only exposes static helpers. */
    private StateBackendLoader() {
    }
}

