/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizationStrategy;
import org.apache.flink.streaming.api.graph.StreamGraphContext;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.Preconditions;

/**
 * Holds the {@link StreamGraphOptimizationStrategy} instances configured for a job and forwards
 * callbacks to each of them, in the order in which they were configured.
 *
 * <p>The strategy class names are read from the job configuration option
 * {@link StreamGraphOptimizationStrategy#STREAM_GRAPH_OPTIMIZATION_STRATEGY}; each class is
 * loaded through the supplied class loader and instantiated reflectively.
 */
public class StreamGraphOptimizer {
    /** Loaded strategies in configuration order; empty when the option is not set. Never null. */
    private final List<StreamGraphOptimizationStrategy> optimizationStrategies;

    /**
     * Creates an optimizer and eagerly loads every configured strategy.
     *
     * @param jobConfiguration configuration to read the strategy class names from; must not be
     *     null
     * @param userClassLoader class loader used to resolve the strategy classes
     * @throws DynamicCodeLoadingException if a configured class cannot be found, is not a
     *     {@link StreamGraphOptimizationStrategy}, or cannot be instantiated
     */
    public StreamGraphOptimizer(Configuration jobConfiguration, ClassLoader userClassLoader) throws DynamicCodeLoadingException {
        Preconditions.checkNotNull(jobConfiguration);
        Optional<List<String>> configuredClassNames = jobConfiguration.getOptional(StreamGraphOptimizationStrategy.STREAM_GRAPH_OPTIMIZATION_STRATEGY);
        // Loading throws a checked exception, so a plain branch is used instead of Optional.map.
        if (configuredClassNames.isPresent()) {
            this.optimizationStrategies = this.loadOptimizationStrategies(configuredClassNames.get(), userClassLoader);
        } else {
            this.optimizationStrategies = new ArrayList<>();
        }
    }

    /**
     * Calls {@code initialize(context)} on every loaded strategy, in order.
     *
     * @param context the stream graph context handed to each strategy
     */
    public void initializeStrategies(StreamGraphContext context) {
        // The field is final and always assigned a non-null list by the constructor,
        // so no additional null check is required here.
        for (StreamGraphOptimizationStrategy strategy : this.optimizationStrategies) {
            strategy.initialize(context);
        }
    }

    /**
     * Forwards an operators-finished notification to every loaded strategy, in order.
     *
     * <p>If a strategy throws, the exception propagates immediately and the remaining strategies
     * are not invoked.
     *
     * @param operatorsFinished the notification to forward
     * @param context the stream graph context handed to each strategy
     * @throws Exception whatever a strategy throws while handling the notification
     */
    public void onOperatorsFinished(OperatorsFinished operatorsFinished, StreamGraphContext context) throws Exception {
        for (StreamGraphOptimizationStrategy strategy : this.optimizationStrategies) {
            strategy.onOperatorsFinished(operatorsFinished, context);
        }
    }

    /**
     * Loads and instantiates one strategy per class name, preserving the order of the names.
     *
     * @param strategyClassNames fully qualified strategy class names
     * @param userClassLoader class loader used to resolve the classes
     * @return a new list with one strategy instance per class name
     * @throws DynamicCodeLoadingException if any of the classes cannot be loaded or instantiated
     */
    private List<StreamGraphOptimizationStrategy> loadOptimizationStrategies(List<String> strategyClassNames, ClassLoader userClassLoader) throws DynamicCodeLoadingException {
        List<StreamGraphOptimizationStrategy> strategies = new ArrayList<>(strategyClassNames.size());
        for (String strategyClassName : strategyClassNames) {
            strategies.add(this.loadOptimizationStrategy(strategyClassName, userClassLoader));
        }
        return strategies;
    }

    /**
     * Loads a single strategy class and creates an instance through its declared no-argument
     * constructor.
     *
     * @param strategyClassName fully qualified name of the strategy class
     * @param userClassLoader class loader used to resolve the class
     * @return a new instance of the named strategy
     * @throws DynamicCodeLoadingException if the class cannot be found, is not a subtype of
     *     {@link StreamGraphOptimizationStrategy}, lacks a usable no-argument constructor, or its
     *     constructor throws
     */
    private StreamGraphOptimizationStrategy loadOptimizationStrategy(String strategyClassName, ClassLoader userClassLoader) throws DynamicCodeLoadingException {
        try {
            // asSubclass yields Class<? extends StreamGraphOptimizationStrategy>; the wildcard
            // is required because that type is not assignable to the exact parameterization.
            // The class is looked up with initialize=false.
            Class<? extends StreamGraphOptimizationStrategy> strategyClass = Class.forName(strategyClassName, false, userClassLoader).asSubclass(StreamGraphOptimizationStrategy.class);
            return strategyClass.getDeclaredConstructor().newInstance();
        }
        catch (ClassNotFoundException e) {
            throw new DynamicCodeLoadingException("Cannot find configured stream graph optimization strategy class: " + strategyClassName, e);
        }
        catch (ClassCastException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new DynamicCodeLoadingException("The configured class '" + strategyClassName + "' is not a valid stream graph optimization strategy", e);
        }
    }
}

