/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractManagedMemoryStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.forst.ConfigurableForStOptionsFactory;
import org.apache.flink.state.forst.ForStConfigurableOptions;
import org.apache.flink.state.forst.ForStKeyedStateBackend;
import org.apache.flink.state.forst.ForStKeyedStateBackendBuilder;
import org.apache.flink.state.forst.ForStMemoryConfiguration;
import org.apache.flink.state.forst.ForStMemoryControllerUtils;
import org.apache.flink.state.forst.ForStNativeMetricOptions;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStOptions;
import org.apache.flink.state.forst.ForStOptionsFactory;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.ForStSharedResources;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public class ForStStateBackend
extends AbstractManagedMemoryStateBackend
implements ConfigurableStateBackend {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ForStStateBackend.class);
    private static final int FORST_LIB_LOADING_ATTEMPTS = 3;
    private static boolean forStInitialized = false;
    @Nullable
    private Path remoteForStDirectory;
    @Nullable
    private File[] localForStDirectories;
    @Nullable
    private ReadableConfig configurableOptions;
    @Nullable
    private ForStOptionsFactory forStOptionsFactory;
    private final ForStMemoryConfiguration memoryConfiguration;
    private final ForStNativeMetricOptions nativeMetricOptions;
    private transient File[] initializedDbBasePaths;
    private transient JobID jobId;
    private transient int nextDirectory;
    private transient boolean isInitialized;
    private final ForStMemoryControllerUtils.ForStMemoryFactory forStMemoryFactory;

    public ForStStateBackend() {
        this.nativeMetricOptions = new ForStNativeMetricOptions();
        this.memoryConfiguration = new ForStMemoryConfiguration();
        this.forStMemoryFactory = ForStMemoryControllerUtils.ForStMemoryFactory.DEFAULT;
    }

    private ForStStateBackend(ForStStateBackend original, ReadableConfig config, ClassLoader classLoader) {
        this.memoryConfiguration = ForStMemoryConfiguration.fromOtherAndConfiguration(original.memoryConfiguration, config);
        this.memoryConfiguration.validate();
        if (original.remoteForStDirectory != null) {
            this.remoteForStDirectory = original.remoteForStDirectory;
        } else {
            String remoteDirStr = (String)config.get(ForStOptions.REMOTE_DIRECTORY);
            Path path = this.remoteForStDirectory = remoteDirStr == null ? null : new Path(remoteDirStr);
        }
        if (original.localForStDirectories != null) {
            this.localForStDirectories = original.localForStDirectories;
        } else {
            String forStLocalPaths = (String)config.get(ForStOptions.LOCAL_DIRECTORIES);
            if (forStLocalPaths != null) {
                String[] directories = forStLocalPaths.split(",|" + File.pathSeparator);
                try {
                    this.setLocalDbStoragePaths(directories);
                }
                catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException("Invalid configuration for ForSt state backend's local storage directories: " + e.getMessage(), (Throwable)e);
                }
            }
        }
        this.nativeMetricOptions = ForStNativeMetricOptions.fromConfig(config);
        this.configurableOptions = this.mergeConfigurableOptions(original.configurableOptions, config);
        try {
            this.forStOptionsFactory = this.configureOptionsFactory(original.forStOptionsFactory, (String)config.get(ForStOptions.OPTIONS_FACTORY), config, classLoader);
        }
        catch (DynamicCodeLoadingException e) {
            throw new FlinkRuntimeException((Throwable)e);
        }
        this.latencyTrackingConfigBuilder = original.latencyTrackingConfigBuilder.configure(config);
        this.forStMemoryFactory = original.forStMemoryFactory;
    }

    public ForStStateBackend configure(ReadableConfig config, ClassLoader classLoader) {
        return new ForStStateBackend(this, config, classLoader);
    }

    private void lazyInitializeForJob(Environment env, String operatorIdentifier) throws IOException {
        if (this.isInitialized) {
            return;
        }
        this.jobId = env.getJobID();
        if (this.localForStDirectories == null) {
            this.initializedDbBasePaths = new File[]{env.getTaskManagerInfo().getTmpWorkingDirectory()};
        } else {
            ArrayList<File> dirs = new ArrayList<File>(this.localForStDirectories.length);
            StringBuilder errorMessage = new StringBuilder();
            for (File f : this.localForStDirectories) {
                File testDir = new File(f, UUID.randomUUID().toString());
                if (!testDir.mkdirs()) {
                    String msg = "Local DB files directory '" + f + "' does not exist and cannot be created. ";
                    LOG.error(msg);
                    errorMessage.append(msg);
                } else {
                    dirs.add(f);
                }
                testDir.delete();
            }
            if (dirs.isEmpty()) {
                throw new IOException("No local storage directories available. " + errorMessage);
            }
            this.initializedDbBasePaths = dirs.toArray(new File[0]);
        }
        this.nextDirectory = new Random().nextInt(this.initializedDbBasePaths.length);
        this.isInitialized = true;
    }

    private File getNextStoragePath() {
        int ni = this.nextDirectory + 1;
        this.nextDirectory = ni = ni >= this.initializedDbBasePaths.length ? 0 : ni;
        return this.initializedDbBasePaths[ni];
    }

    public boolean supportsAsyncKeyedStateBackend() {
        return true;
    }

    public <K> ForStKeyedStateBackend<K> createAsyncKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws IOException {
        Environment env = parameters.getEnv();
        String tempDir = env.getTaskManagerInfo().getTmpWorkingDirectory().getAbsolutePath();
        ForStStateBackend.ensureForStIsLoaded(tempDir);
        String fileCompatibleIdentifier = parameters.getOperatorIdentifier().replaceAll("[^a-zA-Z0-9\\-]", "_");
        this.lazyInitializeForJob(env, fileCompatibleIdentifier);
        String opChildPath = String.format("op_%s_attempt_%s", fileCompatibleIdentifier, env.getTaskInfo().getAttemptNumber());
        File localBasePath = new File(new File(this.getNextStoragePath(), this.jobId.toHexString()), opChildPath);
        Path remoteBasePath = this.remoteForStDirectory != null ? new Path(new Path(this.remoteForStDirectory, this.jobId.toHexString()), opChildPath) : null;
        OpaqueMemoryResource<ForStSharedResources> sharedResources = ForStOperationUtils.allocateSharedCachesIfConfigured(this.memoryConfiguration, env, parameters.getManagedMemoryFraction(), LOG, this.forStMemoryFactory);
        if (sharedResources != null) {
            LOG.info("Obtained shared ForSt cache of size {} bytes", (Object)sharedResources.getSize());
        }
        ForStResourceContainer resourceContainer = this.createOptionsAndResourceContainer(sharedResources, localBasePath, remoteBasePath, this.nativeMetricOptions.isStatisticsEnabled());
        ForStKeyedStateBackendBuilder builder = new ForStKeyedStateBackendBuilder(resourceContainer, stateName -> resourceContainer.getColumnOptions(), parameters.getKeySerializer(), parameters.getNumberOfKeyGroups(), parameters.getMetricGroup(), parameters.getStateHandles()).setNativeMetricOptions(resourceContainer.getMemoryWatcherOptions(this.nativeMetricOptions));
        return builder.build();
    }

    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) {
        throw new UnsupportedOperationException("Don't support createKeyedStateBackend yet");
    }

    public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters) throws Exception {
        boolean asyncSnapshots = true;
        return new DefaultOperatorStateBackendBuilder(parameters.getEnv().getUserCodeClassLoader().asClassLoader(), parameters.getEnv().getExecutionConfig(), true, parameters.getStateHandles(), parameters.getCancelStreamRegistry()).build();
    }

    private ForStOptionsFactory configureOptionsFactory(@Nullable ForStOptionsFactory originalOptionsFactory, @Nullable String factoryClassName, ReadableConfig config, ClassLoader classLoader) throws DynamicCodeLoadingException {
        ForStOptionsFactory optionsFactory = null;
        if (originalOptionsFactory != null) {
            if (originalOptionsFactory instanceof ConfigurableForStOptionsFactory) {
                originalOptionsFactory = ((ConfigurableForStOptionsFactory)originalOptionsFactory).configure(config);
            }
            LOG.info("Using application-defined options factory: {}.", (Object)originalOptionsFactory);
            optionsFactory = originalOptionsFactory;
        } else if (factoryClassName != null) {
            try {
                Class<ForStOptionsFactory> clazz = Class.forName(factoryClassName, false, classLoader).asSubclass(ForStOptionsFactory.class);
                optionsFactory = clazz.newInstance();
                if (optionsFactory instanceof ConfigurableForStOptionsFactory) {
                    optionsFactory = ((ConfigurableForStOptionsFactory)optionsFactory).configure(config);
                }
                LOG.info("Using configured options factory: {}.", (Object)optionsFactory);
            }
            catch (ClassNotFoundException e) {
                throw new DynamicCodeLoadingException("Cannot find configured options factory class: " + factoryClassName, (Throwable)e);
            }
            catch (ClassCastException | IllegalAccessException | InstantiationException e) {
                throw new DynamicCodeLoadingException("The class configured under '" + ForStOptions.OPTIONS_FACTORY.key() + "' is not a valid options factory (" + factoryClassName + ')', (Throwable)e);
            }
        }
        return optionsFactory;
    }

    public void setLocalDbStoragePath(String path) {
        String[] stringArray;
        if (path == null) {
            stringArray = null;
        } else {
            String[] stringArray2 = new String[1];
            stringArray = stringArray2;
            stringArray2[0] = path;
        }
        this.setLocalDbStoragePaths(stringArray);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void setLocalDbStoragePaths(String ... paths) {
        if (paths == null) {
            this.localForStDirectories = null;
            return;
        }
        if (paths.length == 0) {
            throw new IllegalArgumentException("empty paths");
        }
        File[] pp = new File[paths.length];
        for (int i = 0; i < paths.length; ++i) {
            String path;
            String rawPath = paths[i];
            if (rawPath == null) {
                throw new IllegalArgumentException("null path");
            }
            URI uri = null;
            try {
                uri = new Path(rawPath).toUri();
            }
            catch (Exception exception) {
                // empty catch block
            }
            if (uri != null && uri.getScheme() != null) {
                if (!"file".equalsIgnoreCase(uri.getScheme())) throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
                path = uri.getPath();
            } else {
                path = rawPath;
            }
            pp[i] = new File(path);
            if (pp[i].isAbsolute()) continue;
            throw new IllegalArgumentException("Relative paths are not supported");
        }
        this.localForStDirectories = pp;
    }

    public String[] getLocalDbStoragePaths() {
        if (this.localForStDirectories == null) {
            return null;
        }
        String[] paths = new String[this.localForStDirectories.length];
        for (int i = 0; i < paths.length; ++i) {
            paths[i] = this.localForStDirectories[i].toString();
        }
        return paths;
    }

    public void setForStOptions(ForStOptionsFactory optionsFactory) {
        this.forStOptionsFactory = optionsFactory;
    }

    @Nullable
    public ForStOptionsFactory getForStOptions() {
        return this.forStOptionsFactory;
    }

    private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableConfig onTop) {
        if (base == null) {
            base = new Configuration();
        }
        Configuration configuration = new Configuration();
        Map baseMap = base.toMap();
        Map onTopMap = onTop.toMap();
        for (ConfigOption<?> option : ForStConfigurableOptions.CANDIDATE_CONFIGS) {
            Optional baseValue = base.getOptional(option);
            Optional topValue = onTop.getOptional(option);
            if (!topValue.isPresent() && !baseValue.isPresent()) continue;
            Object validValue = topValue.isPresent() ? topValue.get() : baseValue.get();
            ForStConfigurableOptions.checkArgumentValid(option, validValue);
            configuration.setString(option.key(), validValue.toString());
            String valueString = topValue.isPresent() ? (String)onTopMap.get(option.key()) : (String)baseMap.get(option.key());
            configuration.setString(option.key(), valueString);
        }
        return configuration;
    }

    @VisibleForTesting
    ForStResourceContainer createOptionsAndResourceContainer(@Nullable File localBasePath) {
        return this.createOptionsAndResourceContainer(null, localBasePath, null, false);
    }

    @VisibleForTesting
    private ForStResourceContainer createOptionsAndResourceContainer(@Nullable OpaqueMemoryResource<ForStSharedResources> sharedResources, @Nullable File localBasePath, @Nullable Path remoteBasePath, boolean enableStatistics) {
        return new ForStResourceContainer((ReadableConfig)(this.configurableOptions != null ? this.configurableOptions : new Configuration()), this.forStOptionsFactory, sharedResources, localBasePath, remoteBasePath, enableStatistics);
    }

    public String toString() {
        return "ForStStateBackend{, localForStDirectories=" + Arrays.toString(this.localForStDirectories) + ", remoteForStDirectory=" + this.remoteForStDirectory + '}';
    }

    @VisibleForTesting
    static void ensureForStIsLoaded(String tempDirectory) throws IOException {
        ForStStateBackend.ensureForStIsLoaded(tempDirectory, NativeLibraryLoader::getInstance);
    }

    @VisibleForTesting
    static void setForStInitialized(boolean initialized) {
        forStInitialized = initialized;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    static void ensureForStIsLoaded(String tempDirectory, Supplier<NativeLibraryLoader> nativeLibraryLoaderSupplier) throws IOException {
        Class<ForStStateBackend> clazz = ForStStateBackend.class;
        synchronized (ForStStateBackend.class) {
            if (!forStInitialized) {
                File tempDirParent = new File(tempDirectory).getAbsoluteFile();
                LOG.info("Attempting to load ForSt native library and store it under '{}'", (Object)tempDirParent);
                Throwable lastException = null;
                for (int attempt = 1; attempt <= 3; ++attempt) {
                    File rocksLibFolder = null;
                    try {
                        rocksLibFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID());
                        LOG.debug("Attempting to create ForSt native library folder {}", (Object)rocksLibFolder);
                        rocksLibFolder.mkdirs();
                        nativeLibraryLoaderSupplier.get().loadLibrary(rocksLibFolder.getAbsolutePath());
                        RocksDB.loadLibrary();
                        LOG.info("Successfully loaded ForSt native library");
                        forStInitialized = true;
                        // ** MonitorExit[var2_2] (shouldn't be in output)
                        return;
                    }
                    catch (Throwable t) {
                        lastException = t;
                        LOG.debug("ForSt JNI library loading attempt {} failed", (Object)attempt, (Object)t);
                        try {
                            ForStStateBackend.resetForStLoadedFlag();
                        }
                        catch (Throwable tt) {
                            LOG.debug("Failed to reset 'initialized' flag in ForSt native code loader", tt);
                        }
                        FileUtils.deleteDirectoryQuietly((File)rocksLibFolder);
                        continue;
                    }
                }
                throw new IOException("Could not load the native ForSt library", lastException);
            }
            // ** MonitorExit[var2_2] (shouldn't be in output)
            return;
        }
    }

    @VisibleForTesting
    static void resetForStLoadedFlag() throws Exception {
        Field initField = NativeLibraryLoader.class.getDeclaredField("initialized");
        initField.setAccessible(true);
        initField.setBoolean(null, false);
    }
}

