package org.apache.flink.runtime.jobmanager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.persistence.ResourceVersion;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.class */
public class DefaultJobGraphStore<R extends ResourceVersion<R>> implements JobGraphStore, JobGraphStore.JobGraphListener {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultJobGraphStore.class);
    private final StateHandleStore<JobGraph, R> jobGraphStateHandleStore;

    @GuardedBy("lock")
    private final JobGraphStoreWatcher jobGraphStoreWatcher;
    private final JobGraphStoreUtil jobGraphStoreUtil;

    @GuardedBy("lock")
    private JobGraphStore.JobGraphListener jobGraphListener;
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final Set<JobID> addedJobGraphs = new HashSet();

    @GuardedBy("lock")
    private volatile boolean running = false;

    public DefaultJobGraphStore(StateHandleStore<JobGraph, R> stateHandleStore, JobGraphStoreWatcher jobGraphStoreWatcher, JobGraphStoreUtil jobGraphStoreUtil) {
        this.jobGraphStateHandleStore = (StateHandleStore) Preconditions.checkNotNull(stateHandleStore);
        this.jobGraphStoreWatcher = (JobGraphStoreWatcher) Preconditions.checkNotNull(jobGraphStoreWatcher);
        this.jobGraphStoreUtil = (JobGraphStoreUtil) Preconditions.checkNotNull(jobGraphStoreUtil);
    }

    @Override // org.apache.flink.runtime.jobmanager.JobGraphStore
    public void start(JobGraphStore.JobGraphListener jobGraphListener) throws Exception {
        synchronized (this.lock) {
            if (!this.running) {
                this.jobGraphListener = (JobGraphStore.JobGraphListener) Preconditions.checkNotNull(jobGraphListener);
                this.jobGraphStoreWatcher.start(this);
                this.running = true;
            }
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.JobGraphStore
    public void stop() throws Exception {
        synchronized (this.lock) {
            if (this.running) {
                this.running = false;
                LOG.info("Stopping DefaultJobGraphStore.");
                Exception exc = null;
                try {
                    this.jobGraphStateHandleStore.releaseAll();
                } catch (Exception e) {
                    exc = e;
                }
                try {
                    this.jobGraphStoreWatcher.stop();
                } catch (Exception e2) {
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
                }
                if (exc != null) {
                    throw new FlinkException("Could not properly stop the DefaultJobGraphStore.", exc);
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.JobGraphStore
    @Nullable
    public JobGraph recoverJobGraph(JobID jobID) throws Exception {
        JobGraph retrieveState;
        Preconditions.checkNotNull(jobID, "Job ID");
        LOG.debug("Recovering job graph {} from {}.", jobID, this.jobGraphStateHandleStore);
        String jobIDToName = this.jobGraphStoreUtil.jobIDToName(jobID);
        synchronized (this.lock) {
            verifyIsRunning();
            try {
                try {
                    try {
                        retrieveState = this.jobGraphStateHandleStore.getAndLock(jobIDToName).retrieveState();
                        this.addedJobGraphs.add(retrieveState.getJobID());
                        LOG.info("Recovered {}.", retrieveState);
                        if (1 == 0) {
                            this.jobGraphStateHandleStore.release(jobIDToName);
                        }
                    } catch (IOException e) {
                        throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + jobIDToName + ". This indicates that the retrieved state handle is broken. Try cleaning the state handle store.", e);
                    } catch (ClassNotFoundException e2) {
                        throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + jobIDToName + ". This indicates that you are trying to recover from state written by an older Flink version which is not compatible. Try cleaning the state handle store.", e2);
                    }
                } catch (Throwable th) {
                    if (0 == 0) {
                        this.jobGraphStateHandleStore.release(jobIDToName);
                    }
                    throw th;
                }
            } catch (Exception e3) {
                throw new FlinkException("Could not retrieve the submitted job graph state handle for " + jobIDToName + " from the submitted job graph store.", e3);
            } catch (StateHandleStore.NotExistException e4) {
                if (1 == 0) {
                    this.jobGraphStateHandleStore.release(jobIDToName);
                }
                return null;
            }
        }
        return retrieveState;
    }

    @Override // org.apache.flink.runtime.jobmanager.JobGraphWriter
    public void putJobGraph(JobGraph jobGraph) throws Exception {
        Preconditions.checkNotNull(jobGraph, "Job graph");
        JobID jobID = jobGraph.getJobID();
        String jobIDToName = this.jobGraphStoreUtil.jobIDToName(jobID);
        LOG.debug("Adding job graph {} to {}.", jobID, this.jobGraphStateHandleStore);
        boolean z = false;
        while (!z) {
            synchronized (this.lock) {
                verifyIsRunning();
                R exists = this.jobGraphStateHandleStore.exists(jobIDToName);
                if (!exists.isExisting()) {
                    try {
                        this.jobGraphStateHandleStore.addAndLock(jobIDToName, jobGraph);
                        this.addedJobGraphs.add(jobID);
                        z = true;
                    } catch (StateHandleStore.AlreadyExistException e) {
                        LOG.warn("{} already exists in {}.", jobGraph, this.jobGraphStateHandleStore);
                    }
                } else {
                    if (!this.addedJobGraphs.contains(jobID)) {
                        throw new IllegalStateException("Trying to update a graph you didn't #getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
                    }
                    try {
                        this.jobGraphStateHandleStore.replace(jobIDToName, exists, jobGraph);
                        LOG.info("Updated {} in {}.", jobGraph, getClass().getSimpleName());
                        z = true;
                    } catch (StateHandleStore.NotExistException e2) {
                        LOG.warn("{} does not exists in {}.", jobGraph, this.jobGraphStateHandleStore);
                    }
                }
            }
        }
        LOG.info("Added {} to {}.", jobGraph, this.jobGraphStateHandleStore);
    }

    @Override // org.apache.flink.runtime.jobmanager.JobGraphWriter
    public void putJobResourceRequirements(JobID jobID, JobResourceRequirements jobResourceRequirements) throws Exception {
        synchronized (this.lock) {
            JobGraph recoverJobGraph = recoverJobGraph(jobID);
            if (recoverJobGraph == null) {
                throw new NoSuchElementException(String.format("JobGraph for job [%s] was not found in JobGraphStore and is needed for attaching JobResourceRequirements.", jobID));
            }
            JobResourceRequirements.writeToJobGraph(recoverJobGraph, jobResourceRequirements);
            putJobGraph(recoverJobGraph);
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.JobGraphWriter, org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource
    public CompletableFuture<Void> globalCleanupAsync(JobID jobID, Executor executor) {
        Preconditions.checkNotNull(jobID, "Job ID");
        return runAsyncWithLockAssertRunning(() -> {
            LOG.debug("Removing job graph {} from {}.", jobID, this.jobGraphStateHandleStore);
            releaseAndRemoveOrThrowCompletionException(jobID, this.jobGraphStoreUtil.jobIDToName(jobID));
            this.addedJobGraphs.remove(jobID);
            LOG.info("Removed job graph {} from {}.", jobID, this.jobGraphStateHandleStore);
        }, executor);
    }

    @GuardedBy("lock")
    private void releaseAndRemoveOrThrowCompletionException(JobID jobID, String str) {
        try {
            if (!this.jobGraphStateHandleStore.releaseAndTryRemove(str)) {
                throw new CompletionException((Throwable) new FlinkException(String.format("Could not remove job graph with job id %s from %s.", jobID, this.jobGraphStateHandleStore)));
            }
        } catch (Exception e) {
            throw new CompletionException(e);
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.JobGraphWriter, org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource
    public CompletableFuture<Void> localCleanupAsync(JobID jobID, Executor executor) {
        Preconditions.checkNotNull(jobID, "Job ID");
        return runAsyncWithLockAssertRunning(() -> {
            LOG.debug("Releasing job graph {} from {}.", jobID, this.jobGraphStateHandleStore);
            this.jobGraphStateHandleStore.release(this.jobGraphStoreUtil.jobIDToName(jobID));
            this.addedJobGraphs.remove(jobID);
            LOG.info("Released job graph {} from {}.", jobID, this.jobGraphStateHandleStore);
        }, executor);
    }

    private CompletableFuture<Void> runAsyncWithLockAssertRunning(ThrowingRunnable<Exception> throwingRunnable, Executor executor) {
        return CompletableFuture.runAsync(() -> {
            synchronized (this.lock) {
                verifyIsRunning();
                try {
                    throwingRunnable.run();
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }
        }, executor);
    }

    @Override // org.apache.flink.runtime.jobmanager.JobGraphStore
    public Collection<JobID> getJobIds() throws Exception {
        LOG.debug("Retrieving all stored job ids from {}.", this.jobGraphStateHandleStore);
        try {
            Collection<String> allHandles = this.jobGraphStateHandleStore.getAllHandles();
            ArrayList arrayList = new ArrayList(allHandles.size());
            for (String str : allHandles) {
                try {
                    arrayList.add(this.jobGraphStoreUtil.nameToJobID(str));
                } catch (Exception e) {
                    LOG.warn("Could not parse job id from {}. This indicates a malformed name.", str, e);
                }
            }
            LOG.info("Retrieved job ids {} from {}", arrayList, this.jobGraphStateHandleStore);
            return arrayList;
        } catch (Exception e2) {
            throw new Exception("Failed to retrieve all job ids from " + this.jobGraphStateHandleStore + ScopeFormat.SCOPE_SEPARATOR, e2);
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.JobGraphStore.JobGraphListener
    public void onAddedJobGraph(JobID jobID) {
        synchronized (this.lock) {
            if (this.running && !this.addedJobGraphs.contains(jobID)) {
                try {
                    this.jobGraphListener.onAddedJobGraph(jobID);
                } catch (Throwable th) {
                    LOG.error("Failed to notify job graph listener onAddedJobGraph event for {}", jobID, th);
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.JobGraphStore.JobGraphListener
    public void onRemovedJobGraph(JobID jobID) {
        synchronized (this.lock) {
            if (this.running && this.addedJobGraphs.contains(jobID)) {
                try {
                    this.jobGraphListener.onRemovedJobGraph(jobID);
                } catch (Throwable th) {
                    LOG.error("Failed to notify job graph listener onRemovedJobGraph event for {}", jobID, th);
                }
            }
        }
    }

    private void verifyIsRunning() {
        Preconditions.checkState(this.running, "Not running. Forgot to call start()?");
    }
}
