/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.deployment.application;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.client.deployment.application.JobStatusPollingUtils;
import org.apache.flink.client.deployment.application.UnsuccessfulExecutionException;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;

/**
 * A {@link JobClient} that forwards every operation for a single job directly to a
 * {@link DispatcherGateway}, using one fixed RPC timeout for all calls.
 *
 * <p>It also implements {@link CoordinationRequestGateway}, so coordination requests can be
 * delivered to an operator coordinator of the same job through the dispatcher.
 */
@Internal
public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway {

    /** Delay handed to {@link JobStatusPollingUtils#getJobResult} as the retry period. */
    private static final Duration JOB_RESULT_RETRY_PERIOD = Duration.ofMillis(100L);

    private final JobID jobId;
    private final DispatcherGateway dispatcherGateway;
    private final ScheduledExecutor retryExecutor;
    private final Duration timeout;

    /**
     * Class loader used to deserialize accumulators and job results. It is not validated in
     * the constructor; the methods that need it check for {@code null} when they are called.
     */
    private final ClassLoader classLoader;

    /**
     * Creates a client bound to one job.
     *
     * @param jobId id of the job this client operates on; must not be {@code null}
     * @param dispatcherGateway gateway every request is sent to; must not be {@code null}
     * @param retryExecutor executor passed to the job-result polling helper; must not be
     *     {@code null}
     * @param rpcTimeout timeout passed to every dispatcher call; must not be {@code null}
     * @param classLoader class loader for deserializing results; stored as-is without a null
     *     check, but required by {@link #getAccumulators()} and {@link #getJobExecutionResult()}
     */
    public EmbeddedJobClient(JobID jobId, DispatcherGateway dispatcherGateway, ScheduledExecutor retryExecutor, Duration rpcTimeout, ClassLoader classLoader) {
        this.jobId = Preconditions.checkNotNull(jobId);
        this.dispatcherGateway = Preconditions.checkNotNull(dispatcherGateway);
        this.retryExecutor = Preconditions.checkNotNull(retryExecutor);
        this.timeout = Preconditions.checkNotNull(rpcTimeout);
        this.classLoader = classLoader;
    }

    /** Returns the id of the job this client is bound to. */
    @Override
    public JobID getJobID() {
        return this.jobId;
    }

    /** Requests the current status of the job from the dispatcher. */
    @Override
    public CompletableFuture<JobStatus> getJobStatus() {
        return this.dispatcherGateway.requestJobStatus(this.jobId, this.timeout);
    }

    /**
     * Asks the dispatcher to cancel the job.
     *
     * @return a future that completes with {@code null} once the dispatcher's cancel call
     *     completes; the dispatcher's own return value is discarded
     */
    @Override
    public CompletableFuture<Void> cancel() {
        return this.dispatcherGateway.cancelJob(this.jobId, this.timeout).thenApply(ignored -> null);
    }

    /**
     * Stops the job with a savepoint.
     *
     * @param advanceToEndOfEventTime when {@code true} the request uses
     *     {@link TriggerSavepointMode#TERMINATE_WITH_SAVEPOINT}, otherwise
     *     {@link TriggerSavepointMode#SUSPEND_WITH_SAVEPOINT}
     * @param savepointDirectory target directory forwarded to the dispatcher; may be
     *     {@code null}
     * @param formatType savepoint format forwarded to the dispatcher
     * @return the future returned by the dispatcher's
     *     {@code stopWithSavepointAndGetLocation} call
     */
    @Override
    public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory, SavepointFormatType formatType) {
        final TriggerSavepointMode mode = advanceToEndOfEventTime
                ? TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT
                : TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT;
        return this.dispatcherGateway.stopWithSavepointAndGetLocation(this.jobId, savepointDirectory, formatType, mode, this.timeout);
    }

    /**
     * Triggers a savepoint for the job using {@link TriggerSavepointMode#SAVEPOINT}.
     *
     * @param savepointDirectory target directory forwarded to the dispatcher; may be
     *     {@code null}
     * @param formatType savepoint format forwarded to the dispatcher
     * @return the future returned by the dispatcher's
     *     {@code triggerSavepointAndGetLocation} call
     */
    @Override
    public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory, SavepointFormatType formatType) {
        return this.dispatcherGateway.triggerSavepointAndGetLocation(this.jobId, savepointDirectory, formatType, TriggerSavepointMode.SAVEPOINT, this.timeout);
    }

    /**
     * Requests the job from the dispatcher, takes its serialized accumulators and deserializes
     * them with this client's class loader.
     *
     * @return a future holding the deserialized accumulators; it completes exceptionally with
     *     a {@link CompletionException} wrapping the cause when deserialization fails
     * @throws NullPointerException if this client was created without a class loader
     */
    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators() {
        Preconditions.checkNotNull(this.classLoader);
        // Keep the chain fully generic: a raw CompletableFuture cast here would erase the
        // element type, so the lambda parameter would degrade to Object.
        return this.dispatcherGateway
                .requestJob(this.jobId, this.timeout)
                .thenApply(ArchivedExecutionGraph::getAccumulatorsSerialized)
                .thenApply(accumulators -> {
                    try {
                        return AccumulatorHelper.deserializeAndUnwrapAccumulators(accumulators, this.classLoader);
                    } catch (Exception e) {
                        throw new CompletionException("Cannot deserialize and unwrap accumulators properly.", e);
                    }
                });
    }

    /**
     * Obtains the job result through {@link JobStatusPollingUtils#getJobResult} and converts
     * it to a {@link JobExecutionResult} with this client's class loader.
     *
     * @return a future holding the execution result; if the conversion throws, the future
     *     completes exceptionally with a {@link CompletionException} whose cause is built by
     *     {@link UnsuccessfulExecutionException#fromJobResult}
     * @throws NullPointerException if this client was created without a class loader
     */
    @Override
    public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
        Preconditions.checkNotNull(this.classLoader);
        return JobStatusPollingUtils
                .getJobResult(this.dispatcherGateway, this.jobId, this.retryExecutor, this.timeout, JOB_RESULT_RETRY_PERIOD)
                .thenApply(jobResult -> {
                    try {
                        return jobResult.toJobExecutionResult(this.classLoader);
                    } catch (Throwable t) {
                        // NOTE(review): the caught throwable is not attached here; the failure
                        // is rebuilt from the job result itself. Confirm that fromJobResult
                        // preserves the underlying cause.
                        throw new CompletionException(UnsuccessfulExecutionException.fromJobResult(jobResult, this.classLoader));
                    }
                });
    }

    /**
     * Serializes the request and delivers it through the dispatcher to the coordinator
     * identified by the given operator uid.
     *
     * @param operatorUid uid forwarded to the dispatcher to select the coordinator
     * @param request the request to serialize and send
     * @return the dispatcher's response future, or an exceptionally completed future carrying
     *     the {@link IOException} when the request cannot be serialized
     */
    @Override
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(String operatorUid, CoordinationRequest request) {
        try {
            final SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request);
            return this.dispatcherGateway.deliverCoordinationRequestToCoordinator(this.jobId, operatorUid, serializedRequest, this.timeout);
        } catch (IOException e) {
            // Report serialization failure through the returned future rather than by throwing.
            return FutureUtils.completedExceptionally(e);
        }
    }
}

