/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.program.rest;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
import org.apache.flink.client.program.rest.UrlPrefixDecorator;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServicesFactory;
import org.apache.flink.runtime.highavailability.DefaultClientHighAvailabilityServicesFactory;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.HttpHeader;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
import org.apache.flink.runtime.rest.messages.JobCancellationHeaders;
import org.apache.flink.runtime.rest.messages.JobCancellationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobClientHeartbeatHeaders;
import org.apache.flink.runtime.rest.messages.JobClientHeartbeatParameters;
import org.apache.flink.runtime.rest.messages.JobClientHeartbeatRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetDeleteStatusHeaders;
import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetDeleteStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetDeleteTriggerHeaders;
import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetDeleteTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetEntry;
import org.apache.flink.runtime.rest.messages.dataset.ClusterDataSetListHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationMessageParameters;
import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.stop.StopWithSavepointRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.stop.StopWithSavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FixedRetryStrategy;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.util.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RestClusterClient<T>
implements ClusterClient<T> {
    /** Logger shared by all instances of this client. */
    private static final Logger LOG = LoggerFactory.getLogger(RestClusterClient.class);
    /** REST-client settings derived from the Flink configuration in the constructor; supplies the retry delay used by {@code close()}. */
    private final RestClusterClientConfiguration restClusterClientConfiguration;
    /** Configuration this client was created with; {@code getFlinkConfiguration()} builds a new Configuration from it. */
    private final Configuration configuration;
    /** Client used for the REST calls; either injected or created via {@code RestClient.forUrl} in the constructor. */
    private final RestClient restClient;
    /** Fixed pool of four threads named "Flink-RestClusterClient-IO"; passed to {@code RestClient.forUrl} and used by {@code submitJob} to serialize the execution plan. */
    private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
    /** Wait strategy supplied through the constructors (the public ones pass {@code new ExponentialWaitStrategy(10L, 2000L)}). */
    private final WaitStrategy waitStrategy;
    /** Cluster identifier handed in at construction and returned by {@code getClusterId()}. */
    private final T clusterId;
    /** Client-side HA services created by the factory in the constructor and closed in {@code close()}. */
    private final ClientHighAvailabilityServices clientHAServices;
    /** Leader retrieval service obtained from {@code clientHAServices.getClusterRestEndpointLeaderRetriever()}. */
    private final LeaderRetrievalService webMonitorRetrievalService;
    /** Listener registered with {@code webMonitorRetrievalService}; also receives fatal HA-service errors via {@code handleError}. */
    private final LeaderRetriever webMonitorLeaderRetriever = new LeaderRetriever();
    /** Starts as {@code true}; {@code close()} flips it to {@code false} and only the flipping call performs the shutdown. */
    private final AtomicBoolean running = new AtomicBoolean(true);
    /** Single-thread scheduled executor ("Flink-RestClusterClient-Retry") created in the constructor and shut down in {@code close()}. */
    private final ScheduledExecutorService retryExecutorService;
    /** Matches exceptions for which {@code ExceptionUtils.findThrowable} locates a {@code JobStateUnknownException}. */
    private final Predicate<Throwable> unknownJobStateRetryable = exception -> ExceptionUtils.findThrowable(exception, JobStateUnknownException.class).isPresent();
    /** URL built in the constructor from the SSL flag, JobManager address/port and REST path options. */
    private final URL jobmanagerUrl;
    /** Headers read from the {@code FLINK_REST_CLIENT_HEADERS} environment variable in the constructor. */
    private final Collection<HttpHeader> customHttpHeaders;

    /**
     * Creates a client for the given cluster using
     * {@code DefaultClientHighAvailabilityServicesFactory.INSTANCE} for the client-side HA services.
     *
     * @param config Flink configuration used to set up the client
     * @param clusterId identifier of the cluster this client talks to
     * @throws Exception propagated from the constructor this one delegates to
     */
    public RestClusterClient(Configuration config, T clusterId) throws Exception {
        this(config, clusterId, DefaultClientHighAvailabilityServicesFactory.INSTANCE);
    }

    /**
     * Creates a client with a caller-supplied HA-services factory.
     *
     * <p>Delegates with no pre-built {@code RestClient}, a
     * {@code new ExponentialWaitStrategy(10L, 2000L)} and no {@code EventLoopGroup}.
     *
     * @param config Flink configuration used to set up the client
     * @param clusterId identifier of the cluster this client talks to
     * @param factory factory that creates the client-side HA services
     * @throws Exception propagated from the constructor this one delegates to
     */
    public RestClusterClient(Configuration config, T clusterId, ClientHighAvailabilityServicesFactory factory) throws Exception {
        this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), factory, null);
    }

    /**
     * Creates a client with a caller-supplied HA-services factory and Netty event loop group.
     *
     * <p>Delegates with no pre-built {@code RestClient} and a
     * {@code new ExponentialWaitStrategy(10L, 2000L)}; the group is forwarded to
     * {@code RestClient.forUrl} by the delegated constructor.
     *
     * @param config Flink configuration used to set up the client
     * @param clusterId identifier of the cluster this client talks to
     * @param factory factory that creates the client-side HA services
     * @param group event loop group handed to the REST client that gets created
     * @throws Exception propagated from the constructor this one delegates to
     */
    public RestClusterClient(Configuration config, T clusterId, ClientHighAvailabilityServicesFactory factory, EventLoopGroup group) throws Exception {
        this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), factory, group);
    }

    /**
     * Test-only constructor that allows injecting the {@code RestClient} and the wait strategy.
     *
     * <p>Uses {@code DefaultClientHighAvailabilityServicesFactory.INSTANCE} and no
     * {@code EventLoopGroup}.
     *
     * @param configuration Flink configuration used to set up the client
     * @param restClient REST client to use, or {@code null} to have one created
     * @param clusterId identifier of the cluster this client talks to
     * @param waitStrategy wait strategy stored on the client
     * @throws Exception propagated from the constructor this one delegates to
     */
    @VisibleForTesting
    RestClusterClient(Configuration configuration, @Nullable RestClient restClient, T clusterId, WaitStrategy waitStrategy) throws Exception {
        this(configuration, restClient, clusterId, waitStrategy, DefaultClientHighAvailabilityServicesFactory.INSTANCE, null);
    }

    /**
     * Primary constructor that every other constructor delegates to.
     *
     * <p>All mandatory arguments are null-checked before any resource is created. Previously the
     * wait strategy and cluster id were only checked after a {@code RestClient} had been built, so
     * a {@code null} argument left that freshly created client behind without anyone shutting it
     * down.
     *
     * @param configuration Flink configuration; must not be {@code null}
     * @param restClient REST client to use, or {@code null} to create one via
     *     {@code RestClient.forUrl}
     * @param clusterId identifier of the cluster; must not be {@code null}
     * @param waitStrategy wait strategy stored on the client; must not be {@code null}
     * @param clientHAServicesFactory factory for the client-side HA services; must not be
     *     {@code null}
     * @param group event loop group forwarded to {@code RestClient.forUrl}; may be {@code null}
     * @throws Exception if building the URL, the REST client or the HA services fails, or if the
     *     leader retrieval cannot be started
     */
    private RestClusterClient(Configuration configuration, @Nullable RestClient restClient, T clusterId, WaitStrategy waitStrategy, ClientHighAvailabilityServicesFactory clientHAServicesFactory, @Nullable EventLoopGroup group) throws Exception {
        // Fail fast: validate every mandatory argument before allocating anything.
        this.configuration = Preconditions.checkNotNull(configuration);
        this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
        this.clusterId = Preconditions.checkNotNull(clusterId);
        Preconditions.checkNotNull(clientHAServicesFactory);

        this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);
        this.customHttpHeaders = ClientUtils.readHeadersFromEnvironmentVariable("FLINK_REST_CLIENT_HEADERS");
        this.jobmanagerUrl = new URL(SecurityOptions.isRestSSLEnabled(configuration) ? "https" : "http", configuration.get(JobManagerOptions.ADDRESS), configuration.get(JobManagerOptions.PORT), configuration.get(RestOptions.PATH));
        // Only create a REST client when none was injected.
        this.restClient = restClient != null ? restClient : RestClient.forUrl(configuration, this.executorService, this.jobmanagerUrl, group);
        // Fatal HA-service errors are routed to the leader retriever.
        this.clientHAServices = clientHAServicesFactory.create(configuration, exception -> this.webMonitorLeaderRetriever.handleError(new FlinkException("Fatal error happened with client HA services.", exception)));
        this.webMonitorRetrievalService = this.clientHAServices.getClusterRestEndpointLeaderRetriever();
        this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
        this.startLeaderRetrievers();
    }

    /**
     * Starts the REST endpoint leader retrieval service with this client's leader retriever as
     * the listener.
     *
     * @throws Exception if the retrieval service fails to start
     */
    private void startLeaderRetrievers() throws Exception {
        final LeaderRetriever restEndpointListener = webMonitorLeaderRetriever;
        webMonitorRetrievalService.start(restEndpointListener);
    }

    /**
     * Returns a new {@code Configuration} constructed from the configuration held by this client.
     *
     * @return a fresh {@code Configuration} instance on every call
     */
    @Override
    public Configuration getFlinkConfiguration() {
        final Configuration snapshot = new Configuration(configuration);
        return snapshot;
    }

    /**
     * Shuts the client down. Only the first call has an effect; later calls return immediately.
     *
     * <p>Order: retry executor, REST client, IO executor, leader retrieval service, HA services.
     * Failures while stopping the last two are logged and not rethrown.
     */
    @Override
    public void close() {
        // Only the caller that flips the flag performs the shutdown.
        if (!running.compareAndSet(true, false)) {
            return;
        }

        ExecutorUtils.gracefulShutdown(restClusterClientConfiguration.getRetryDelay(), TimeUnit.MILLISECONDS, retryExecutorService);
        restClient.shutdown(Duration.ofSeconds(5L));
        ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, executorService);

        try {
            webMonitorRetrievalService.stop();
        } catch (Exception stopFailure) {
            LOG.error("An error occurred during stopping the WebMonitorRetrievalService", stopFailure);
        }

        try {
            clientHAServices.close();
        } catch (Exception closeFailure) {
            LOG.error("An error occurred during stopping the ClientHighAvailabilityServices", closeFailure);
        }
    }

    /**
     * Requests the details of a job.
     *
     * <p>The headers and message parameters are passed with their own types. The previous
     * {@code (RequestBody)((Object) params)} cast treated the message parameters as a request
     * body, which cannot succeed at runtime for a parameters object.
     *
     * @param jobId id of the job whose details are requested
     * @return future completed with the job details
     */
    public CompletableFuture<JobDetailsInfo> getJobDetails(JobID jobId) {
        final JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
        final JobMessageParameters params = new JobMessageParameters();
        params.jobPathParameter.resolve(jobId);
        return sendRequest(detailsHeaders, params);
    }

    /**
     * Requests the status of a job.
     *
     * <p>The request is handed to {@code retry(...)} together with the
     * {@code unknownJobStateRetryable} predicate.
     *
     * @param jobId id of the job
     * @return future completed with the job status
     */
    @Override
    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
        final CheckedSupplier<CompletableFuture<JobStatus>> statusRequest = () -> requestJobStatus(jobId);
        return retry(statusRequest, unknownJobStateRetryable);
    }

    /**
     * Requests the result of a job.
     *
     * <p>The request is handed to {@code retry(...)} together with the
     * {@code unknownJobStateRetryable} predicate.
     *
     * @param jobId id of the job
     * @return future completed with the job result
     */
    @Override
    public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
        final CheckedSupplier<CompletableFuture<JobResult>> resultRequest = () -> requestJobResultInternal(jobId);
        return retry(resultRequest, unknownJobStateRetryable);
    }

    /**
     * Submits the given execution plan to the cluster.
     *
     * <p>Steps:
     * <ol>
     *   <li>serialize the plan into a temporary file on the IO executor;
     *   <li>collect the plan file, all user jars and all artifacts that do not live on a
     *       distributed file system as uploads, and build the submit request body;
     *   <li>send the request through {@code sendRetriableRequest};
     *   <li>delete the temporary file once the submission has finished, successfully or not.
     * </ol>
     *
     * <p>If serialization fails after the temporary file was created, the file is now deleted
     * right away; previously it was left behind because the cleanup stage only runs when the
     * file future completed normally.
     *
     * @param executionPlan plan to submit
     * @return future completed with the job id, or completed exceptionally with a
     *     {@code CompletionException} wrapping a {@code JobSubmissionException}
     */
    @Override
    public CompletableFuture<JobID> submitJob(@Nonnull ExecutionPlan executionPlan) {
        final CompletableFuture<java.nio.file.Path> executionPlanFileFuture = CompletableFuture.supplyAsync(() -> {
            java.nio.file.Path executionPlanFile = null;
            try {
                executionPlanFile = Files.createTempFile("flink-executionPlan-" + executionPlan.getJobID(), ".bin");
                try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(executionPlanFile))) {
                    objectOut.writeObject(executionPlan);
                }
                return executionPlanFile;
            } catch (IOException e) {
                // The later cleanup stage never sees a file whose future failed, so remove it here.
                if (executionPlanFile != null) {
                    try {
                        Files.deleteIfExists(executionPlanFile);
                    } catch (IOException deleteFailure) {
                        e.addSuppressed(deleteFailure);
                    }
                }
                throw new CompletionException(new FlinkException("Failed to serialize ExecutionPlan.", e));
            }
        }, executorService);

        final CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = executionPlanFileFuture.thenApply(executionPlanFile -> {
            final Collection<String> jarFileNames = new ArrayList<>(8);
            final Collection<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
            final Collection<FileUpload> filesToUpload = new ArrayList<>(8);

            filesToUpload.add(new FileUpload(executionPlanFile, "application/octet-stream"));

            for (Path jar : executionPlan.getUserJars()) {
                jarFileNames.add(jar.getName());
                filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), "application/java-archive"));
            }

            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifact : executionPlan.getUserArtifacts().entrySet()) {
                final Path artifactFilePath = new Path(artifact.getValue().filePath);
                try {
                    // Artifacts on a distributed file system are not uploaded.
                    if (artifactFilePath.getFileSystem().isDistributedFS()) {
                        continue;
                    }
                    artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifact.getKey(), artifactFilePath.getName()));
                    filesToUpload.add(new FileUpload(Paths.get(artifactFilePath.getPath()), "application/octet-stream"));
                } catch (IOException e) {
                    throw new CompletionException(new FlinkException("Failed to get the FileSystem of artifact " + artifactFilePath + ".", e));
                }
            }

            final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(executionPlanFile.getFileName().toString(), jarFileNames, artifactFileNames);
            return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
        });

        // The response body itself is never inspected, only completion/failure matters.
        final CompletableFuture<?> submissionFuture = requestFuture.thenCompose(requestAndFileUploads -> {
            LOG.info("Submitting job '{}' ({}).", executionPlan.getName(), executionPlan.getJobID());
            return sendRetriableRequest(JobSubmitHeaders.getInstance(), EmptyMessageParameters.getInstance(), requestAndFileUploads.f0, requestAndFileUploads.f1, RestClusterClient.isConnectionProblemOrServiceUnavailable(), (receiver, error) -> {
                if (error != null) {
                    LOG.warn("Attempt to submit job '{}' ({}) to '{}' has failed.", executionPlan.getName(), executionPlan.getJobID(), receiver, error);
                } else {
                    LOG.info("Successfully submitted job '{}' ({}) to '{}'.", executionPlan.getName(), executionPlan.getJobID(), receiver);
                }
            });
        });

        // Clean up the temporary file regardless of how the submission ended.
        submissionFuture
                .exceptionally(ignored -> null)
                .thenCompose(ignored -> executionPlanFileFuture)
                .thenAccept(executionPlanFile -> {
                    try {
                        Files.delete(executionPlanFile);
                    } catch (IOException e) {
                        LOG.warn("Could not delete temporary file {}.", executionPlanFile, e);
                    }
                });

        return submissionFuture
                .thenApply(ignore -> executionPlan.getJobID())
                .exceptionally(throwable -> {
                    throw new CompletionException(new JobSubmissionException(executionPlan.getJobID(), "Failed to submit ExecutionPlan.", ExceptionUtils.stripCompletionException(throwable)));
                });
    }

    /**
     * Cancels a job by sending a job cancellation request with termination mode {@code CANCEL}.
     *
     * <p>The message parameters are passed with their own type instead of being cast to
     * {@code RequestBody}, and the response future is no longer held as a raw type.
     *
     * @param jobID id of the job to cancel
     * @return future completed with {@code Acknowledge.get()} once the request has succeeded
     */
    @Override
    public CompletableFuture<Acknowledge> cancel(JobID jobID) {
        final JobCancellationMessageParameters params = new JobCancellationMessageParameters().resolveJobId(jobID).resolveTerminationMode(TerminationModeQueryParameter.TerminationMode.CANCEL);
        return sendRequest(JobCancellationHeaders.getInstance(), params).thenApply(ignore -> Acknowledge.get());
    }

    /**
     * Stops a job with a savepoint in non-detached mode.
     *
     * @param jobId id of the job to stop
     * @param advanceToEndOfTime forwarded to the stop-with-savepoint request body
     * @param savepointDirectory target directory for the savepoint, may be {@code null}
     * @param formatType format of the savepoint
     * @return future produced by the five-argument {@code stopWithSavepoint} overload
     */
    @Override
    public CompletableFuture<String> stopWithSavepoint(JobID jobId, boolean advanceToEndOfTime, @Nullable String savepointDirectory, SavepointFormatType formatType) {
        final boolean detached = false;
        return stopWithSavepoint(jobId, advanceToEndOfTime, savepointDirectory, formatType, detached);
    }

    /**
     * Stops a job with a savepoint in detached mode.
     *
     * @param jobId id of the job to stop
     * @param advanceToEndOfTime forwarded to the stop-with-savepoint request body
     * @param savepointDirectory target directory for the savepoint, may be {@code null}
     * @param formatType format of the savepoint
     * @return future produced by the five-argument {@code stopWithSavepoint} overload
     */
    @Override
    public CompletableFuture<String> stopWithDetachedSavepoint(JobID jobId, boolean advanceToEndOfTime, @Nullable String savepointDirectory, SavepointFormatType formatType) {
        final boolean detached = true;
        return stopWithSavepoint(jobId, advanceToEndOfTime, savepointDirectory, formatType, detached);
    }

    /**
     * Triggers a savepoint with the cancel-job flag set, in non-detached mode.
     *
     * @param jobId id of the job
     * @param savepointDirectory target directory for the savepoint, may be {@code null}
     * @param formatType format of the savepoint
     * @return future produced by the private {@code triggerSavepoint} helper
     */
    @Override
    public CompletableFuture<String> cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) {
        final boolean cancelJob = true;
        final boolean detached = false;
        return triggerSavepoint(jobId, savepointDirectory, cancelJob, formatType, detached);
    }

    /**
     * Triggers a savepoint without cancelling the job, in non-detached mode.
     *
     * @param jobId id of the job
     * @param savepointDirectory target directory for the savepoint, may be {@code null}
     * @param formatType format of the savepoint
     * @return future produced by the private {@code triggerSavepoint} helper
     */
    @Override
    public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) {
        final boolean cancelJob = false;
        final boolean detached = false;
        return triggerSavepoint(jobId, savepointDirectory, cancelJob, formatType, detached);
    }

    /**
     * Triggers a checkpoint and waits for its outcome.
     *
     * <p>After the trigger request is answered, the checkpoint status is polled with the returned
     * trigger id. The response future is typed (it used to be a raw {@code CompletableFuture}, so
     * the follow-up lambdas had no usable parameter types).
     *
     * @param jobId id of the job
     * @param checkpointType type of checkpoint to trigger
     * @return future completed with the checkpoint id, or completed exceptionally with the
     *     reported failure cause wrapped in a {@code CompletionException}
     */
    @Override
    public CompletableFuture<Long> triggerCheckpoint(JobID jobId, CheckpointType checkpointType) {
        final CheckpointTriggerHeaders triggerHeaders = CheckpointTriggerHeaders.getInstance();
        final CheckpointTriggerMessageParameters triggerParameters = triggerHeaders.getUnresolvedMessageParameters();
        triggerParameters.jobID.resolve(jobId);

        final CompletableFuture<TriggerResponse> responseFuture = sendRequest(triggerHeaders, triggerParameters, new CheckpointTriggerRequestBody(checkpointType, null));

        return responseFuture
                .thenCompose(triggerResponse -> pollCheckpointAsync(jobId, triggerResponse.getTriggerId()))
                .thenApply(checkpointInfo -> {
                    if (checkpointInfo.getFailureCause() != null) {
                        throw new CompletionException(checkpointInfo.getFailureCause());
                    }
                    return checkpointInfo.getCheckpointId();
                });
    }

    /**
     * Triggers a savepoint without cancelling the job, in detached mode.
     *
     * @param jobId id of the job
     * @param savepointDirectory target directory for the savepoint, may be {@code null}
     * @param formatType format of the savepoint
     * @return future produced by the private {@code triggerSavepoint} helper
     */
    @Override
    public CompletableFuture<String> triggerDetachedSavepoint(JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) {
        final boolean cancelJob = false;
        final boolean detached = true;
        return triggerSavepoint(jobId, savepointDirectory, cancelJob, formatType, detached);
    }

    /**
     * Sends a coordination request to the operator identified by {@code operatorUid} of a job.
     *
     * @param jobId id of the job
     * @param operatorUid uid of the target operator
     * @param request request to serialize and send
     * @return future completed with the deserialized response; completed exceptionally with the
     *     {@code IOException} if the request cannot be serialized, or with a
     *     {@code CompletionException} if the response cannot be deserialized
     */
    @Override
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId, String operatorUid, CoordinationRequest request) {
        final ClientCoordinationHeaders coordinationHeaders = ClientCoordinationHeaders.getInstance();
        final ClientCoordinationMessageParameters coordinationParameters = new ClientCoordinationMessageParameters();
        coordinationParameters.jobPathParameter.resolve(jobId);
        coordinationParameters.operatorUidPathParameter.resolve(operatorUid);

        final SerializedValue<CoordinationRequest> payload;
        try {
            payload = new SerializedValue<>(request);
        } catch (IOException serializationFailure) {
            // Report serialization problems through the returned future, not as a thrown exception.
            return FutureUtils.completedExceptionally(serializationFailure);
        }

        final ClientCoordinationRequestBody coordinationBody = new ClientCoordinationRequestBody(payload);
        return sendRequest(coordinationHeaders, coordinationParameters, coordinationBody).thenApply(response -> {
            try {
                // The response is deserialized with this client's class loader.
                return response.getSerializedCoordinationResponse().deserializeValue(getClass().getClassLoader());
            } catch (IOException | ClassNotFoundException deserializationFailure) {
                throw new CompletionException("Failed to deserialize coordination response", deserializationFailure);
            }
        });
    }

    /**
     * Sends a stop-with-savepoint request and converts the trigger response into the result
     * future via {@code getSavepointTriggerFuture}.
     *
     * @param jobId id of the job to stop
     * @param advanceToEndOfTime forwarded to the request body
     * @param savepointDirectory target directory for the savepoint, may be {@code null}
     * @param formatType format of the savepoint
     * @param isDetachedMode forwarded to {@code getSavepointTriggerFuture}
     * @return future produced by {@code getSavepointTriggerFuture}
     */
    public CompletableFuture<String> stopWithSavepoint(JobID jobId, boolean advanceToEndOfTime, @Nullable String savepointDirectory, SavepointFormatType formatType, boolean isDetachedMode) {
        final StopWithSavepointTriggerHeaders triggerHeaders = StopWithSavepointTriggerHeaders.getInstance();
        final SavepointTriggerMessageParameters triggerParameters = triggerHeaders.getUnresolvedMessageParameters();
        triggerParameters.jobID.resolve(jobId);

        final StopWithSavepointRequestBody stopRequest = new StopWithSavepointRequestBody(savepointDirectory, advanceToEndOfTime, formatType, null);
        final CompletableFuture<TriggerResponse> triggerResponseFuture = sendRequest(triggerHeaders, triggerParameters, stopRequest);
        return getSavepointTriggerFuture(jobId, isDetachedMode, triggerResponseFuture);
    }

    /**
     * Sends a savepoint trigger request and converts the trigger response into the result future
     * via {@code getSavepointTriggerFuture}.
     *
     * @param jobId id of the job
     * @param savepointDirectory target directory for the savepoint, may be {@code null}
     * @param cancelJob forwarded to the request body
     * @param formatType format of the savepoint
     * @param isDetachedMode forwarded to {@code getSavepointTriggerFuture}
     * @return future produced by {@code getSavepointTriggerFuture}
     */
    private CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory, boolean cancelJob, SavepointFormatType formatType, boolean isDetachedMode) {
        final SavepointTriggerHeaders triggerHeaders = SavepointTriggerHeaders.getInstance();
        final SavepointTriggerMessageParameters triggerParameters = triggerHeaders.getUnresolvedMessageParameters();
        triggerParameters.jobID.resolve(jobId);

        final SavepointTriggerRequestBody triggerRequest = new SavepointTriggerRequestBody(savepointDirectory, cancelJob, formatType, null);
        final CompletableFuture<TriggerResponse> triggerResponseFuture = sendRequest(triggerHeaders, triggerParameters, triggerRequest);
        return getSavepointTriggerFuture(jobId, isDetachedMode, triggerResponseFuture);
    }

    /**
     * Turns the response of a savepoint trigger request into the future handed to callers.
     *
     * <p>In detached mode the result is the trigger id as a string, available as soon as the
     * trigger response arrives. Otherwise the savepoint status is polled and the result is the
     * savepoint location.
     *
     * <p>Both branches now return properly typed futures; the previous version funnelled them
     * through a raw {@code CompletionStage} local that is not assignable to the declared return
     * type and erased the lambda parameter types.
     *
     * @param jobId id of the job the savepoint belongs to
     * @param isDetachedMode whether to return right after triggering
     * @param responseFuture future of the trigger response
     * @return future with the trigger id (detached) or the savepoint location; in non-detached
     *     mode it completes exceptionally with the reported failure cause wrapped in a
     *     {@code CompletionException}
     */
    private CompletableFuture<String> getSavepointTriggerFuture(JobID jobId, boolean isDetachedMode, CompletableFuture<TriggerResponse> responseFuture) {
        if (isDetachedMode) {
            return responseFuture.thenApply(triggerResponse -> triggerResponse.getTriggerId().toString());
        }

        return responseFuture
                .thenCompose(triggerResponse -> pollSavepointAsync(jobId, triggerResponse.getTriggerId()))
                .thenApply(savepointInfo -> {
                    if (savepointInfo.getFailureCause() != null) {
                        throw new CompletionException(savepointInfo.getFailureCause());
                    }
                    return savepointInfo.getLocation();
                });
    }

    /**
     * Requests the accumulators of a job, including the serialized user accumulators, and
     * deserializes them with the given class loader.
     *
     * <p>The message parameters are passed with their own type instead of being cast to
     * {@code RequestBody}, and the response future is typed so that the
     * {@code JobAccumulatorsInfo} method reference applies.
     *
     * @param jobID id of the job
     * @param loader class loader used to deserialize the accumulators
     * @return future completed with the unwrapped accumulators; completed exceptionally with a
     *     {@code CompletionException} if deserialization fails
     */
    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) {
        final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
        final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
        accMsgParams.jobPathParameter.resolve(jobID);
        // Ask for the serialized user accumulators to be included in the response.
        accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));

        final CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest(accumulatorsHeaders, accMsgParams);

        return responseFuture
                .thenApply(JobAccumulatorsInfo::getSerializedUserAccumulators)
                .thenApply(accumulators -> {
                    try {
                        return AccumulatorHelper.deserializeAndUnwrapAccumulators(accumulators, loader);
                    } catch (Exception e) {
                        throw new CompletionException("Cannot deserialize and unwrap accumulators properly.", e);
                    }
                });
    }

    /**
     * Polls the savepoint status of the given trigger through {@code pollResourceAsync}.
     *
     * <p>Each poll builds fresh status parameters for the job and trigger id and passes them with
     * their own type (previously they were cast to {@code RequestBody}).
     *
     * @param jobId id of the job
     * @param triggerID id returned by the savepoint trigger request
     * @return future completed with the savepoint info
     */
    private CompletableFuture<SavepointInfo> pollSavepointAsync(JobID jobId, TriggerId triggerID) {
        return pollResourceAsync(() -> {
            final SavepointStatusHeaders statusHeaders = SavepointStatusHeaders.getInstance();
            final SavepointStatusMessageParameters statusParameters = statusHeaders.getUnresolvedMessageParameters();
            statusParameters.jobIdPathParameter.resolve(jobId);
            statusParameters.triggerIdPathParameter.resolve(triggerID);
            return sendRequest(statusHeaders, statusParameters);
        });
    }

    /**
     * Polls the checkpoint status of the given trigger through {@code pollResourceAsync}.
     *
     * <p>Each poll builds fresh status parameters for the job and trigger id and passes them with
     * their own type (previously they were cast to {@code RequestBody}).
     *
     * @param jobId id of the job
     * @param triggerID id returned by the checkpoint trigger request
     * @return future completed with the checkpoint info
     */
    private CompletableFuture<CheckpointInfo> pollCheckpointAsync(JobID jobId, TriggerId triggerID) {
        return pollResourceAsync(() -> {
            final CheckpointStatusHeaders statusHeaders = CheckpointStatusHeaders.getInstance();
            final CheckpointStatusMessageParameters statusParameters = statusHeaders.getUnresolvedMessageParameters();
            statusParameters.jobIdPathParameter.resolve(jobId);
            statusParameters.triggerIdPathParameter.resolve(triggerID);
            return sendRequest(statusHeaders, statusParameters);
        });
    }

    /**
     * Requests the jobs overview and maps every entry to a {@code JobStatusMessage} carrying the
     * job id, name, status and start time.
     *
     * @return future completed with one status message per job in the overview
     */
    @Override
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
        return sendRequest(JobsOverviewHeaders.getInstance()).thenApply(overview -> {
            final Collection<JobStatusMessage> statusMessages = overview.getJobs().stream()
                    .map(job -> new JobStatusMessage(job.getJobId(), job.getJobName(), job.getStatus(), job.getStartTime()))
                    .collect(Collectors.toList());
            return statusMessages;
        });
    }

    /**
     * Returns the cluster identifier this client was constructed with.
     *
     * @return the cluster id
     */
    @Override
    public T getClusterId() {
        return clusterId;
    }

    /**
     * Disposes the savepoint at the given path.
     *
     * <p>Sends the disposal trigger request, then polls the disposal status with the returned
     * trigger id.
     *
     * <p>The status parameters are passed with their own type instead of being cast to
     * {@code RequestBody}, and the intermediate futures are typed. The operation-info type is
     * written fully qualified because the file does not import it.
     *
     * @param savepointPath path of the savepoint to dispose
     * @return future completed with {@code Acknowledge.get()}, or completed exceptionally with the
     *     reported failure cause wrapped in a {@code CompletionException}
     */
    @Override
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
        final SavepointDisposalRequest savepointDisposalRequest = new SavepointDisposalRequest(savepointPath);
        final CompletableFuture<TriggerResponse> savepointDisposalTriggerFuture = sendRequest(SavepointDisposalTriggerHeaders.getInstance(), savepointDisposalRequest);

        final CompletableFuture<org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo> savepointDisposalFuture = savepointDisposalTriggerFuture.thenCompose(triggerResponse -> {
            final TriggerId triggerId = triggerResponse.getTriggerId();
            final SavepointDisposalStatusHeaders statusHeaders = SavepointDisposalStatusHeaders.getInstance();
            final SavepointDisposalStatusMessageParameters statusParameters = statusHeaders.getUnresolvedMessageParameters();
            statusParameters.triggerIdPathParameter.resolve(triggerId);
            return pollResourceAsync(() -> sendRequest(statusHeaders, statusParameters));
        });

        return savepointDisposalFuture.thenApply(operationInfo -> {
            if (operationInfo.getFailureCause() == null) {
                return Acknowledge.get();
            }
            throw new CompletionException(operationInfo.getFailureCause());
        });
    }

    /**
     * Requests the list of cluster datasets and returns the ids of those reported as complete.
     *
     * <p>Each dataset id string is converted with {@code StringUtils.hexStringToByte} and wrapped
     * in an {@code AbstractID}.
     *
     * @return future completed with the ids of all complete cluster datasets
     */
    @Override
    public CompletableFuture<Set<AbstractID>> listCompletedClusterDatasetIds() {
        return sendRequest(ClusterDataSetListHeaders.INSTANCE).thenApply(listing -> {
            final Set<AbstractID> completedIds = listing.getDataSets().stream()
                    .filter(ClusterDataSetEntry::isComplete)
                    .map(entry -> new AbstractID(StringUtils.hexStringToByte(entry.getDataSetId())))
                    .collect(Collectors.toSet());
            return completedIds;
        });
    }

    /**
     * Deletes the cluster dataset with the given id.
     *
     * <p>Sends the delete trigger request, then polls the delete status with the returned trigger
     * id.
     *
     * <p>Trigger and status parameters are passed with their own types instead of being cast to
     * {@code RequestBody}, and the intermediate futures are typed. The operation-info type is
     * written fully qualified because the file does not import it.
     *
     * @param clusterDatasetId id of the cluster dataset to invalidate
     * @return future completed with {@code null} on success, or completed exceptionally with the
     *     reported failure cause wrapped in a {@code CompletionException}
     */
    @Override
    public CompletableFuture<Void> invalidateClusterDataset(AbstractID clusterDatasetId) {
        final ClusterDataSetDeleteTriggerHeaders triggerHeader = ClusterDataSetDeleteTriggerHeaders.INSTANCE;
        final ClusterDataSetDeleteTriggerMessageParameters parameters = triggerHeader.getUnresolvedMessageParameters();
        parameters.clusterDataSetIdPathParameter.resolve(new IntermediateDataSetID(clusterDatasetId));

        final CompletableFuture<TriggerResponse> triggerFuture = sendRequest(triggerHeader, parameters);

        final CompletableFuture<org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo> clusterDatasetDeleteFuture = triggerFuture.thenCompose(triggerResponse -> {
            final TriggerId triggerId = triggerResponse.getTriggerId();
            final ClusterDataSetDeleteStatusHeaders statusHeaders = ClusterDataSetDeleteStatusHeaders.INSTANCE;
            final ClusterDataSetDeleteStatusMessageParameters statusMessageParameters = statusHeaders.getUnresolvedMessageParameters();
            statusMessageParameters.triggerIdPathParameter.resolve(triggerId);
            return pollResourceAsync(() -> sendRequest(statusHeaders, statusMessageParameters));
        });

        return clusterDatasetDeleteFuture.thenApply(operationInfo -> {
            if (operationInfo.getFailureCause() == null) {
                return null;
            }
            throw new CompletionException(operationInfo.getFailureCause());
        });
    }

    /**
     * Sends a job client heartbeat for the given job.
     *
     * @param jobId job the heartbeat is reported for
     * @param expiredTimestamp timestamp placed into the heartbeat request body
     * @return future that completes with {@code null} once the request has been answered
     */
    @Override
    public CompletableFuture<Void> reportHeartbeat(JobID jobId, long expiredTimestamp) {
        JobClientHeartbeatParameters heartbeatParameters = new JobClientHeartbeatParameters().resolveJobId(jobId);
        return this.sendRequest(
                        JobClientHeartbeatHeaders.getInstance(),
                        heartbeatParameters,
                        new JobClientHeartbeatRequestBody(expiredTimestamp))
                // The response payload is irrelevant; only completion matters.
                .thenApply(response -> null);
    }

    /**
     * Sends the shutdown request and blocks until it has been answered.
     *
     * <p>A failed request is only logged. If the calling thread is interrupted while waiting,
     * its interrupt flag is set again and the method returns.
     */
    @Override
    public void shutDownCluster() {
        CompletableFuture<?> shutdownFuture = this.sendRequest(ShutdownHeaders.getInstance());
        try {
            shutdownFuture.get();
        } catch (ExecutionException executionException) {
            LOG.error("Error while shutting down cluster", (Throwable) executionException);
        } catch (InterruptedException interruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts polling an asynchronously created resource, beginning with attempt {@code 0} and a
     * fresh result future.
     *
     * @param resourceFutureSupplier supplies one status request per polling attempt
     * @return future that receives the resource once it is reported as completed
     */
    private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> pollResourceAsync(Supplier<CompletableFuture<A>> resourceFutureSupplier) {
        CompletableFuture<R> pendingResult = new CompletableFuture<>();
        long firstAttempt = 0L;
        return this.pollResourceAsync(resourceFutureSupplier, pendingResult, firstAttempt);
    }

    /**
     * Performs one polling attempt and, if the resource is not completed yet, schedules the
     * next attempt on the retry executor.
     *
     * <p>Outcome of a single attempt:
     * <ul>
     *   <li>the request failed: {@code resultFuture} is completed exceptionally;</li>
     *   <li>the queue status is {@code COMPLETED}: {@code resultFuture} is completed with the
     *       resource;</li>
     *   <li>otherwise: a new attempt is scheduled after {@code waitStrategy.sleepTime(attempt)}
     *       milliseconds.</li>
     * </ul>
     *
     * <p>Runtime exceptions raised while evaluating the response, while scheduling the next
     * attempt (e.g. the executor rejecting the task), or while issuing the follow-up request
     * are forwarded to {@code resultFuture}; otherwise they would be lost inside the callback
     * and the returned future would never complete.
     *
     * @param resourceFutureSupplier supplies one status request per polling attempt
     * @param resultFuture future to complete with the final outcome
     * @param attempt zero-based number of the current attempt, used to pick the wait time
     * @return {@code resultFuture}
     */
    private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> pollResourceAsync(Supplier<CompletableFuture<A>> resourceFutureSupplier, CompletableFuture<R> resultFuture, long attempt) {
        resourceFutureSupplier.get().whenComplete((asynchronouslyCreatedResource, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally(throwable);
                return;
            }
            try {
                if (asynchronouslyCreatedResource.queueStatus().getId() == QueueStatus.Id.COMPLETED) {
                    resultFuture.complete(asynchronouslyCreatedResource.resource());
                } else {
                    // Block lambda on purpose: it is only void-compatible, so the Runnable
                    // overload of schedule(...) is selected.
                    this.retryExecutorService.schedule(() -> {
                        try {
                            this.pollResourceAsync(resourceFutureSupplier, resultFuture, attempt + 1L);
                        } catch (RuntimeException retryFailure) {
                            resultFuture.completeExceptionally(retryFailure);
                        }
                    }, this.waitStrategy.sleepTime(attempt), TimeUnit.MILLISECONDS);
                }
            } catch (RuntimeException pollFailure) {
                // Exceptions thrown inside whenComplete callbacks are otherwise discarded.
                resultFuture.completeExceptionally(pollFailure);
            }
        });
        return resultFuture;
    }

    /**
     * Sends new resource requirements for the given job.
     *
     * @param jobId job whose requirements are updated
     * @param jobResourceRequirements requirements wrapped into the request body
     * @return future that completes with an acknowledgement once the request has been answered
     */
    public CompletableFuture<Acknowledge> updateJobResourceRequirements(JobID jobId, JobResourceRequirements jobResourceRequirements) {
        JobMessageParameters jobParameters = new JobMessageParameters();
        jobParameters.jobPathParameter.resolve(jobId);
        JobResourceRequirementsBody requirementsBody = new JobResourceRequirementsBody(jobResourceRequirements);
        return this.sendRequest(JobResourcesRequirementsUpdateHeaders.INSTANCE, jobParameters, requirementsBody)
                .thenApply(response -> Acknowledge.get());
    }

    /**
     * Exposes the JobManager URL held by this client (test hook).
     *
     * @return the {@code jobmanagerUrl} field
     */
    @VisibleForTesting
    URL getJobmanagerUrl() {
        return jobmanagerUrl;
    }

    /**
     * Exposes the custom HTTP headers held by this client (test hook).
     *
     * @return the {@code customHttpHeaders} field itself, not a copy
     */
    @VisibleForTesting
    Collection<HttpHeader> getCustomHttpHeaders() {
        return customHttpHeaders;
    }

    /**
     * Requests the cluster overview, using empty message parameters and an empty request body.
     *
     * @return future holding the cluster overview response
     */
    public CompletableFuture<ClusterOverviewWithVersion> getClusterOverview() {
        EmptyMessageParameters noParameters = EmptyMessageParameters.getInstance();
        EmptyRequestBody noBody = EmptyRequestBody.getInstance();
        return sendRequest(ClusterOverviewHeaders.getInstance(), noParameters, noBody);
    }

    /**
     * Returns the web monitor base URL as a string, blocking until it is available.
     *
     * <p>If the lookup is interrupted or fails, the exception is handed to
     * {@code ExceptionUtils.checkInterrupted}, a warning is logged, and a placeholder string is
     * returned instead.
     *
     * @return the base URL, or {@code "Unknown address."} when it could not be retrieved
     */
    @Override
    public String getWebInterfaceURL() {
        URL webMonitorUrl;
        try {
            webMonitorUrl = this.getWebMonitorBaseUrl().get();
        } catch (InterruptedException | ExecutionException e) {
            ExceptionUtils.checkInterrupted(e);
            LOG.warn("Could not retrieve the web interface URL for the cluster.", (Throwable) e);
            return "Unknown address.";
        }
        return webMonitorUrl.toString();
    }

    /**
     * Requests the current status of the given job.
     *
     * @param jobId job to query
     * @return future holding the job status; it completes exceptionally (cause
     *     {@link JobStateUnknownException}) when the reported status is {@code SUSPENDED}
     */
    private CompletableFuture<JobStatus> requestJobStatus(JobID jobId) {
        JobStatusInfoHeaders jobStatusInfoHeaders = JobStatusInfoHeaders.getInstance();
        JobMessageParameters params = new JobMessageParameters();
        params.jobPathParameter.resolve(jobId);
        // params are message parameters, not a request body: pass them uncast so that the
        // (headers, parameters) overload is chosen instead of failing on a RequestBody cast.
        return this.sendRequest(jobStatusInfoHeaders, params)
                .thenApply(JobStatusInfo::getJobStatus)
                .thenApply(jobStatus -> {
                    if (jobStatus == JobStatus.SUSPENDED) {
                        throw new JobStateUnknownException(String.format("Job %s is in state SUSPENDED", jobId));
                    }
                    return jobStatus;
                });
    }

    /**
     * Polls the job execution result endpoint until the result of the given job is available.
     *
     * @param jobId job whose result is requested
     * @return future holding the job result; it completes exceptionally (cause
     *     {@link JobStateUnknownException}) when the result's application status is
     *     {@code UNKNOWN}
     */
    private CompletableFuture<JobResult> requestJobResultInternal(@Nonnull JobID jobId) {
        return this.pollResourceAsync(() -> {
            JobMessageParameters messageParameters = new JobMessageParameters();
            messageParameters.jobPathParameter.resolve(jobId);
            // Message parameters must not be cast to RequestBody; passing them directly
            // selects the (headers, parameters) overload of sendRequest.
            return this.sendRequest(JobExecutionResultHeaders.getInstance(), messageParameters);
        }).thenApply(jobResult -> {
            if (jobResult.getApplicationStatus() == ApplicationStatus.UNKNOWN) {
                throw new JobStateUnknownException(String.format("Result for Job %s is UNKNOWN", jobId));
            }
            return jobResult;
        });
    }

    /**
     * Sends a request that carries message parameters but no body; the empty request body
     * singleton is supplied on the caller's behalf.
     *
     * @param messageHeaders headers describing the REST endpoint
     * @param messageParameters parameters for the request
     * @return future holding the response
     */
    private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters) {
        EmptyRequestBody noBody = EmptyRequestBody.getInstance();
        return sendRequest(messageHeaders, messageParameters, noBody);
    }

    /**
     * Sends a request that carries a body but no message parameters; the empty message
     * parameters singleton is supplied on the caller's behalf.
     *
     * @param messageHeaders headers describing the REST endpoint
     * @param request body of the request
     * @return future holding the response
     */
    private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, R request) {
        EmptyMessageParameters noParameters = EmptyMessageParameters.getInstance();
        return sendRequest(messageHeaders, noParameters, request);
    }

    /**
     * Sends a request that needs neither message parameters nor a body; both empty singletons
     * are supplied on the caller's behalf.
     *
     * @param messageHeaders headers describing the REST endpoint
     * @return future holding the response
     */
    @VisibleForTesting
    <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders) {
        EmptyMessageParameters noParameters = EmptyMessageParameters.getInstance();
        EmptyRequestBody noBody = EmptyRequestBody.getInstance();
        return sendRequest(messageHeaders, noParameters, noBody);
    }

    /**
     * Sends a request that is retried when the failure is a connection problem or the service
     * reports itself as unavailable.
     *
     * @param messageHeaders headers describing the REST endpoint
     * @param messageParameters parameters for the request
     * @param request body of the request
     * @return future holding the response
     */
    @VisibleForTesting
    public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) {
        Predicate<Throwable> retryOn = isConnectionProblemOrServiceUnavailable();
        return sendRetriableRequest(messageHeaders, messageParameters, request, retryOn);
    }

    /**
     * Sends a retriable request without file uploads and without an outcome callback.
     *
     * @param messageHeaders headers describing the REST endpoint
     * @param messageParameters parameters for the request
     * @param request body of the request
     * @param retryPredicate decides for which failures the request is retried
     * @return future holding the response
     */
    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRetriableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) {
        Collection<FileUpload> noUploads = Collections.emptyList();
        BiConsumer<String, Throwable> ignoreOutcome = (receiver, error) -> {};
        return sendRetriableRequest(messageHeaders, messageParameters, request, noUploads, retryPredicate, ignoreOutcome);
    }

    /**
     * Sends a request to the current web monitor leader, retrying failures accepted by
     * {@code retryPredicate}.
     *
     * <p>Each attempt resolves the web monitor base URL, decorates the headers with the path of
     * {@code jobmanagerUrl} and the custom HTTP headers, and sends the request through the REST
     * client. After every attempt {@code consumer} receives the base URL string and the error
     * of that attempt ({@code null} when it succeeded).
     *
     * @param messageHeaders headers describing the REST endpoint
     * @param messageParameters parameters for the request
     * @param request body of the request
     * @param filesToUpload files sent along with the request
     * @param retryPredicate decides for which failures the request is retried
     * @param consumer callback notified about the outcome of each attempt
     * @return future holding the response
     */
    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRetriableRequest(M messageHeaders, U messageParameters, R request, Collection<FileUpload> filesToUpload, Predicate<Throwable> retryPredicate, BiConsumer<String, Throwable> consumer) {
        return this.retry(
                () -> this.getWebMonitorBaseUrl().thenCompose(leaderUrl -> {
                    try {
                        // Raw types are kept on purpose: the generic signatures of the
                        // decorators and of the REST client are not visible from here.
                        UrlPrefixDecorator prefixedHeaders = new UrlPrefixDecorator(messageHeaders, this.jobmanagerUrl.getPath());
                        CustomHeadersDecorator decoratedHeaders = new CustomHeadersDecorator(prefixedHeaders);
                        decoratedHeaders.setCustomHeaders(this.customHttpHeaders);
                        CompletableFuture responseFuture = this.restClient.sendRequest(
                                leaderUrl.getHost(),
                                leaderUrl.getPort(),
                                decoratedHeaders,
                                messageParameters,
                                request,
                                filesToUpload);
                        // Report the outcome; the future handed back is the undecorated one.
                        responseFuture.whenComplete((response, failure) -> consumer.accept(leaderUrl.toString(), (Throwable) failure));
                        return responseFuture;
                    } catch (IOException ioException) {
                        throw new CompletionException(ioException);
                    }
                }),
                retryPredicate);
    }

    /**
     * Runs {@code operation} with a fixed-delay retry strategy built from the client
     * configuration (maximum attempts and delay in milliseconds), scheduling retries on the
     * retry executor.
     *
     * @param operation operation to run; wrapped so that it can be used as an unchecked supplier
     * @param retryPredicate decides for which failures the operation is retried
     * @return future holding the operation's result
     */
    private <C> CompletableFuture<C> retry(CheckedSupplier<CompletableFuture<C>> operation, Predicate<Throwable> retryPredicate) {
        Duration retryDelay = Duration.ofMillis(this.restClusterClientConfiguration.getRetryDelay());
        FixedRetryStrategy retryStrategy = new FixedRetryStrategy(this.restClusterClientConfiguration.getRetryMaxAttempts(), retryDelay);
        return FutureUtils.retryWithDelay(
                CheckedSupplier.unchecked(operation),
                retryStrategy,
                retryPredicate,
                new ScheduledExecutorServiceAdapter(this.retryExecutorService));
    }

    /**
     * Builds the default retry predicate.
     *
     * @return predicate accepting failures that are either connection problems or
     *     "service unavailable" responses
     */
    private static Predicate<Throwable> isConnectionProblemOrServiceUnavailable() {
        Predicate<Throwable> connectionProblem = isConnectionProblemException();
        Predicate<Throwable> serviceUnavailable = isServiceUnavailable();
        return connectionProblem.or(serviceUnavailable);
    }

    /**
     * Builds a predicate that recognises connection-related failures.
     *
     * @return predicate that is {@code true} when {@code ExceptionUtils.findThrowable} locates a
     *     {@code ConnectException}, {@code SocketTimeoutException},
     *     {@code ConnectTimeoutException} or {@code IOException} for the given throwable
     */
    private static Predicate<Throwable> isConnectionProblemException() {
        return throwable -> {
            if (ExceptionUtils.findThrowable(throwable, ConnectException.class).isPresent()) {
                return true;
            }
            if (ExceptionUtils.findThrowable(throwable, SocketTimeoutException.class).isPresent()) {
                return true;
            }
            if (ExceptionUtils.findThrowable(throwable, ConnectTimeoutException.class).isPresent()) {
                return true;
            }
            return ExceptionUtils.findThrowable(throwable, IOException.class).isPresent();
        };
    }

    /**
     * Builds a predicate that recognises REST failures whose HTTP status code equals
     * {@code HttpResponseStatus.SERVICE_UNAVAILABLE}.
     *
     * @return predicate over throwables
     */
    private static Predicate<Throwable> isServiceUnavailable() {
        Predicate<Integer> isUnavailableCode =
                statusCode -> HttpResponseStatus.SERVICE_UNAVAILABLE.code() == statusCode.intValue();
        return httpExceptionCodePredicate(isUnavailableCode);
    }

    /**
     * Lifts a predicate over HTTP status codes to a predicate over throwables.
     *
     * @param statusCodePredicate test applied to the HTTP status code of a
     *     {@code RestClientException} located via {@code ExceptionUtils.findThrowable}
     * @return predicate that is {@code true} only when such an exception is found and its
     *     status code passes {@code statusCodePredicate}
     */
    private static Predicate<Throwable> httpExceptionCodePredicate(Predicate<Integer> statusCodePredicate) {
        return throwable -> ExceptionUtils.findThrowable(throwable, RestClientException.class)
                .filter(restClientException -> statusCodePredicate.test(restClientException.getHttpResponseStatus().code()))
                .isPresent();
    }

    /**
     * Resolves the base URL of the current web monitor leader.
     *
     * <p>The leader future is bounded by the configured await-leader timeout (milliseconds);
     * the leader address is then parsed into a {@link URL} on {@code executorService}.
     *
     * @return future holding the parsed URL; the parsing step throws
     *     {@link IllegalArgumentException} when the address is not a valid URL
     */
    @VisibleForTesting
    CompletableFuture<URL> getWebMonitorBaseUrl() {
        long awaitLeaderTimeoutMs = this.restClusterClientConfiguration.getAwaitLeaderTimeout();
        String timeoutMessage = String.format("Waiting for leader address of WebMonitorEndpoint timed out after %d ms.", awaitLeaderTimeoutMs);
        return FutureUtils.orTimeout(
                        this.webMonitorLeaderRetriever.getLeaderFuture(),
                        awaitLeaderTimeoutMs,
                        TimeUnit.MILLISECONDS,
                        timeoutMessage)
                .thenApplyAsync(leaderInfo -> {
                    // f0 carries the leader address.
                    String leaderAddress = (String) leaderInfo.f0;
                    try {
                        return new URL(leaderAddress);
                    } catch (MalformedURLException malformed) {
                        throw new IllegalArgumentException("Could not parse URL from " + leaderAddress, malformed);
                    }
                }, (Executor) this.executorService);
    }

    /**
     * Retry step for {@code pollResourceAsync}: re-issues the poll with the attempt counter
     * incremented by one. Marked as synthetic by the decompiler.
     *
     * @param resourceFutureSupplier supplies one status request per polling attempt
     * @param resultFuture future to complete with the final outcome
     * @param attempt number of the attempt that has just finished
     */
    private /* synthetic */ void lambda$pollResourceAsync$34(Supplier resourceFutureSupplier, CompletableFuture resultFuture, long attempt) {
        long nextAttempt = attempt + 1L;
        this.pollResourceAsync(resourceFutureSupplier, resultFuture, nextAttempt);
    }

    /**
     * Unchecked exception raised by this client when a job's state or result is reported in a
     * form it treats as unknown (a {@code SUSPENDED} status or an {@code UNKNOWN} result).
     */
    private static class JobStateUnknownException extends RuntimeException {

        /**
         * @param message description of the unknown state
         */
        public JobStateUnknownException(final String message) {
            super(message);
        }
    }
}

