/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineconnplugin.flink.client.context;

import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.linkis.engineconnplugin.flink.client.factory.LinkisKubernetesClusterClientFactory;
import org.apache.linkis.engineconnplugin.flink.client.factory.LinkisYarnClusterClientFactory;
import org.apache.linkis.engineconnplugin.flink.client.shims.FlinkShims;
import org.apache.linkis.engineconnplugin.flink.client.shims.SessionState;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.Environment;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.SqlExecutionException;
import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutionContext {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionContext.class);

    // Immutable inputs captured by the constructor.
    private final Environment environment;
    private final ClassLoader classLoader;
    private final Configuration flinkConfig;
    private final String flinkVersion;

    // Version-specific adapter obtained from FlinkShims.getInstance(flinkVersion).
    private FlinkShims flinkShims;
    private LinkisYarnClusterClientFactory clusterClientFactory;
    private LinkisKubernetesClusterClientFactory kubernetesClusterClientFactory;

    // Lazily created by getTableEnvironment(). That method tests this field outside
    // its synchronized block, so the field must be volatile for the unsynchronized
    // read to observe a fully initialised instance.
    private volatile TableEnvironment tableEnv;

    // Never created by this class; only copied between instances by cloneExecutionContext().
    private ExecutionEnvironment execEnv;

    // Written under the lock in getTableEnvironment() but read without synchronization
    // by getStreamExecutionEnvironment() and getExecutionConfig(), hence volatile.
    private volatile StreamExecutionEnvironment streamExecEnv;

    // Never created by this class; only copied between instances by cloneExecutionContext().
    private Executor executor;

    // May be null (the constructors accept it as @Nullable).
    private SessionState sessionState;

    /**
     * Creates a context that uses freshly constructed YARN and Kubernetes
     * cluster client factories.
     *
     * @param environment   environment descriptor for this context
     * @param sessionState  optional session state, may be {@code null}
     * @param dependencies  URLs handed to the user-code class loader
     * @param flinkConfig   Flink configuration backing this context
     * @param flinkVersion  Flink version string used to select the shim
     */
    private ExecutionContext(
            Environment environment,
            @Nullable SessionState sessionState,
            List<URL> dependencies,
            Configuration flinkConfig,
            String flinkVersion) {
        this(
                environment,
                sessionState,
                dependencies,
                flinkConfig,
                new LinkisYarnClusterClientFactory(),
                new LinkisKubernetesClusterClientFactory(),
                flinkVersion);
    }

    /**
     * Creates a context that reuses the supplied YARN cluster client factory
     * and pairs it with a freshly constructed Kubernetes factory.
     *
     * @param environment           environment descriptor for this context
     * @param sessionState          optional session state, may be {@code null}
     * @param dependencies          URLs handed to the user-code class loader
     * @param flinkConfig           Flink configuration backing this context
     * @param clusterClientFactory  YARN cluster client factory to use
     * @param flinkVersion          Flink version string used to select the shim
     */
    private ExecutionContext(
            Environment environment,
            @Nullable SessionState sessionState,
            List<URL> dependencies,
            Configuration flinkConfig,
            LinkisYarnClusterClientFactory clusterClientFactory,
            String flinkVersion) {
        this(
                environment,
                sessionState,
                dependencies,
                flinkConfig,
                clusterClientFactory,
                new LinkisKubernetesClusterClientFactory(),
                flinkVersion);
    }

    private ExecutionContext(Environment environment, @Nullable SessionState sessionState, List<URL> dependencies, Configuration flinkConfig, LinkisYarnClusterClientFactory clusterClientFactory, LinkisKubernetesClusterClientFactory linkisKubernetesClusterClientFactory, String flinkVersion) {
        this.flinkVersion = flinkVersion;
        try {
            this.flinkShims = FlinkShims.getInstance((String)flinkVersion);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        this.classLoader = ClientUtils.buildUserCodeClassLoader(dependencies, Collections.emptyList(), (ClassLoader)this.getClass().getClassLoader(), (Configuration)flinkConfig);
        this.environment = environment;
        this.flinkConfig = flinkConfig;
        this.sessionState = sessionState;
        if (dependencies == null) {
            dependencies = Collections.emptyList();
        }
        LOG.debug("Deployment descriptor: {}", (Object)environment.getDeployment());
        LOG.info("flinkConfig config: {}", (Object)flinkConfig);
        this.clusterClientFactory = clusterClientFactory;
        this.kubernetesClusterClientFactory = linkisKubernetesClusterClientFactory;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the table environment, creating it (together with the stream
     * execution environment) on first use.
     *
     * <p>The cached instance is checked first without locking and then again
     * while holding this object's monitor, so at most one caller performs the
     * creation. How the environments are built depends on the Flink version
     * this context was constructed with.
     *
     * <p>Both environments are built in local variables and stored in the
     * fields only after creation succeeded, so a failure cannot leave a stream
     * environment behind without its table environment.
     *
     * @return the table environment of this context
     * @throws RuntimeException if the shim reports a {@link SqlExecutionException}
     *                          or the Flink version is neither 1.12.2 nor 1.16.2
     */
    public TableEnvironment getTableEnvironment() {
        TableEnvironment cached = this.tableEnv;
        if (cached != null) {
            return cached;
        }
        synchronized (this) {
            if (this.tableEnv == null) {
                StreamExecutionEnvironment newStreamEnv;
                TableEnvironment newTableEnv;
                if (this.flinkVersion.equals(FlinkEnvConfiguration.FLINK_1_12_2_VERSION())) {
                    newStreamEnv = this.createStreamExecutionEnvironment();
                    try {
                        newTableEnv = (TableEnvironment) this.flinkShims.initializeTableEnvironment(
                                (Object) this.environment,
                                (Object) this.flinkConfig,
                                (Object) newStreamEnv,
                                (Object) this.sessionState,
                                this.classLoader);
                    } catch (SqlExecutionException e) {
                        throw new RuntimeException(e);
                    }
                } else if (this.flinkVersion.equals(FlinkEnvConfiguration.FLINK_1_16_2_VERSION())) {
                    // This branch works on a copy of the configuration rather than the shared instance.
                    newStreamEnv = new StreamExecutionEnvironment(new Configuration(this.flinkConfig), this.classLoader);
                    newTableEnv = (TableEnvironment) this.flinkShims.createTableEnvironment(
                            (Object) this.flinkConfig,
                            (Object) newStreamEnv,
                            (Object) this.sessionState,
                            this.classLoader);
                } else {
                    throw new RuntimeException("Unsupported flink versions, Currently  only 1.12.2 and 1.16.2 are supported");
                }
                // Publish only after both objects exist; the stream environment goes
                // first so that a non-null tableEnv always has its stream environment.
                this.streamExecEnv = newStreamEnv;
                this.tableEnv = newTableEnv;
            }
            return this.tableEnv;
        }
    }

    /**
     * Builds a stream execution environment from the Flink configuration and
     * applies the execution settings of {@code environment} to it.
     *
     * <p>Before the environment is obtained, a {@link StreamContextEnvironment}
     * is installed as context using this context's configuration and class
     * loader. Restart strategy, parallelism, max parallelism and time
     * characteristic are then copied from the environment's execution entry;
     * the auto-watermark interval is set only for event time.
     *
     * @return the configured stream execution environment
     */
    private StreamExecutionEnvironment createStreamExecutionEnvironment() {
        StreamContextEnvironment.setAsContext(
                new DefaultExecutorServiceLoader(),
                flinkConfig,
                classLoader,
                false,
                false);

        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);

        streamEnv.setRestartStrategy(environment.getExecution().getRestartStrategy());
        streamEnv.setParallelism(environment.getExecution().getParallelism());
        streamEnv.setMaxParallelism(environment.getExecution().getMaxParallelism());
        streamEnv.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());

        final boolean eventTime = streamEnv.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime;
        if (eventTime) {
            streamEnv.getConfig()
                    .setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
        }
        return streamEnv;
    }

    /**
     * Returns the stream execution environment, triggering creation of the
     * table environment first when no stream environment exists yet.
     *
     * @return the stream execution environment of this context
     * @throws SqlExecutionException declared in the signature; nothing in the
     *                               visible body throws it directly
     */
    public StreamExecutionEnvironment getStreamExecutionEnvironment() throws SqlExecutionException {
        if (streamExecEnv != null) {
            return streamExecEnv;
        }
        // Creating the table environment also creates the stream environment.
        getTableEnvironment();
        return streamExecEnv;
    }

    /**
     * Stores a string value in the Flink configuration held by this context.
     *
     * @param key   configuration key
     * @param value value to associate with the key
     */
    public void setString(String key, String value) {
        flinkConfig.setString(key, value);
    }

    /**
     * Stores a boolean value in the Flink configuration held by this context.
     *
     * @param key   configuration key
     * @param value value to associate with the key
     */
    public void setBoolean(String key, boolean value) {
        flinkConfig.setBoolean(key, value);
    }

    /**
     * Returns the Flink configuration instance held by this context (not a copy).
     *
     * @return the Flink configuration
     */
    public Configuration getFlinkConfig() {
        return flinkConfig;
    }

    /**
     * Returns the user-code class loader built when this context was constructed.
     *
     * @return the class loader of this context
     */
    public ClassLoader getClassLoader() {
        return classLoader;
    }

    /**
     * Returns the environment descriptor this context was constructed with.
     *
     * @return the environment of this context
     */
    public Environment getEnvironment() {
        return environment;
    }

    /**
     * Asks the YARN cluster client factory for a cluster descriptor based on
     * this context's Flink configuration.
     *
     * @return the descriptor produced by the YARN factory
     */
    public YarnClusterDescriptor createClusterDescriptor() {
        return clusterClientFactory.createClusterDescriptor(flinkConfig);
    }

    /**
     * Asks the Kubernetes cluster client factory for a cluster descriptor based
     * on this context's Flink configuration.
     *
     * @return the descriptor produced by the Kubernetes factory
     */
    public KubernetesClusterDescriptor createKubernetesClusterDescriptor() {
        return kubernetesClusterClientFactory.createClusterDescriptor(flinkConfig);
    }

    /**
     * Collects every catalog registered in the table environment, keyed by name.
     *
     * <p>The table environment is obtained through {@link #getTableEnvironment()}
     * so that calling this method before the environment has been lazily created
     * initialises it instead of failing with a {@code NullPointerException}.
     *
     * @return a new mutable map from catalog name to catalog; names for which
     *         the table environment returns no catalog are omitted
     * @throws RuntimeException if the table environment cannot be created
     */
    public Map<String, Catalog> getCatalogs() {
        final TableEnvironment env = this.getTableEnvironment();
        final Map<String, Catalog> catalogs = new HashMap<>();
        for (String name : env.listCatalogs()) {
            env.getCatalog(name).ifPresent(catalog -> catalogs.put(name, catalog));
        }
        return catalogs;
    }

    /**
     * Returns the session state passed at construction time.
     *
     * @return the session state, possibly {@code null}
     */
    public SessionState getSessionState() {
        return sessionState;
    }

    /**
     * Runs the supplier inside a {@link TemporaryClassLoaderContext} opened for
     * this context's class loader and returns its result. The context is closed
     * when the supplier finishes, normally or exceptionally.
     *
     * @param supplier computation to run
     * @param <R>      result type
     * @return whatever the supplier returns
     */
    public <R> R wrapClassLoader(Supplier<R> supplier) {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
            return supplier.get();
        }
    }

    /**
     * Applies the function to the table environment (cast to
     * {@link TableEnvironmentInternal}) inside a {@link TemporaryClassLoaderContext}
     * opened for this context's class loader. The table environment is fetched
     * after the context has been opened.
     *
     * @param function computation to run against the table environment
     * @param <R>      result type
     * @return whatever the function returns
     * @throws SqlExecutionException declared in the signature; nothing in the
     *                               visible body throws it directly
     */
    public <R> R wrapClassLoader(Function<TableEnvironmentInternal, R> function) throws SqlExecutionException {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
            final TableEnvironmentInternal internalEnv = (TableEnvironmentInternal) getTableEnvironment();
            return function.apply(internalEnv);
        }
    }

    /**
     * Runs the task inside a {@link TemporaryClassLoaderContext} opened for this
     * context's class loader; the context is closed once the task finishes.
     *
     * @param runnable task to run
     */
    void wrapClassLoader(Runnable runnable) {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
            runnable.run();
        }
    }

    /**
     * Returns the execution config of the stream environment when one exists,
     * otherwise that of the batch execution environment.
     *
     * <p>If neither environment exists yet, the table environment is created
     * first (which also creates the stream environment), mirroring what
     * {@link #getStreamExecutionEnvironment()} does. Previously this situation
     * ended in a {@code NullPointerException}.
     *
     * @return the execution config of the active environment
     * @throws RuntimeException if the environments have to be created and that fails
     */
    public ExecutionConfig getExecutionConfig() {
        if (this.streamExecEnv == null && this.execEnv == null) {
            // Nothing initialised yet: create the environments lazily instead of
            // dereferencing a null field below.
            this.getTableEnvironment();
        }
        if (this.streamExecEnv != null) {
            return this.streamExecEnv.getConfig();
        }
        return this.execEnv.getConfig();
    }

    /**
     * Returns the YARN cluster client factory used by this context.
     *
     * @return the YARN cluster client factory
     */
    public LinkisYarnClusterClientFactory getClusterClientFactory() {
        return clusterClientFactory;
    }

    /**
     * Creates a {@link Builder} for new execution contexts.
     *
     * @param defaultEnv    default environment
     * @param sessionEnv    session environment (the builder's constructor accepts {@code null})
     * @param dependencies  URLs for the user-code class loader
     * @param configuration Flink configuration
     * @param flinkVersion  Flink version string
     * @return a new builder holding the given arguments
     */
    public static Builder builder(
            Environment defaultEnv,
            Environment sessionEnv,
            List<URL> dependencies,
            Configuration configuration,
            String flinkVersion) {
        final Builder contextBuilder =
                new Builder(defaultEnv, sessionEnv, dependencies, configuration, flinkVersion);
        return contextBuilder;
    }

    /**
     * Builds a new context from the given builder, reusing this context's YARN
     * cluster client factory, and carries over the already created environments.
     *
     * <p>The table environment, batch environment, stream environment and
     * executor are copied by reference, and only when this context already has
     * a table environment.
     *
     * <p>NOTE(review): the Kubernetes cluster client factory is not handed to
     * the builder here, so the new context gets a fresh one — confirm this is
     * intended.
     *
     * @param builder builder that supplies everything else for the new context
     * @return the newly built context
     */
    public ExecutionContext cloneExecutionContext(Builder builder) {
        final ExecutionContext copy = builder.clusterClientFactory(clusterClientFactory).build();
        if (tableEnv == null) {
            // Nothing has been lazily created yet, so there is nothing to share.
            return copy;
        }
        copy.tableEnv = tableEnv;
        copy.execEnv = execEnv;
        copy.streamExecEnv = streamExecEnv;
        copy.executor = executor;
        return copy;
    }

    /**
     * Delegates a savepoint trigger request to the version-specific Flink shim.
     *
     * @param clusterClient client of the cluster running the job
     * @param jobId         job to act on
     * @param savepoint     savepoint argument passed through to the shim unchanged
     * @return the future returned by the shim
     */
    public CompletableFuture<String> triggerSavepoint(
            ClusterClient<ApplicationId> clusterClient, JobID jobId, String savepoint) {
        return flinkShims.triggerSavepoint(clusterClient, (Object) jobId, savepoint);
    }

    /**
     * Delegates a cancel-with-savepoint request to the version-specific Flink shim.
     *
     * @param clusterClient client of the cluster running the job
     * @param jobId         job to act on
     * @param savepoint     savepoint argument passed through to the shim unchanged
     * @return the future returned by the shim
     */
    public CompletableFuture<String> cancelWithSavepoint(
            ClusterClient<ApplicationId> clusterClient, JobID jobId, String savepoint) {
        return flinkShims.cancelWithSavepoint(clusterClient, (Object) jobId, savepoint);
    }

    /**
     * Delegates a stop-with-savepoint request to the version-specific Flink shim.
     *
     * @param clusterClient           client of the cluster running the job
     * @param jobId                   job to act on
     * @param advanceToEndOfEventTime flag passed through to the shim unchanged
     * @param savepoint               savepoint argument passed through to the shim unchanged
     * @return the future returned by the shim
     */
    public CompletableFuture<String> stopWithSavepoint(
            ClusterClient<ApplicationId> clusterClient,
            JobID jobId,
            boolean advanceToEndOfEventTime,
            String savepoint) {
        return flinkShims.stopWithSavepoint(clusterClient, (Object) jobId, advanceToEndOfEventTime, savepoint);
    }

    /**
     * Fluent builder for {@link ExecutionContext} instances. Obtained through
     * {@link ExecutionContext#builder}.
     */
    public static class Builder {
        private final Environment sessionEnv;
        private final List<URL> dependencies;
        private final Configuration configuration;
        private Environment defaultEnv;
        private Environment currentEnv;
        private String flinkVersion;
        private LinkisYarnClusterClientFactory clusterClientFactory;
        @Nullable
        private SessionState sessionState;

        /**
         * Captures the arguments shared by every context this builder creates.
         *
         * @param defaultEnv    default environment
         * @param sessionEnv    session environment, may be {@code null}
         * @param dependencies  URLs for the user-code class loader
         * @param configuration Flink configuration
         * @param flinkVersion  Flink version string
         */
        private Builder(
                Environment defaultEnv,
                @Nullable Environment sessionEnv,
                List<URL> dependencies,
                Configuration configuration,
                String flinkVersion) {
            this.defaultEnv = defaultEnv;
            this.sessionEnv = sessionEnv;
            this.dependencies = dependencies;
            this.configuration = configuration;
            this.flinkVersion = flinkVersion;
        }

        /**
         * Sets the environment to use instead of merging default and session environment.
         *
         * @param environment explicit environment for the context
         * @return this builder
         */
        public Builder env(Environment environment) {
            currentEnv = environment;
            return this;
        }

        /**
         * Sets the session state handed to the context.
         *
         * @param sessionState session state, may be {@code null}
         * @return this builder
         */
        public Builder sessionState(SessionState sessionState) {
            this.sessionState = sessionState;
            return this;
        }

        /**
         * Sets the YARN cluster client factory the context should reuse.
         *
         * @param clusterClientFactory factory to reuse
         * @return this builder
         */
        Builder clusterClientFactory(LinkisYarnClusterClientFactory clusterClientFactory) {
            this.clusterClientFactory = clusterClientFactory;
            return this;
        }

        /**
         * Creates the execution context.
         *
         * <p>When no session environment was given, the default environment
         * becomes the current one. The context then uses the current environment
         * if there is one, and otherwise the merge of default and session
         * environment. A YARN factory set on this builder is passed on; without
         * one the context creates its own.
         *
         * <p>NOTE(review): with a {@code null} session environment, an environment
         * set through {@link #env(Environment)} is overwritten by the default
         * environment — confirm this is intended.
         *
         * @return a new execution context
         */
        public ExecutionContext build() {
            if (sessionEnv == null) {
                currentEnv = defaultEnv;
            }

            final Environment effectiveEnv;
            if (currentEnv != null) {
                effectiveEnv = currentEnv;
            } else {
                effectiveEnv = Environment.merge(defaultEnv, sessionEnv);
            }

            if (clusterClientFactory != null) {
                return new ExecutionContext(
                        effectiveEnv, sessionState, dependencies, configuration, clusterClientFactory, flinkVersion);
            }
            return new ExecutionContext(effectiveEnv, sessionState, dependencies, configuration, flinkVersion);
        }
    }
}

