/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.net.URL;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.PseudoRandomValueSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader;

/**
 * A {@link StreamExecutionEnvironment} that executes its jobs on a given {@link MiniCluster}.
 *
 * <p>The environment can be installed as the context environment (see {@link #setAsContext()}
 * and the static overloads). Every environment created through the installed factory gets its
 * configuration randomized by {@link #randomizeConfiguration(MiniCluster, Configuration)} before
 * it is applied.
 *
 * <p>Randomization is driven by two system properties that are read once at class-load time:
 *
 * <ul>
 *   <li>{@code checkpointing.randomization} - {@code true} enables randomization of the
 *       checkpointing-related options (default {@code false});
 *   <li>{@code checkpointing.changelog} - {@code on}, {@code random} or {@code unset} (default),
 *       controlling whether the state changelog is force-enabled, randomized or left alone.
 * </ul>
 *
 * <p>The result of the most recently finished job is kept in a static reference, i.e. it is
 * shared by all instances of this class.
 */
public class TestStreamEnvironment
extends StreamExecutionEnvironment {
    private static final String STATE_CHANGE_LOG_CONFIG_ON = "on";
    private static final String STATE_CHANGE_LOG_CONFIG_UNSET = "unset";
    private static final String STATE_CHANGE_LOG_CONFIG_RAND = "random";

    /** Whether checkpointing-related options are randomized at all. */
    private static final boolean RANDOMIZE_CHECKPOINTING_CONFIG =
            Boolean.parseBoolean(System.getProperty("checkpointing.randomization", "false"));

    /** Requested state-changelog mode; compared case-insensitively against the constants above. */
    private static final String STATE_CHANGE_LOG_CONFIG =
            System.getProperty("checkpointing.changelog", STATE_CHANGE_LOG_CONFIG_UNSET).trim();

    /** Result of the last completed job; static, hence shared between all environments. */
    private static final AtomicReference<JobExecutionResult> lastJobExecutionResult =
            new AtomicReference<>(null);

    private final MiniCluster miniCluster;
    private final int parallelism;
    private final Collection<Path> jarFiles;
    private final Collection<URL> classPaths;

    /**
     * Creates an environment bound to the given mini cluster.
     *
     * @param miniCluster cluster the jobs are executed on
     * @param config base configuration; it is passed through
     *     {@code MiniClusterPipelineExecutorServiceLoader.updateConfigurationForMiniCluster}
     *     together with the jar files and class paths before being handed to the superclass
     * @param parallelism default parallelism set on this environment
     * @param jarFiles jar files to attach to submitted jobs
     * @param classPaths additional class path URLs to attach to submitted jobs
     */
    public TestStreamEnvironment(MiniCluster miniCluster, Configuration config, int parallelism, Collection<Path> jarFiles, Collection<URL> classPaths) {
        super(
                new MiniClusterPipelineExecutorServiceLoader(miniCluster),
                MiniClusterPipelineExecutorServiceLoader.updateConfigurationForMiniCluster(config, jarFiles, classPaths),
                null);
        this.setParallelism(parallelism);
        this.miniCluster = miniCluster;
        this.parallelism = parallelism;
        this.jarFiles = jarFiles;
        this.classPaths = classPaths;
    }

    /**
     * Creates an environment with an empty configuration and no extra jars or class paths.
     *
     * @param miniCluster cluster the jobs are executed on
     * @param parallelism default parallelism set on this environment
     */
    public TestStreamEnvironment(MiniCluster miniCluster, int parallelism) {
        this(miniCluster, new Configuration(), parallelism, Collections.emptyList(), Collections.emptyList());
    }

    /**
     * Installs a context-environment factory that creates {@code TestStreamEnvironment}s for the
     * given mini cluster. For each created environment the passed configuration is first used to
     * construct the environment, then randomized, and finally applied via {@code configure}.
     *
     * @param miniCluster cluster the created environments execute on
     * @param parallelism default parallelism of the created environments
     * @param jarFiles jar files to attach to submitted jobs
     * @param classpaths additional class path URLs to attach to submitted jobs
     */
    public static void setAsContext(MiniCluster miniCluster, int parallelism, Collection<Path> jarFiles, Collection<URL> classpaths) {
        StreamExecutionEnvironmentFactory factory = conf -> {
            TestStreamEnvironment env = new TestStreamEnvironment(miniCluster, conf, parallelism, jarFiles, classpaths);
            // Randomize after construction so the randomized values are picked up by configure().
            randomizeConfiguration(miniCluster, conf);
            env.configure(conf, env.getUserClassloader());
            return env;
        };
        initializeContextEnvironment(factory);
    }

    /**
     * Installs a context-environment factory using this instance's mini cluster, parallelism,
     * jar files and class paths. Equivalent to calling the static four-argument overload with
     * those values.
     */
    public void setAsContext() {
        setAsContext(this.miniCluster, this.parallelism, this.jarFiles, this.classPaths);
    }

    /**
     * Randomizes selected options in {@code conf} in place.
     *
     * <p>The order of the {@code randomize} calls is kept fixed on purpose: the selector may be
     * seeded, in which case reordering would change which values are drawn.
     *
     * @param miniCluster cluster whose restore mode is overridden when the changelog is enabled
     * @param conf configuration to modify
     */
    private static void randomizeConfiguration(MiniCluster miniCluster, Configuration conf) {
        if (RANDOMIZE_CHECKPOINTING_CONFIG) {
            PseudoRandomValueSelector.randomize(conf, CheckpointingOptions.ENABLE_UNALIGNED, true, false);
            PseudoRandomValueSelector.randomize(
                    conf,
                    CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
                    Duration.ofSeconds(0L),
                    Duration.ofMillis(100L),
                    Duration.ofSeconds(2L));
            PseudoRandomValueSelector.randomize(conf, CheckpointingOptions.CLEANER_PARALLEL_MODE, true, false);
            PseudoRandomValueSelector.randomize(conf, CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, true, false);
            PseudoRandomValueSelector.randomize(conf, ExecutionOptions.SNAPSHOT_COMPRESSION, true, false);
            // Respect an explicit setting; otherwise the only candidate value is "true".
            if (!conf.contains(CheckpointingOptions.FILE_MERGING_ENABLED)) {
                PseudoRandomValueSelector.randomize(conf, CheckpointingOptions.FILE_MERGING_ENABLED, true);
            }
        }

        // Addressed by key rather than by constant, as in the original code.
        PseudoRandomValueSelector.randomize(
                conf,
                ConfigOptions.key("state.backend.rocksdb.use-ingest-db-restore-mode").booleanType().noDefaultValue(),
                true,
                false);

        // The changelog is only touched when it was not set explicitly and file merging is off.
        if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)
                && !conf.get(CheckpointingOptions.FILE_MERGING_ENABLED)) {
            if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
                conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
            } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
                PseudoRandomValueSelector.randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false);
            }
        }

        // Applies whether the changelog was enabled above or by the caller's configuration.
        if (conf.get(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) {
            if (!conf.contains(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED)) {
                // "true" is listed three times so it is three of the four candidates.
                PseudoRandomValueSelector.randomize(
                        conf,
                        StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED,
                        true,
                        true,
                        true,
                        false);
            }
            if (!conf.contains(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL)) {
                PseudoRandomValueSelector.randomize(
                        conf,
                        StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
                        Duration.ofMillis(100L),
                        Duration.ofMillis(500L),
                        Duration.ofSeconds(1L),
                        Duration.ofSeconds(5L));
            }
            miniCluster.overrideRestoreModeForChangelogStateBackend();
        }

        PseudoRandomValueSelector.randomize(
                conf,
                ConfigOptions.key("table.exec.unbounded-over.version").intType().noDefaultValue(),
                1,
                2);
        PseudoRandomValueSelector.randomize(
                conf,
                ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY,
                ExecutionConfigOptions.SinkUpsertMaterializeStrategy.LEGACY,
                ExecutionConfigOptions.SinkUpsertMaterializeStrategy.VALUE,
                ExecutionConfigOptions.SinkUpsertMaterializeStrategy.MAP,
                ExecutionConfigOptions.SinkUpsertMaterializeStrategy.ADAPTIVE);
    }

    /**
     * Installs a context-environment factory for the given mini cluster without extra jar files
     * or class paths.
     *
     * @param miniCluster cluster the created environments execute on
     * @param parallelism default parallelism of the created environments
     */
    public static void setAsContext(MiniCluster miniCluster, int parallelism) {
        setAsContext(miniCluster, parallelism, Collections.emptyList(), Collections.emptyList());
    }

    /** Resets the context environment so this class's factory is no longer used. */
    public static void unsetAsContext() {
        resetContextEnvironment();
    }

    /**
     * Executes the job via the superclass and remembers its result.
     *
     * @param jobName name of the job
     * @return the result returned by the superclass
     * @throws Exception if the superclass execution fails; no result is recorded in that case
     */
    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        JobExecutionResult result = super.execute(jobName);
        lastJobExecutionResult.set(result);
        return result;
    }

    /**
     * Submits the job via the superclass and records its result once the job's result future
     * completes normally. Nothing is recorded if that future completes exceptionally.
     *
     * @param jobName name of the job
     * @return the client of the submitted job
     * @throws Exception if the superclass submission fails
     */
    @Override
    public JobClient executeAsync(String jobName) throws Exception {
        JobClient jobClient = super.executeAsync(jobName);
        CompletableFuture<JobExecutionResult> jobExecutionResultFuture = jobClient.getJobExecutionResult();
        jobExecutionResultFuture.thenAccept(lastJobExecutionResult::set);
        return jobClient;
    }

    /**
     * Returns the most recently recorded job result.
     *
     * @return the last recorded result, or {@code null} if none has been recorded yet; the value
     *     is held statically and may therefore stem from a different environment instance
     */
    public JobExecutionResult getLastJobExecutionResult() {
        return lastJobExecutionResult.get();
    }
}

