/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.engine.server;

import com.hazelcast.cluster.Address;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
import org.apache.seatunnel.api.event.EventProcessor;
import org.apache.seatunnel.api.tracing.MDCExecutorService;
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.job.JobResult;
import org.apache.seatunnel.engine.common.job.JobStatus;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.event.JobEventHttpReportHandler;
import org.apache.seatunnel.engine.server.event.JobEventProcessor;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.PendingSourceState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
import org.apache.seatunnel.engine.server.telemetry.metrics.entity.JobCounter;
import org.apache.seatunnel.engine.server.telemetry.metrics.entity.ThreadPoolStatus;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.engine.server.utils.PeekBlockingQueue;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import scala.Tuple2;

public class CoordinatorService {
    // Hazelcast node engine; source of the logger, the cluster service and every distributed map below.
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    // Created lazily by getResourceManager(); volatile because that method reads it without holding the lock.
    private volatile ResourceManager resourceManager;
    // Created in initCoordinatorService(), i.e. only once this node has become the active master.
    private JobHistoryService jobHistoryService;
    // Distributed map "engine_runningJobInfo": job id -> JobInfo of submitted jobs.
    private IMap<Long, JobInfo> runningJobInfoIMap;
    // Distributed map "engine_runningJobState".
    private IMap<Object, Object> runningJobStateIMap;
    // Distributed map "engine_stateTimestamps".
    private IMap<Object, Long[]> runningJobStateTimestampsIMap;
    // Job masters that have been handed to the executor to run, keyed by job id.
    private final Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<Long, JobMaster>();
    // Job masters still waiting in the pending queue, together with how they got there (SUBMIT or RESTORE).
    protected final Map<Long, Tuple2<PendingSourceState, JobMaster>> pendingJobMasterMap = new ConcurrentHashMap<Long, Tuple2<PendingSourceState, JobMaster>>();
    // Distributed map "engine_ownedSlotProfilesIMap".
    private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;
    // Distributed map "engine_runningJobMetrics".
    private IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap;
    // True while this node acts as the active master; maintained by checkNewActiveMaster().
    private volatile boolean isActive = false;
    // Not final: checkNewActiveMaster() replaces it when the previous executor has been shut down.
    private ExecutorService executorService;
    private final SeaTunnelServer seaTunnelServer;
    // Single-thread scheduler that runs checkNewActiveMaster() every 100 ms.
    private final ScheduledExecutorService masterActiveListener;
    private final EngineConfig engineConfig;
    // Only created when the connector jar storage is enabled in the engine config.
    private ConnectorPackageService connectorPackageService;
    private EventProcessor eventProcessor;
    // Completes when restoreAllRunningJobFromMasterNodeSwitch() has finished; waitForJobComplete() joins it.
    private PassiveCompletableFuture restoreAllJobFromMasterNodeSwitchFuture;
    // Queue consumed by the pending-job scheduler loop (peek first, take once the job is handled).
    private PeekBlockingQueue<JobMaster> pendingJob = new PeekBlockingQueue();
    // Cached result of scheduleStrategy.equals(ScheduleStrategy.WAIT).
    private final boolean isWaitStrategy;
    private final ScheduleStrategy scheduleStrategy;

    /**
     * Creates the coordinator service and starts its two background activities: the
     * master-active listener (runs {@code checkNewActiveMaster} every 100 ms) and the
     * pending-job scheduler loop.
     *
     * <p>All fields are assigned before any background task is scheduled, so the tasks never
     * observe a partially constructed instance, and a failure while reading the configuration
     * happens before any thread has been created.
     *
     * @param nodeEngine Hazelcast node engine, must not be null
     * @param seaTunnelServer owning server, must not be null
     * @param engineConfig engine configuration (thread pool sizes, schedule strategy, ...)
     * @throws NullPointerException if {@code nodeEngine} or {@code seaTunnelServer} is null
     */
    public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull SeaTunnelServer seaTunnelServer, EngineConfig engineConfig) {
        if (nodeEngine == null) {
            throw new NullPointerException("nodeEngine is marked non-null but is null");
        }
        if (seaTunnelServer == null) {
            throw new NullPointerException("seaTunnelServer is marked non-null but is null");
        }
        this.nodeEngine = nodeEngine;
        this.seaTunnelServer = seaTunnelServer;
        this.engineConfig = engineConfig;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.scheduleStrategy = engineConfig.getScheduleStrategy();
        this.isWaitStrategy = this.scheduleStrategy.equals((Object)ScheduleStrategy.WAIT);
        this.executorService = new ThreadPoolExecutor(engineConfig.getCoordinatorServiceConfig().getCoreThreadNum(), engineConfig.getCoordinatorServiceConfig().getMaxThreadNum(), 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder().setNameFormat("seatunnel-coordinator-service-%d").build(), new ThreadPoolStatus.RejectionCountingHandler());
        this.masterActiveListener = Executors.newSingleThreadScheduledExecutor();
        // Scheduling publishes "this" to another thread, so it must come after every field assignment.
        this.masterActiveListener.scheduleAtFixedRate(this::checkNewActiveMaster, 0L, 100L, TimeUnit.MILLISECONDS);
        this.logger.info("Start pending job schedule thread");
        this.startPendingJobScheduleThread();
    }

    /**
     * Submits the pending-job scheduler loop to the executor service.
     *
     * <p>The loop calls {@code pendingJobSchedule()} repeatedly. Any failure other than an
     * interruption is logged and followed by a 3 second back-off. An interruption (for
     * example from {@code ExecutorService.shutdownNow()}) ends the loop; the interrupt flag
     * is restored rather than hidden inside an exception that nobody reads, because the
     * {@code Future} returned by {@code submit} is discarded.
     */
    private void startPendingJobScheduleThread() {
        Runnable pendingJobScheduleTask = () -> {
            Thread runner = Thread.currentThread();
            // The loop borrows a pooled thread: remember its name so it can be given back unchanged.
            String previousName = runner.getName();
            runner.setName("pending-job-schedule-runner");
            try {
                while (true) {
                    try {
                        this.pendingJobSchedule();
                    }
                    catch (InterruptedException interrupted) {
                        this.logger.info("Pending job schedule thread interrupted, stop scheduling");
                        runner.interrupt();
                        return;
                    }
                    catch (Throwable e) {
                        this.logger.severe("Error in pending job schedule thread", e);
                        try {
                            Thread.sleep(3000L);
                        }
                        catch (InterruptedException ex) {
                            this.logger.severe("Pending job schedule thread interrupted", (Throwable)ex);
                            runner.interrupt();
                            return;
                        }
                    }
                }
            }
            finally {
                runner.setName(previousName);
            }
        };
        this.executorService.submit(pendingJobScheduleTask);
    }

    /**
     * Handles the job at the head of the pending queue once.
     *
     * <ul>
     *   <li>If the job is no longer in {@code pendingJobMasterMap} it is treated as cancelled
     *       and dropped from the queue.</li>
     *   <li>If resources cannot be pre-applied: under the WAIT strategy the job stays queued
     *       and the method returns after a 3 second pause; otherwise the job is removed and
     *       completed as FAILED.</li>
     *   <li>Otherwise the job is removed from the queue and run on the executor service.</li>
     * </ul>
     *
     * @throws InterruptedException if the thread is interrupted while blocking on the queue
     *     or while pausing; the interruption is propagated so the scheduler loop can stop
     */
    private void pendingJobSchedule() throws InterruptedException {
        JobMaster jobMaster = this.pendingJob.peekBlocking();
        if (Objects.isNull(jobMaster)) {
            this.logger.warning("The peek job master is null");
            Thread.sleep(3000L);
            return;
        }
        Long jobId = jobMaster.getJobId();
        if (!this.pendingJobMasterMap.containsKey(jobId)) {
            this.logger.fine(String.format("Job ID : %s already cancelled", jobId));
            this.queueRemove(jobMaster);
            return;
        }
        this.logger.fine(String.format("Start pending job schedule, pendingJob Size : %s", this.pendingJob.size()));
        this.logger.fine(String.format("Start calculating whether pending task resources are enough: %s", jobId));
        boolean preApplyResources = jobMaster.preApplyResources();
        if (!preApplyResources) {
            this.logger.info(String.format("Current strategy is %s, and resources is not enough, skipping this schedule, JobID: %s", this.scheduleStrategy, jobId));
            if (this.isWaitStrategy) {
                // Let an interruption propagate: swallowing it here would keep the loop alive after shutdownNow().
                Thread.sleep(3000L);
                return;
            }
            this.queueRemove(jobMaster);
            this.completeFailJob(jobMaster);
            this.pendingJobMasterMap.remove(jobId);
            return;
        }
        // Read the entry once: the job may have been cancelled since the containsKey check above.
        Tuple2<PendingSourceState, JobMaster> pendingEntry = this.pendingJobMasterMap.get(jobId);
        this.queueRemove(jobMaster);
        if (pendingEntry == null) {
            this.logger.fine(String.format("Job ID : %s already cancelled", jobId));
            return;
        }
        this.logger.info(String.format("Resources enough, start running: %s", jobId));
        PendingSourceState pendingSourceState = (PendingSourceState)pendingEntry._1;
        MDCExecutorService mdcExecutorService = MDCTracer.tracing((Long)jobId, (ExecutorService)this.executorService);
        mdcExecutorService.submit(() -> {
            try {
                String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
                JobStatus jobStatus = (JobStatus)this.runningJobStateIMap.get((Object)jobId);
                if (pendingSourceState == PendingSourceState.RESTORE) {
                    jobMaster.getPhysicalPlan().getPipelineList().forEach(SubPlan::restorePipelineState);
                }
                this.logger.info(String.format("The %s %s is in %s state, restore pipeline and take over this job running", pendingSourceState, jobFullName, jobStatus));
                // Move the job from "pending" to "running" before it actually starts.
                this.pendingJobMasterMap.remove(jobId);
                this.runningJobMasterMap.put(jobId, jobMaster);
                jobMaster.run();
            }
            finally {
                if (this.jobMasterCompletedSuccessfully(jobMaster, pendingSourceState)) {
                    this.runningJobMasterMap.remove(jobId);
                }
            }
        });
    }

    /**
     * Takes the head of the pending queue and logs an error when it is not the job master
     * the caller expected to remove.
     *
     * @param jobMaster the job master that was previously peeked
     * @throws InterruptedException if interrupted while taking from the queue
     */
    private void queueRemove(JobMaster jobMaster) throws InterruptedException {
        JobMaster head = this.pendingJob.take();
        if (head == jobMaster) {
            return;
        }
        this.logger.severe("The job master is not equal to the peek job master");
    }

    /**
     * Marks a job as FAILED because resources could not be obtained: updates the job state
     * of its physical plan and completes the job end future with a FAILED result whose
     * message is derived from a {@code NoEnoughResourceException}.
     *
     * @param jobMaster the job master to fail
     */
    private void completeFailJob(JobMaster jobMaster) {
        String failureReason = ExceptionUtils.getMessage((Throwable)new NoEnoughResourceException());
        JobResult failedResult = new JobResult(JobStatus.FAILED, failureReason);
        jobMaster.getPhysicalPlan().updateJobState(JobStatus.FAILED);
        jobMaster.getPhysicalPlan().completeJobEndFuture(failedResult);
        String notice = String.format("The job %s is not running because the resources is not enough insufficient", jobMaster.getJobId());
        this.logger.info(notice);
    }

    /**
     * Decides whether a job master counts as completed for the purpose of removing it from
     * the running map.
     *
     * @param jobMaster the job master whose complete future is inspected
     * @param state how the job entered the pending queue
     * @return for RESTORE, true unless the complete future completed exceptionally; for
     *     SUBMIT, true unless the complete future was cancelled; false for any other state
     */
    private boolean jobMasterCompletedSuccessfully(JobMaster jobMaster, PendingSourceState state) {
        if (state == PendingSourceState.RESTORE) {
            return !jobMaster.getJobMasterCompleteFuture().isCompletedExceptionally();
        }
        if (state == PendingSourceState.SUBMIT) {
            return !jobMaster.getJobMasterCompleteFuture().isCancelled();
        }
        return false;
    }

    /**
     * Builds the job event processor from the event handlers loaded through the thread
     * context class loader.
     *
     * <p>When an HTTP report endpoint is configured, a ring buffer named
     * {@code zeta-job-event} (capacity 2000, 0 sync backups, 1 async backup, TTL 0) is
     * registered and an HTTP report handler backed by it is appended to the handlers.
     *
     * @param reportHttpEndpoint HTTP endpoint for event reports, or null to disable reporting
     * @param reportHttpHeaders headers passed to the HTTP report handler
     * @param nodeEngine node engine whose Hazelcast instance hosts the ring buffer
     * @return a processor wrapping all loaded handlers
     */
    private JobEventProcessor createJobEventProcessor(String reportHttpEndpoint, Map<String, String> reportHttpHeaders, NodeEngineImpl nodeEngine) {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        List eventHandlers = EventProcessor.loadEventHandlers(contextLoader);
        if (reportHttpEndpoint != null) {
            final String bufferName = "zeta-job-event";
            final int bufferCapacity = 2000;
            nodeEngine.getHazelcastInstance().getConfig().addRingBufferConfig(new Config().getRingbufferConfig(bufferName).setCapacity(bufferCapacity).setBackupCount(0).setAsyncBackupCount(1).setTimeToLiveSeconds(0));
            Ringbuffer eventBuffer = nodeEngine.getHazelcastInstance().getRingbuffer(bufferName);
            eventHandlers.add(new JobEventHttpReportHandler(reportHttpEndpoint, reportHttpHeaders, eventBuffer));
        }
        this.logger.info("Loaded event handlers: " + eventHandlers);
        return new JobEventProcessor(eventHandlers);
    }

    /**
     * @return the job history service; it is assigned in {@code initCoordinatorService()}
     *     and is null before that method has run
     */
    public JobHistoryService getJobHistoryService() {
        return jobHistoryService;
    }

    /**
     * Looks up the job master of a job, preferring the pending map over the running map.
     *
     * @param jobId id of the job
     * @return the pending job master if one is registered, otherwise the running job master,
     *     or null when the job is in neither map
     */
    public JobMaster getJobMaster(Long jobId) {
        Tuple2<PendingSourceState, JobMaster> pendingEntry = this.pendingJobMasterMap.get(jobId);
        JobMaster pendingMaster = pendingEntry == null ? null : (JobMaster)pendingEntry._2;
        if (pendingMaster != null) {
            return pendingMaster;
        }
        return this.runningJobMasterMap.get(jobId);
    }

    /**
     * @return the event processor; it is assigned in {@code initCoordinatorService()} and
     *     is null before that method has run
     */
    public EventProcessor getEventProcessor() {
        return eventProcessor;
    }

    /**
     * Initialises the state needed by an active master: binds the distributed maps, creates
     * the job history service and the event processor, optionally creates the connector
     * package service, and finally starts the asynchronous restore of jobs recorded in
     * {@code runningJobInfoIMap}.
     */
    private void initCoordinatorService() {
        // Bind the Hazelcast distributed maps used by the coordinator.
        this.runningJobInfoIMap = this.nodeEngine.getHazelcastInstance().getMap("engine_runningJobInfo");
        this.runningJobStateIMap = this.nodeEngine.getHazelcastInstance().getMap("engine_runningJobState");
        this.runningJobStateTimestampsIMap = this.nodeEngine.getHazelcastInstance().getMap("engine_stateTimestamps");
        this.ownedSlotProfilesIMap = this.nodeEngine.getHazelcastInstance().getMap("engine_ownedSlotProfilesIMap");
        this.metricsImap = this.nodeEngine.getHazelcastInstance().getMap("engine_runningJobMetrics");
        // The history service shares the in-memory pending/running maps and owns the three "finished job" maps.
        this.jobHistoryService = new JobHistoryService((NodeEngine)this.nodeEngine, this.runningJobStateIMap, this.logger, this.pendingJobMasterMap, this.runningJobMasterMap, (IMap<Long, JobHistoryService.JobState>)this.nodeEngine.getHazelcastInstance().getMap("engine_finishedJobState"), (IMap<Long, JobMetrics>)this.nodeEngine.getHazelcastInstance().getMap("engine_finishedJobMetrics"), (IMap<Long, JobDAGInfo>)this.nodeEngine.getHazelcastInstance().getMap("engine_finishedJobVertexInfo"), this.engineConfig.getHistoryJobExpireMinutes());
        this.eventProcessor = this.createJobEventProcessor(this.engineConfig.getEventReportHttpApi(), this.engineConfig.getEventReportHttpHeaders(), this.nodeEngine);
        ConnectorJarStorageConfig connectorJarStorageConfig = this.engineConfig.getConnectorJarStorageConfig();
        // The connector package service is only created when jar storage is enabled.
        if (connectorJarStorageConfig.getEnable().booleanValue()) {
            this.connectorPackageService = new ConnectorPackageService(this.seaTunnelServer);
        }
        // Must come last: the restore task reads the maps and services assigned above.
        this.restoreAllJobFromMasterNodeSwitchFuture = new PassiveCompletableFuture(CompletableFuture.runAsync(this::restoreAllRunningJobFromMasterNodeSwitch, (Executor)this.executorService));
    }

    /**
     * Restores every job that is recorded in {@code runningJobInfoIMap} but has no job
     * master in {@code runningJobMasterMap}.
     *
     * <p>Waits (polling once per second) until at least one worker is registered, then
     * restores the jobs in parallel on the executor service and blocks until all restore
     * tasks have finished. A failure of a single job is logged and does not fail the others.
     *
     * @throws SeaTunnelEngineException if the thread is interrupted while waiting for a
     *     worker, or if waiting for the restore tasks fails; on interruption the interrupt
     *     flag is restored before the exception is thrown
     */
    private void restoreAllRunningJobFromMasterNodeSwitch() {
        List<Map.Entry<Long, JobInfo>> jobsToRestore = this.runningJobInfoIMap.entrySet().stream()
                .filter(entry -> !this.runningJobMasterMap.containsKey(entry.getKey()))
                .collect(Collectors.toList());
        if (jobsToRestore.isEmpty()) {
            return;
        }
        while (this.getResourceManager().workerCount(Collections.emptyMap()) == 0) {
            try {
                this.logger.info("Waiting for worker registered");
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.logger.severe(ExceptionUtils.getMessage((Throwable)e));
                throw new SeaTunnelEngineException("wait worker register error", (Throwable)e);
            }
        }
        List<CompletableFuture<Void>> restoreFutures = jobsToRestore.stream().map(entry -> CompletableFuture.runAsync(() -> {
            this.logger.info(String.format("begin restore job (%s) from master active switch", entry.getKey()));
            try {
                // Re-check: the job may have been registered as running while waiting for workers.
                if (!this.runningJobMasterMap.containsKey(entry.getKey())) {
                    this.restoreJobFromMasterActiveSwitch(entry.getKey(), entry.getValue());
                }
            }
            catch (Exception e) {
                this.logger.severe((Throwable)e);
            }
            this.logger.info(String.format("restore job (%s) from master active switch finished", entry.getKey()));
        }, (Executor)MDCTracer.tracing(entry.getKey(), (ExecutorService)this.executorService))).collect(Collectors.toList());
        try {
            CompletableFuture.allOf(restoreFutures.toArray(new CompletableFuture[0])).get();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            this.logger.severe(ExceptionUtils.getMessage((Throwable)e));
            throw new SeaTunnelEngineException((Throwable)e);
        }
    }

    /**
     * Re-creates the job master of a job after a master switch and places it in the
     * pending queue with source {@code RESTORE}.
     *
     * <p>If no state is recorded for the job in {@code runningJobStateIMap}, the job info
     * entry is removed instead and nothing is restored.
     *
     * @param jobId id of the job to restore, must not be null
     * @param jobInfo job info recorded for the job, must not be null
     * @throws NullPointerException if an argument is null
     * @throws SeaTunnelEngineException if initialising the job master fails
     */
    private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobInfo jobInfo) {
        Objects.requireNonNull(jobId, "jobId is marked non-null but is null");
        Objects.requireNonNull(jobInfo, "jobInfo is marked non-null but is null");
        boolean hasRecordedState = this.runningJobStateIMap.get((Object)jobId) != null;
        if (!hasRecordedState) {
            this.runningJobInfoIMap.remove((Object)jobId);
            return;
        }
        ExecutorService tracedExecutor = (ExecutorService)MDCTracer.tracing((Long)jobId, (ExecutorService)this.executorService);
        JobMaster restoredMaster = new JobMaster(jobId, jobInfo.getJobImmutableInformation(), (NodeEngine)this.nodeEngine, tracedExecutor, this.getResourceManager(), this.getJobHistoryService(), this.runningJobStateIMap, this.runningJobStateTimestampsIMap, this.ownedSlotProfilesIMap, this.runningJobInfoIMap, this.engineConfig, this.seaTunnelServer);
        try {
            restoredMaster.init(((JobInfo)this.runningJobInfoIMap.get((Object)jobId)).getInitializationTimestamp(), true);
        }
        catch (Exception e) {
            throw new SeaTunnelEngineException(String.format("Job id %s init failed", jobId), (Throwable)e);
        }
        Tuple2<PendingSourceState, JobMaster> restoreEntry = new Tuple2<>(PendingSourceState.RESTORE, restoredMaster);
        this.pendingJobMasterMap.put(jobId, restoreEntry);
        this.pendingJob.put(restoredMaster);
        restoredMaster.getPhysicalPlan().updateJobState(JobStatus.PENDING);
        this.logger.info(String.format("The restore job enter pending queue, JobId: %s", jobId));
    }

    /**
     * Periodic check that keeps the coordinator in sync with the master role of this node.
     *
     * <p>When the node becomes master the coordinator service is initialised; when it stops
     * being master the coordinator service is cleared. If the executor service was shut
     * down by an earlier clear, a new one is created <em>and the pending-job scheduler loop
     * is started again</em>, because the previous loop ran on the executor that was shut
     * down. Without it, submitted or restored jobs would stay in the pending queue forever.
     *
     * @throws SeaTunnelEngineException on any failure; thrown on purpose so the scheduled
     *     task stops being re-executed
     */
    private void checkNewActiveMaster() {
        try {
            if (!this.isActive && this.seaTunnelServer.isMasterNode()) {
                this.logger.info("This node become a new active master node, begin init coordinator service");
                if (this.executorService.isShutdown()) {
                    this.executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("seatunnel-coordinator-service-%d").build());
                    // The old scheduler loop died with the old executor; start a new one before jobs are restored.
                    this.logger.info("Start pending job schedule thread");
                    this.startPendingJobScheduleThread();
                }
                this.initCoordinatorService();
                this.isActive = true;
            } else if (this.isActive && !this.seaTunnelServer.isMasterNode()) {
                this.isActive = false;
                this.logger.info("This node become leave active master node, begin clear coordinator service");
                this.clearCoordinatorService();
            }
        }
        catch (Exception e) {
            this.isActive = false;
            this.logger.severe(ExceptionUtils.getMessage((Throwable)e));
            throw new SeaTunnelEngineException("check new active master error, stop loop", (Throwable)e);
        }
    }

    /**
     * Stops all coordinator activity on this node: interrupts the running job masters (and,
     * under the WAIT strategy, the pending ones), shuts the executor service down, waits up
     * to 20 seconds for it to terminate, and closes the resource manager and the event
     * processor.
     *
     * <p>The resource manager and event processor are closed even when the wait for
     * termination is interrupted; in that case the interrupt flag is restored.
     *
     * @throws SeaTunnelEngineException if the wait is interrupted or the event processor
     *     fails to close
     */
    public synchronized void clearCoordinatorService() {
        this.runningJobMasterMap.values().forEach(JobMaster::interrupt);
        if (this.isWaitStrategy) {
            this.pendingJobMasterMap.values().stream().filter(Objects::nonNull).map(Tuple2::_2).forEach(JobMaster::interrupt);
            this.pendingJobMasterMap.clear();
        }
        this.executorService.shutdownNow();
        this.runningJobMasterMap.clear();
        try {
            this.executorService.awaitTermination(20L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SeaTunnelEngineException("wait clean executor service error", (Throwable)e);
        }
        finally {
            // Release resources on every path, including the interrupted one.
            if (this.resourceManager != null) {
                this.resourceManager.close();
            }
            try {
                if (this.eventProcessor != null) {
                    this.eventProcessor.close();
                }
            }
            catch (Exception e) {
                throw new SeaTunnelEngineException("close event processor error", (Throwable)e);
            }
        }
    }

    /**
     * Returns the resource manager, creating and initialising it on first use.
     *
     * <p>Creation is guarded by the monitor of this instance and the field is volatile, so
     * at most one resource manager is created and it is only published after
     * {@code init()} has returned.
     *
     * @return the shared resource manager
     */
    public ResourceManager getResourceManager() {
        ResourceManager current = this.resourceManager;
        if (current != null) {
            return current;
        }
        synchronized (this) {
            current = this.resourceManager;
            if (current == null) {
                current = new ResourceManagerFactory((NodeEngine)this.nodeEngine, this.engineConfig).getResourceManager();
                current.init();
                this.resourceManager = current;
            }
            return current;
        }
    }

    /**
     * Submits a job: creates its job master, initialises it asynchronously and, on success,
     * places it in the pending queue.
     *
     * <p>If a job master already exists for the id, the returned future completes
     * immediately. If initialisation fails, the returned future completes exceptionally
     * with a {@code JobException} and everything this call registered is removed again,
     * including the entry in {@code pendingJobMasterMap}. Otherwise the job would be
     * reported as PENDING forever and could not be submitted again.
     *
     * @param jobId id of the job
     * @param jobImmutableInformation serialized immutable job information
     * @param isStartWithSavePoint whether the job starts from a savepoint; when false, a job
     *     id that already has recorded metrics is rejected
     * @return a future that completes when the job has been initialised
     */
    public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation, boolean isStartWithSavePoint) {
        CompletableFuture<Void> jobSubmitFuture = new CompletableFuture<>();
        if (this.getJobMaster(jobId) != null) {
            this.logger.warning(String.format("The job %s is currently running; no need to submit again.", jobId));
            jobSubmitFuture.complete(null);
            return new PassiveCompletableFuture<>(jobSubmitFuture);
        }
        MDCExecutorService mdcExecutorService = MDCTracer.tracing((Long)jobId, (ExecutorService)this.executorService);
        JobMaster jobMaster = new JobMaster(jobId, jobImmutableInformation, (NodeEngine)this.nodeEngine, (ExecutorService)mdcExecutorService, this.getResourceManager(), this.getJobHistoryService(), this.runningJobStateIMap, this.runningJobStateTimestampsIMap, this.ownedSlotProfilesIMap, this.runningJobInfoIMap, this.engineConfig, this.seaTunnelServer);
        mdcExecutorService.submit(() -> {
            try {
                if (!isStartWithSavePoint && this.getJobHistoryService().getJobMetrics(jobId) != JobMetrics.empty()) {
                    throw new JobException(String.format("The job id %s has already been submitted and is not starting with a savepoint.", jobId));
                }
                this.pendingJobMasterMap.put(jobId, new Tuple2<>(PendingSourceState.SUBMIT, jobMaster));
                this.runningJobInfoIMap.put((Object)jobId, (Object)new JobInfo(Long.valueOf(System.currentTimeMillis()), jobImmutableInformation));
                jobMaster.init(((JobInfo)this.runningJobInfoIMap.get((Object)jobId)).getInitializationTimestamp(), false);
                jobSubmitFuture.complete(null);
            }
            catch (Throwable e) {
                String errorMsg = ExceptionUtils.getMessage((Throwable)e);
                this.logger.severe(String.format("submit job %s error %s ", jobId, errorMsg));
                jobSubmitFuture.completeExceptionally((Throwable)new JobException(errorMsg));
            }
            if (!jobSubmitFuture.isCompletedExceptionally()) {
                this.pendingJob.put(jobMaster);
                jobMaster.getPhysicalPlan().updateJobState(JobStatus.PENDING);
                this.logger.info(String.format("The submit job enter the pending queue , jobId: %s , jobName: %s", jobId, jobMaster.getJobImmutableInformation().getJobName()));
            } else {
                // Only drop the pending entry if it belongs to this submission.
                this.pendingJobMasterMap.computeIfPresent(jobId, (id, entry) -> entry._2 == jobMaster ? null : entry);
                this.runningJobInfoIMap.remove((Object)jobId);
                this.runningJobMasterMap.remove(jobId);
            }
        });
        return new PassiveCompletableFuture<>(jobSubmitFuture);
    }

    /**
     * Triggers a savepoint for a running job and waits for the job to complete.
     *
     * <p>The job master is looked up once and reused by the asynchronous task, so a job
     * that finishes between the check and the task start can no longer cause a
     * {@code NullPointerException} inside the task.
     *
     * @param jobId id of the job
     * @return a future that completes normally once the savepoint succeeded and the wait
     *     for job completion returned, or exceptionally with a
     *     {@code SavePointFailedException} when the job is not running or the savepoint failed
     */
    public PassiveCompletableFuture<Void> savePoint(long jobId) {
        JobMaster runningJobMaster = this.runningJobMasterMap.get(jobId);
        if (runningJobMaster == null) {
            CompletableFuture<Void> notRunning = new CompletableFuture<>();
            SavePointFailedException exception = new SavePointFailedException("The job with id '" + jobId + "' not running, save point failed");
            this.logger.warning((Throwable)exception);
            notRunning.completeExceptionally((Throwable)exception);
            return new PassiveCompletableFuture<>(notRunning);
        }
        CompletableFuture<Void> savePointFuture = CompletableFuture.supplyAsync(() -> {
            if (!((Boolean)runningJobMaster.savePoint().join()).booleanValue()) {
                throw new SavePointFailedException("The job with id '" + jobId + "' save point failed");
            }
            try {
                this.waitForJobComplete(jobId).get();
            }
            catch (Throwable e) {
                // Best effort: the savepoint itself succeeded, so a failed wait is only logged.
                this.logger.warning(String.format("The job with id '%s' waiting state complete failed", jobId));
            }
            return null;
        }, (Executor)this.executorService);
        return new PassiveCompletableFuture<>(savePointFuture);
    }

    /**
     * Returns a future for the result of a job.
     *
     * <p>First waits for the restore after a master switch to finish. If the job has a
     * pending or running job master, the future of that job master is returned. Otherwise
     * the state is read from the job history: the future is already completed with the
     * recorded status and error message, or with {@code UNKNOWABLE} when nothing is recorded.
     *
     * @param jobId id of the job
     * @return a future yielding the job result
     * @throws SeaTunnelEngineException if reading the job state fails; when the cause is an
     *     interruption, the interrupt flag is restored first
     */
    public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
        this.restoreAllJobFromMasterNodeSwitchFuture.join();
        JobMaster runningJobMaster = this.getJobMaster(jobId);
        if (runningJobMaster != null) {
            return new PassiveCompletableFuture<>(runningJobMaster.getJobMasterCompleteFuture());
        }
        CompletableFuture<JobHistoryService.JobState> jobStateFuture = CompletableFuture.supplyAsync(() -> this.jobHistoryService.getJobDetailState(jobId), (Executor)this.executorService);
        JobHistoryService.JobState jobState;
        try {
            jobState = jobStateFuture.get();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new SeaTunnelEngineException("get job state error", (Throwable)e);
        }
        JobResult jobResult = jobState == null
                ? new JobResult(JobStatus.UNKNOWABLE, null)
                : new JobResult(jobState.getJobStatus(), jobState.getErrorMessage());
        CompletableFuture<JobResult> future = new CompletableFuture<>();
        future.complete(jobResult);
        return new PassiveCompletableFuture<>(future);
    }

    /**
     * Cancels a job.
     *
     * <p>When no job master exists for the id, an already completed future is returned.
     * Otherwise a pending entry for the job (if any) is removed and {@code cancelJob()} is
     * invoked on the job master asynchronously on the executor service.
     *
     * @param jobId id of the job
     * @return a future that completes when the cancel call has returned
     */
    public PassiveCompletableFuture<Void> cancelJob(long jobId) {
        JobMaster target = this.getJobMaster(jobId);
        if (target == null) {
            CompletableFuture<Void> nothingToCancel = new CompletableFuture<>();
            nothingToCancel.complete(null);
            return new PassiveCompletableFuture<>(nothingToCancel);
        }
        if (this.pendingJobMasterMap.remove(jobId) != null) {
            this.logger.fine(String.format("Cancel pending tasks : %s", jobId));
        }
        CompletableFuture<Void> cancellation = CompletableFuture.supplyAsync(() -> {
            target.cancelJob();
            return null;
        }, (Executor)this.executorService);
        return new PassiveCompletableFuture<>(cancellation);
    }

    /**
     * Returns the current status of a job.
     *
     * <p>Resolution order: a job in the pending map is {@code PENDING}; a running job
     * reports the status of its job master; otherwise the job history is consulted. When
     * no information is available anywhere, {@code UNKNOWABLE} is returned. This includes
     * the case where the running job master has no status and the finished-state map has
     * no entry yet, which previously raised a {@code NullPointerException}.
     *
     * @param jobId id of the job
     * @return the job status, never null
     */
    public JobStatus getJobStatus(long jobId) {
        if (this.pendingJobMasterMap.containsKey(jobId)) {
            return JobStatus.PENDING;
        }
        JobMaster runningJobMaster = this.runningJobMasterMap.get(jobId);
        if (runningJobMaster == null) {
            JobHistoryService.JobState jobDetailState = this.jobHistoryService.getJobDetailState(jobId);
            return null == jobDetailState ? JobStatus.UNKNOWABLE : jobDetailState.getJobStatus();
        }
        JobStatus jobStatus = runningJobMaster.getJobStatus();
        if (jobStatus != null) {
            return jobStatus;
        }
        JobHistoryService.JobState finishedState = (JobHistoryService.JobState)this.jobHistoryService.getFinishedJobStateImap().get((Object)jobId);
        return finishedState == null ? JobStatus.UNKNOWABLE : finishedState.getJobStatus();
    }

    /**
     * Returns the metrics of a job.
     *
     * <p>A pending job has empty metrics. A job that is not running is answered from the
     * job history. For a running job the current metrics of its job master are returned,
     * merged into the history metrics when the history holds any.
     *
     * @param jobId id of the job
     * @return the job metrics
     */
    public JobMetrics getJobMetrics(long jobId) {
        if (this.pendingJobMasterMap.containsKey(jobId)) {
            return JobMetrics.empty();
        }
        JobMaster master = this.runningJobMasterMap.get(jobId);
        if (master == null) {
            return this.jobHistoryService.getJobMetrics(jobId);
        }
        JobMetrics liveMetrics = JobMetricsUtil.toJobMetrics(master.getCurrJobMetrics());
        JobMetrics historyMetrics = this.jobHistoryService.getJobMetrics(jobId);
        if (historyMetrics == JobMetrics.empty()) {
            return liveMetrics;
        }
        return historyMetrics.merge(liveMetrics);
    }

    /**
     * Collects the metrics of all running jobs from the workers that own slots for them.
     *
     * <p>Workers that are no longer cluster members are skipped, and a worker whose
     * Hazelcast instance is not active is only logged. The collected metrics are merged
     * into the history metrics of a job when the history holds any.
     *
     * @return metrics keyed by job id
     * @throws SeaTunnelException if fetching metrics from a worker fails; the original
     *     exception is attached as the cause and, for an interruption, the interrupt flag
     *     is restored
     */
    public Map<Long, JobMetrics> getRunningJobMetrics() {
        Set<Long> runningJobIds = this.runningJobMasterMap.keySet();
        Set<Address> workerAddresses = new HashSet<>();
        this.ownedSlotProfilesIMap.forEach((pipelineLocation, slotProfiles) -> {
            if (runningJobIds.contains(pipelineLocation.getJobId())) {
                slotProfiles.values().forEach(slotProfile -> workerAddresses.add(slotProfile.getWorker()));
            }
        });
        List<RawJobMetrics> metrics = new ArrayList<>();
        for (Address address : workerAddresses) {
            try {
                if (this.nodeEngine.getClusterService().getMember(address) != null) {
                    RawJobMetrics rawJobMetrics = (RawJobMetrics)NodeEngineUtil.sendOperationToMemberNode((NodeEngine)this.nodeEngine, new GetMetricsOperation(runningJobIds), address).get();
                    metrics.add(rawJobMetrics);
                }
            }
            catch (HazelcastInstanceNotActiveException e) {
                this.logger.warning(String.format("get metrics with exception: %s.", ExceptionUtils.getMessage((Throwable)e)));
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                // Keep the exception type callers already see, but do not lose the cause.
                SeaTunnelException failure = new SeaTunnelException(e.getMessage());
                failure.initCause(e);
                throw failure;
            }
        }
        Map<Long, JobMetrics> longJobMetricsMap = JobMetricsUtil.toJobMetricsMap(metrics);
        // replaceAll avoids writing to the map from inside its own forEach.
        longJobMetricsMap.replaceAll((jobId, jobMetrics) -> {
            JobMetrics jobMetricsImap = this.jobHistoryService.getJobMetrics(jobId);
            return jobMetricsImap != JobMetrics.empty() ? jobMetricsImap.merge(jobMetrics) : jobMetrics;
        });
        return longJobMetricsMap;
    }

    /**
     * Returns the DAG information of a job, preferring the job history over the running
     * job master.
     *
     * @param jobId id of the job
     * @return the DAG information, or null when the job is neither recorded in the history
     *     nor currently running (previously this case raised a {@code NullPointerException})
     */
    public JobDAGInfo getJobInfo(long jobId) {
        JobDAGInfo jobInfo = this.jobHistoryService.getJobDAGInfo(jobId);
        if (jobInfo != null) {
            return jobInfo;
        }
        JobMaster runningJobMaster = this.runningJobMasterMap.get(jobId);
        return runningJobMaster == null ? null : runningJobMaster.getJobDAGInfo();
    }

    /**
     * Forwards a task execution state to the job master of the running job that owns the
     * task group.
     *
     * @param taskExecutionState the reported state
     * @throws JobNotFoundException if the job of the task group is not running
     */
    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        TaskGroupLocation location = taskExecutionState.getTaskGroupLocation();
        this.logger.info(String.format("Received task end from execution %s, state %s", location, taskExecutionState.getExecutionState()));
        JobMaster owner = this.runningJobMasterMap.get(location.getJobId());
        if (owner != null) {
            owner.updateTaskExecutionState(taskExecutionState);
            return;
        }
        throw new JobNotFoundException(String.format("Job %s not running", location.getJobId()));
    }

    /**
     * Shuts this service down: calls {@code shutdownNow()} on the master-active listener when
     * one exists, then invokes {@code clearCoordinatorService()}.
     */
    public void shutdown() {
        if (null != masterActiveListener) {
            masterActiveListener.shutdownNow();
        }
        clearCoordinatorService();
    }

    /**
     * Reports the current value of the {@code isActive} flag.
     *
     * @return {@code true} when the coordinator is flagged as active
     */
    public boolean isCoordinatorActive() {
        return isActive;
    }

    /**
     * Walks every pipeline of every job master in the running-job map and hands its coordinator
     * and physical vertices to {@code makeTasksFailed} together with the address of the member
     * carried by the event.
     *
     * @param event membership event describing the removed member
     */
    public void failedTaskOnMemberRemoved(MembershipServiceEvent event) {
        Address removedMemberAddress = event.getMember().getAddress();
        this.runningJobMasterMap.forEach(
                (jobId, master) -> {
                    master.getPhysicalPlan()
                            .getPipelineList()
                            .forEach(
                                    pipeline -> {
                                        // Coordinator vertices first, then the regular ones.
                                        this.makeTasksFailed(
                                                pipeline.getCoordinatorVertexList(),
                                                removedMemberAddress);
                                        this.makeTasksFailed(
                                                pipeline.getPhysicalVertexList(),
                                                removedMemberAddress);
                                    });
                });
    }

    /**
     * Marks as {@code FAILED} every vertex in the list whose current execution address equals
     * the lost address and whose state is {@code DEPLOYING}, {@code RUNNING} or
     * {@code CANCELING}.
     *
     * @param physicalVertexList vertices to inspect; must not be {@code null}
     * @param lostAddress address of the member that went away; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    private void makeTasksFailed(@NonNull List<PhysicalVertex> physicalVertexList, @NonNull Address lostAddress) {
        Objects.requireNonNull(physicalVertexList, "physicalVertexList is marked non-null but is null");
        Objects.requireNonNull(lostAddress, "lostAddress is marked non-null but is null");

        for (PhysicalVertex vertex : physicalVertexList) {
            Address currentAddress = vertex.getCurrentExecutionAddress();
            ExecutionState currentState = vertex.getExecutionState();

            // Vertices with no address, or placed on another member, are left alone.
            if (currentAddress == null || !currentAddress.equals(lostAddress)) {
                continue;
            }
            boolean stillActive =
                    currentState.equals(ExecutionState.DEPLOYING)
                            || currentState.equals(ExecutionState.RUNNING)
                            || currentState.equals(ExecutionState.CANCELING);
            if (!stillActive) {
                continue;
            }

            TaskGroupLocation location = vertex.getTaskGroupLocation();
            JobException cause =
                    new JobException(
                            String.format(
                                    "The taskGroup(%s) deployed node(%s) offline",
                                    location, lostAddress));
            vertex.updateStateByExecutionService(
                    new TaskExecutionState(location, ExecutionState.FAILED, (Throwable) cause));
        }
    }

    /**
     * Handles the removal of a cluster member. The resource manager is notified only while the
     * coordinator is active; {@code failedTaskOnMemberRemoved} is invoked in every case.
     *
     * @param event membership event describing the removed member
     */
    public void memberRemoved(MembershipServiceEvent event) {
        boolean coordinatorActive = this.isCoordinatorActive();
        if (coordinatorActive) {
            this.getResourceManager().memberRemoved(event);
        }
        this.failedTaskOnMemberRemoved(event);
    }

    /**
     * Logs, at info level, a table built from the values returned by
     * {@code getThreadPoolStatusMetrics()}.
     */
    public void printExecutionInfo() {
        ThreadPoolStatus status = this.getThreadPoolStatusMetrics();
        // Title first, then alternating label / value cells.
        Object[] cells = {
            "CoordinatorService Thread Pool Status",
            "activeCount", status.getActiveCount(),
            "corePoolSize", status.getCorePoolSize(),
            "maximumPoolSize", status.getMaximumPoolSize(),
            "poolSize", status.getPoolSize(),
            "completedTaskCount", status.getCompletedTaskCount(),
            "taskCount", status.getTaskCount()
        };
        this.logger.info(StringFormatUtils.formatTable(cells));
    }

    /**
     * Logs, at info level, a table of the per-status job counts returned by
     * {@code getJobCountMetrics()}.
     */
    public void printJobDetailInfo() {
        JobCounter counts = this.getJobCountMetrics();
        // Title first, then alternating label / value cells.
        Object[] cells = {
            "Job info detail",
            "createdJobCount", counts.getCreatedJobCount(),
            "pendingJobCount", counts.getPendingJobCount(),
            "scheduledJobCount", counts.getScheduledJobCount(),
            "runningJobCount", counts.getRunningJobCount(),
            "failingJobCount", counts.getFailingJobCount(),
            "failedJobCount", counts.getFailedJobCount(),
            "cancellingJobCount", counts.getCancellingJobCount(),
            "canceledJobCount", counts.getCanceledJobCount(),
            "finishedJobCount", counts.getFinishedJobCount()
        };
        this.logger.info(StringFormatUtils.formatTable(cells));
    }

    /**
     * Counts the job status entries held by the job history service, grouped by status.
     *
     * <p>Statuses without a matching case below are not counted. When no history service is
     * set, every count is zero.
     *
     * @return a {@link JobCounter} carrying the created, pending, scheduled, running, failing,
     *     failed, cancelling, canceled and finished counts, in that constructor order
     */
    public JobCounter getJobCountMetrics() {
        if (this.jobHistoryService == null) {
            return new JobCounter(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
        }

        // AtomicLong is used as a mutable counter that the lambda below can capture.
        AtomicLong created = new AtomicLong();
        AtomicLong pending = new AtomicLong();
        AtomicLong scheduled = new AtomicLong();
        AtomicLong running = new AtomicLong();
        AtomicLong failing = new AtomicLong();
        AtomicLong failed = new AtomicLong();
        AtomicLong cancelling = new AtomicLong();
        AtomicLong canceled = new AtomicLong();
        AtomicLong finished = new AtomicLong();

        this.jobHistoryService.getJobStatusData().forEach(entry -> {
            switch (entry.getJobStatus()) {
                case CREATED:
                    created.incrementAndGet();
                    break;
                case PENDING:
                    pending.incrementAndGet();
                    break;
                case SCHEDULED:
                    scheduled.incrementAndGet();
                    break;
                case RUNNING:
                    running.incrementAndGet();
                    break;
                case FAILING:
                    failing.incrementAndGet();
                    break;
                case FAILED:
                    failed.incrementAndGet();
                    break;
                case CANCELING:
                    cancelling.incrementAndGet();
                    break;
                case CANCELED:
                    canceled.incrementAndGet();
                    break;
                case FINISHED:
                    finished.incrementAndGet();
                    break;
                default:
                    // Any other status is intentionally left uncounted.
                    break;
            }
        });

        return new JobCounter(
                created.get(),
                pending.get(),
                scheduled.get(),
                running.get(),
                failing.get(),
                failed.get(),
                cancelling.get(),
                canceled.get(),
                finished.get());
    }

    /**
     * Takes a snapshot of the executor service's thread pool figures.
     *
     * <p>The executor service is cast to {@link ThreadPoolExecutor} and its rejection handler
     * to {@code ThreadPoolStatus.RejectionCountingHandler}; either cast fails with a
     * {@link ClassCastException} if the runtime type differs.
     *
     * @return a {@link ThreadPoolStatus} holding active count, core/maximum/current pool size,
     *     completed and total task counts, queue size and rejection count
     */
    public ThreadPoolStatus getThreadPoolStatusMetrics() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) this.executorService;
        ThreadPoolStatus.RejectionCountingHandler rejectionHandler =
                (ThreadPoolStatus.RejectionCountingHandler) pool.getRejectedExecutionHandler();
        long rejected = rejectionHandler.getRejectionCount();
        long queued = pool.getQueue().size();
        return new ThreadPoolStatus(
                pool.getActiveCount(),
                pool.getCorePoolSize(),
                pool.getMaximumPoolSize(),
                pool.getPoolSize(),
                pool.getCompletedTaskCount(),
                pool.getTaskCount(),
                queued,
                rejected);
    }

    /**
     * Returns the connector package service held by this instance.
     *
     * @return the connector package service, never {@code null}
     * @throws SeaTunnelEngineException if no connector package service has been set
     */
    public ConnectorPackageService getConnectorPackageService() {
        if (this.connectorPackageService != null) {
            return this.connectorPackageService;
        }
        throw new SeaTunnelEngineException("The user is not configured to enable connector package service, can not get connector package service service from master node.");
    }

    /**
     * Exposes the metrics map field; annotated as visible for testing.
     *
     * @return the map from a {@code Long} key to the per-task-location metrics contexts
     */
    @VisibleForTesting
    protected IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> getMetricsImap() {
        return metricsImap;
    }
}

