/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.shaded.guava33.com.google.common.base.Ticker;
import org.apache.flink.shaded.guava33.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava33.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava33.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava33.com.google.common.cache.LoadingCache;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileExecutionGraphInfoStore
implements ExecutionGraphInfoStore {
    private static final Logger LOG = LoggerFactory.getLogger(FileExecutionGraphInfoStore.class);
    private final File storageDir;
    private final Cache<JobID, JobDetails> jobDetailsCache;
    private final LoadingCache<JobID, ExecutionGraphInfo> executionGraphInfoCache;
    private final ScheduledFuture<?> cleanupFuture;
    private int numFinishedJobs;
    private int numFailedJobs;
    private int numCanceledJobs;

    public FileExecutionGraphInfoStore(File rootDir, Duration expirationTime, int maximumCapacity, long maximumCacheSizeBytes, ScheduledExecutor scheduledExecutor, Ticker ticker) throws IOException {
        File storageDirectory = FileExecutionGraphInfoStore.initExecutionGraphStorageDirectory(rootDir);
        LOG.info("Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.", new Object[]{FileExecutionGraphInfoStore.class.getSimpleName(), storageDirectory, expirationTime.toMillis(), maximumCacheSizeBytes});
        this.storageDir = Preconditions.checkNotNull(storageDirectory);
        Preconditions.checkArgument(storageDirectory.exists() && storageDirectory.isDirectory(), "The storage directory must exist and be a directory.");
        this.jobDetailsCache = CacheBuilder.newBuilder().expireAfterWrite(expirationTime.toMillis(), TimeUnit.MILLISECONDS).maximumSize(maximumCapacity).removalListener(notification -> this.deleteExecutionGraphFile((JobID)notification.getKey())).ticker(ticker).build();
        this.executionGraphInfoCache = CacheBuilder.newBuilder().maximumWeight(maximumCacheSizeBytes).weigher(this::calculateSize).build(new CacheLoader<JobID, ExecutionGraphInfo>(){

            @Override
            public ExecutionGraphInfo load(JobID jobId) throws Exception {
                return FileExecutionGraphInfoStore.this.loadExecutionGraph(jobId);
            }
        });
        this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay(this.jobDetailsCache::cleanUp, expirationTime.toMillis(), expirationTime.toMillis(), TimeUnit.MILLISECONDS);
        this.numFinishedJobs = 0;
        this.numFailedJobs = 0;
        this.numCanceledJobs = 0;
    }

    @Override
    public int size() {
        return Math.toIntExact(this.jobDetailsCache.size());
    }

    @Override
    @Nullable
    public ExecutionGraphInfo get(JobID jobId) {
        try {
            return this.executionGraphInfoCache.get(jobId);
        }
        catch (ExecutionException e) {
            LOG.debug("Could not load archived execution graph information for job id {}.", (Object)jobId, (Object)e);
            return null;
        }
    }

    @Override
    public void put(ExecutionGraphInfo executionGraphInfo) throws IOException {
        JobID jobId = executionGraphInfo.getJobId();
        ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph();
        JobStatus jobStatus = archivedExecutionGraph.getState();
        String jobName = archivedExecutionGraph.getJobName();
        Preconditions.checkArgument(jobStatus.isTerminalState(), "The job " + jobName + "(" + String.valueOf(jobId) + ") is not in a terminal state. Instead it is in state " + String.valueOf((Object)jobStatus) + ".");
        switch (jobStatus) {
            case FINISHED: {
                ++this.numFinishedJobs;
                break;
            }
            case CANCELED: {
                ++this.numCanceledJobs;
                break;
            }
            case FAILED: {
                ++this.numFailedJobs;
                break;
            }
            case SUSPENDED: {
                break;
            }
            default: {
                throw new IllegalStateException("The job " + jobName + "(" + String.valueOf(jobId) + ") should have been in a known terminal state. Instead it was in state " + String.valueOf((Object)jobStatus) + ".");
            }
        }
        this.storeExecutionGraphInfo(executionGraphInfo);
        JobDetails detailsForJob = JobDetails.createDetailsForJob(archivedExecutionGraph);
        this.jobDetailsCache.put(jobId, detailsForJob);
        this.executionGraphInfoCache.put(jobId, executionGraphInfo);
    }

    @Override
    public JobsOverview getStoredJobsOverview() {
        return new JobsOverview(0, this.numFinishedJobs, this.numCanceledJobs, this.numFailedJobs);
    }

    @Override
    public Collection<JobDetails> getAvailableJobDetails() {
        return this.jobDetailsCache.asMap().values();
    }

    @Override
    @Nullable
    public JobDetails getAvailableJobDetails(JobID jobId) {
        return this.jobDetailsCache.getIfPresent(jobId);
    }

    @Override
    public void close() throws IOException {
        this.cleanupFuture.cancel(false);
        this.jobDetailsCache.invalidateAll();
        FileUtils.deleteFileOrDirectory(this.storageDir);
    }

    private int calculateSize(JobID jobId, ExecutionGraphInfo serializableExecutionGraphInfo) {
        File executionGraphInfoFile = this.getExecutionGraphFile(jobId);
        if (executionGraphInfoFile.exists()) {
            return Math.toIntExact(executionGraphInfoFile.length());
        }
        LOG.debug("Could not find execution graph information file for {}. Estimating the size instead.", (Object)jobId);
        ArchivedExecutionGraph serializableExecutionGraph = serializableExecutionGraphInfo.getArchivedExecutionGraph();
        return serializableExecutionGraph.getAllVertices().size() * 1000 + serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
    }

    private ExecutionGraphInfo loadExecutionGraph(JobID jobId) throws IOException, ClassNotFoundException {
        File executionGraphInfoFile = this.getExecutionGraphFile(jobId);
        if (executionGraphInfoFile.exists()) {
            try (FileInputStream fileInputStream = new FileInputStream(executionGraphInfoFile);){
                ExecutionGraphInfo executionGraphInfo = (ExecutionGraphInfo)InstantiationUtil.deserializeObject(fileInputStream, this.getClass().getClassLoader());
                return executionGraphInfo;
            }
        }
        throw new FileNotFoundException("Could not find file for archived execution graph " + String.valueOf(jobId) + ". This indicates that the file either has been deleted or never written.");
    }

    private void storeExecutionGraphInfo(ExecutionGraphInfo executionGraphInfo) throws IOException {
        File archivedExecutionGraphFile = this.getExecutionGraphFile(executionGraphInfo.getJobId());
        try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile);){
            InstantiationUtil.serializeObject(fileOutputStream, executionGraphInfo);
        }
    }

    private File getExecutionGraphFile(JobID jobId) {
        return new File(this.storageDir, jobId.toString());
    }

    private void deleteExecutionGraphFile(JobID jobId) {
        Preconditions.checkNotNull(jobId);
        File archivedExecutionGraphFile = this.getExecutionGraphFile(jobId);
        try {
            FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile);
        }
        catch (IOException e) {
            LOG.debug("Could not delete file {}.", (Object)archivedExecutionGraphFile, (Object)e);
        }
        this.executionGraphInfoCache.invalidate(jobId);
        this.jobDetailsCache.invalidate(jobId);
    }

    private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException {
        int maxAttempts = 10;
        for (int attempt = 0; attempt < 10; ++attempt) {
            File storageDirectory = new File(tmpDir, "executionGraphStore-" + String.valueOf(UUID.randomUUID()));
            if (!storageDirectory.mkdir()) continue;
            return storageDirectory;
        }
        throw new IOException("Could not create executionGraphStorage directory in " + String.valueOf(tmpDir) + ".");
    }

    @VisibleForTesting
    File getStorageDir() {
        return this.storageDir;
    }

    @VisibleForTesting
    LoadingCache<JobID, ExecutionGraphInfo> getExecutionGraphInfoCache() {
        return this.executionGraphInfoCache;
    }
}

