package org.apache.flink.client.deployment.executors;

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CacheSupportedPipelineExecutor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CacheSupportedPipelineExecutor} that talks to an existing cluster, which is looked up
 * through the {@link ClusterDescriptor} by the cluster id resolved from the configuration.
 *
 * <p>Besides submitting pipelines, it can list and invalidate the cluster datasets known to that
 * cluster.
 *
 * @param <ClusterID> the type of the id used to look up the cluster.
 * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create the
 *     {@link ClusterDescriptor} and to resolve the cluster id.
 */
@Internal
public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements CacheSupportedPipelineExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSessionClusterExecutor.class);

    /** Single-threaded executor handed to the job status changed listeners on creation. */
    private final ExecutorService executorService = Executors.newFixedThreadPool(1, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));

    private final ClientFactory clusterClientFactory;
    private final Configuration configuration;
    private final List<JobStatusChangedListener> jobStatusChangedListeners;

    /**
     * Creates the executor and instantiates the job status changed listeners from the given
     * configuration, using the context class loader of the calling thread.
     *
     * @param clientfactory factory used to create cluster descriptors and resolve cluster ids;
     *     must not be {@code null}.
     * @param configuration configuration used to create the job status changed listeners.
     */
    public AbstractSessionClusterExecutor(@Nonnull ClientFactory clientfactory, Configuration configuration) {
        this.clusterClientFactory = Preconditions.checkNotNull(clientfactory);
        this.configuration = configuration;
        this.jobStatusChangedListeners = JobStatusChangedListenerUtils.createJobStatusChangedListeners(Thread.currentThread().getContextClassLoader(), configuration, this.executorService);
    }

    /**
     * Translates the pipeline into an {@link ExecutionPlan} and submits it to the cluster
     * identified by the given configuration.
     *
     * <p>The returned future completes once the job has been submitted and its initialization
     * has finished. On success the job status changed listeners are notified; on failure the
     * error is logged. In both cases the cluster client used for the submission is closed
     * afterwards.
     *
     * @param pipeline the pipeline to execute.
     * @param configuration the configuration from which the plan and the target cluster are
     *     derived.
     * @param classLoader the class loader used while waiting for job initialization and by the
     *     returned {@link JobClient}.
     * @return a future with a {@link JobClient} for the submitted job.
     * @throws IllegalStateException if no cluster id can be resolved from the configuration.
     * @throws Exception if the plan cannot be created or the cluster cannot be retrieved.
     */
    @Override
    public CompletableFuture<JobClient> execute(@Nonnull Pipeline pipeline, @Nonnull Configuration configuration, @Nonnull ClassLoader classLoader) throws Exception {
        final ExecutionPlan executionPlan = PipelineExecutorUtils.getJobGraph(pipeline, configuration, classLoader);

        // NOTE(review): the "2" suffix in createClusterDescriptor2 looks like a decompiler-generated
        // rename of the factory method -- confirm the real method name against ClusterClientFactory.
        try (ClusterDescriptor<ClusterID> clusterDescriptor = this.clusterClientFactory.createClusterDescriptor2(configuration)) {
            final ClusterID clusterId = this.clusterClientFactory.getClusterId(configuration);
            Preconditions.checkState(clusterId != null);

            final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor.retrieve(clusterId);
            final ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();

            return clusterClient
                    .submitJob(executionPlan)
                    .thenApplyAsync(
                            FunctionUtils.uncheckedFunction(
                                    jobId -> {
                                        ClientUtils.waitUntilJobInitializationFinished(
                                                () -> clusterClient.getJobStatus(jobId).get(),
                                                () -> clusterClient.requestJobResult(jobId).get(),
                                                classLoader);
                                        return jobId;
                                    }))
                    // The explicit type witness keeps the chain typed as CompletableFuture<JobClient>
                    // instead of the concrete adapter type.
                    .<JobClient>thenApplyAsync(
                            jobId -> new ClusterClientJobClientAdapter<>(clusterClientProvider, jobId, classLoader))
                    .whenCompleteAsync(
                            (ignoredJobClient, failure) -> {
                                if (failure == null) {
                                    PipelineExecutorUtils.notifyJobStatusListeners(pipeline, executionPlan, this.jobStatusChangedListeners);
                                } else {
                                    LOG.error("Failed to submit job graph to remote session cluster.", failure);
                                }
                                // The client is only needed for the submission itself.
                                clusterClient.close();
                            });
        }
    }

    /**
     * Asks the cluster identified by the given configuration for the ids of its completed cluster
     * datasets.
     *
     * <p>The cluster client used for the request is closed asynchronously once the request has
     * completed, mirroring what {@link #execute} does with its client.
     *
     * @param configuration the configuration from which the target cluster is derived.
     * @param classLoader not used by this implementation.
     * @return a future with the ids of the completed cluster datasets.
     * @throws IllegalStateException if no cluster id can be resolved from the configuration.
     * @throws Exception if the cluster cannot be retrieved.
     */
    @Override
    public CompletableFuture<Set<AbstractID>> listCompletedClusterDatasetIds(Configuration configuration, ClassLoader classLoader) throws Exception {
        try (ClusterDescriptor<ClusterID> clusterDescriptor = this.clusterClientFactory.createClusterDescriptor2(configuration)) {
            final ClusterID clusterId = this.clusterClientFactory.getClusterId(configuration);
            Preconditions.checkState(clusterId != null);

            final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterId).getClusterClient();

            // Close asynchronously so the client is not shut down on the thread that completes
            // the request future.
            return clusterClient
                    .listCompletedClusterDatasetIds()
                    .whenCompleteAsync((ignoredIds, ignoredFailure) -> clusterClient.close());
        }
    }

    /**
     * Asks the cluster identified by the given configuration to invalidate the cluster dataset
     * with the given id.
     *
     * <p>The cluster client used for the request is closed asynchronously once the request has
     * completed, mirroring what {@link #execute} does with its client.
     *
     * @param abstractID the id of the cluster dataset to invalidate.
     * @param configuration the configuration from which the target cluster is derived.
     * @param classLoader not used by this implementation.
     * @return a future that completes with {@code null} once the cluster has answered the
     *     request.
     * @throws IllegalStateException if no cluster id can be resolved from the configuration.
     * @throws Exception if the cluster cannot be retrieved.
     */
    @Override
    public CompletableFuture<Void> invalidateClusterDataset(AbstractID abstractID, Configuration configuration, ClassLoader classLoader) throws Exception {
        try (ClusterDescriptor<ClusterID> clusterDescriptor = this.clusterClientFactory.createClusterDescriptor2(configuration)) {
            final ClusterID clusterId = this.clusterClientFactory.getClusterId(configuration);
            Preconditions.checkState(clusterId != null);

            final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterId).getClusterClient();

            return clusterClient
                    .invalidateClusterDataset(new IntermediateDataSetID(abstractID))
                    // Close asynchronously so the client is not shut down on the thread that
                    // completes the request future.
                    .whenCompleteAsync((ignoredAck, ignoredFailure) -> clusterClient.close())
                    // Callers only care about completion, not about the cluster's answer.
                    .thenApply(ignoredAck -> null);
        }
    }
}
