/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor.history;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.HistoryServer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class HistoryServerArchiveFetcher {
    /** Logger shared by this class and its nested polling task. */
    private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
    /** Factory for the JSON generator used when writing the combined job overview. */
    private static final JsonFactory jacksonFactory = new JsonFactory();
    /** Mapper used to parse and serialize job overview JSON. */
    private static final ObjectMapper mapper = new ObjectMapper();
    /** Single-threaded scheduler on which the fetcher task is run; threads come from an ExecutorThreadFactory named "Flink-HistoryServer-ArchiveFetcher". */
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
    /** The polling task that is scheduled by {@link #start()}. */
    private final JobArchiveFetcherTask fetcherTask;
    /** Delay, in milliseconds, between the end of one poll and the start of the next. */
    private final long refreshIntervalMillis;

    /**
     * Creates a fetcher that polls the given archive locations and writes the fetched JSON below {@code webDir}.
     *
     * <p>Nothing is scheduled here; polling begins when {@link #start()} is called.
     *
     * @param refreshIntervalMillis delay between two polls, in milliseconds
     * @param refreshDirs archive locations to monitor
     * @param webDir local directory that receives the fetched files
     * @param numFinishedPolls latch that the polling task counts down at the end of each poll
     */
    HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
        this.refreshIntervalMillis = refreshIntervalMillis;
        this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir, numFinishedPolls);

        // Skip the announcement loop entirely when INFO logging is disabled.
        if (!LOG.isInfoEnabled()) {
            return;
        }
        for (HistoryServer.RefreshLocation location : refreshDirs) {
            LOG.info("Monitoring directory {} for archived jobs.", location.getPath());
        }
    }

    /**
     * Schedules the fetcher task on the executor: the first run starts immediately (initial delay 0),
     * and each subsequent run starts {@code refreshIntervalMillis} after the previous one finished.
     */
    void start() {
        this.executor.scheduleWithFixedDelay(this.fetcherTask, 0L, this.refreshIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops polling.
     *
     * <p>Requests an orderly shutdown of the executor and waits up to one second for a running poll
     * to finish; if it does not finish in time, or if the waiting thread is interrupted, the executor
     * is shut down forcibly.
     */
    void stop() {
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(1L, TimeUnit.SECONDS)) {
                this.executor.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            this.executor.shutdownNow();
            // Restore the interrupt flag so that callers further up the stack can still observe
            // the interruption instead of having it silently consumed here.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Converts a legacy job overview (the JSON archived under {@code /joboverview}) into the JSON
     * serialization of a {@link MultipleJobsDetails} that contains the single converted job.
     *
     * <p>Only the first entry of the {@code "finished"} array is converted. Structurally invalid
     * input (missing fields, an unparsable job id or job state) is reported as an
     * {@link IOException} rather than as an unchecked exception.
     *
     * @param legacyOverview JSON text of the legacy overview
     * @return JSON text of the converted overview
     * @throws IOException if the input cannot be parsed, is missing a required field, or contains
     *     an invalid job id or job state
     */
    private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
        JsonNode root = mapper.readTree(legacyOverview);
        if (root == null) {
            throw new IOException("Legacy job overview is empty.");
        }
        JsonNode job = requireLegacyField(root, "finished").get(0);
        if (job == null) {
            throw new IOException("Legacy job overview does not contain a finished job.");
        }

        JobID jobId;
        JobStatus state;
        try {
            jobId = JobID.fromHexString(requireLegacyField(job, "jid").asText());
            state = JobStatus.valueOf(requireLegacyField(job, "state").asText());
        }
        catch (IllegalArgumentException e) {
            throw new IOException("Legacy job overview contains an invalid job id or job state.", e);
        }
        String name = requireLegacyField(job, "name").asText();
        long startTime = requireLegacyField(job, "start-time").asLong();
        long endTime = requireLegacyField(job, "end-time").asLong();
        long duration = requireLegacyField(job, "duration").asLong();
        long lastMod = requireLegacyField(job, "last-modification").asLong();

        JsonNode tasks = requireLegacyField(job, "tasks");
        int numTasks = requireLegacyField(tasks, "total").asInt();

        // The per-state counts are stored at the ordinal of the corresponding ExecutionState;
        // states that have no legacy counterpart keep a count of zero.
        int[] tasksPerState = new int[ExecutionState.values().length];
        // The legacy "pending" count is recorded under SCHEDULED.
        tasksPerState[ExecutionState.SCHEDULED.ordinal()] = requireLegacyField(tasks, "pending").asInt();
        tasksPerState[ExecutionState.RUNNING.ordinal()] = requireLegacyField(tasks, "running").asInt();
        tasksPerState[ExecutionState.FINISHED.ordinal()] = requireLegacyField(tasks, "finished").asInt();
        tasksPerState[ExecutionState.CANCELING.ordinal()] = requireLegacyField(tasks, "canceling").asInt();
        tasksPerState[ExecutionState.CANCELED.ordinal()] = requireLegacyField(tasks, "canceled").asInt();
        tasksPerState[ExecutionState.FAILED.ordinal()] = requireLegacyField(tasks, "failed").asInt();

        JobDetails jobDetails = new JobDetails(jobId, name, startTime, endTime, duration, state, lastMod, tasksPerState, numTasks);
        MultipleJobsDetails multipleJobsDetails = new MultipleJobsDetails(Collections.singleton(jobDetails));

        StringWriter sw = new StringWriter();
        mapper.writeValue(sw, multipleJobsDetails);
        return sw.toString();
    }

    /** Returns the named child of {@code node}, or throws an {@link IOException} if it is absent. */
    private static JsonNode requireLegacyField(JsonNode node, String fieldName) throws IOException {
        JsonNode value = node.get(fieldName);
        if (value == null) {
            throw new IOException("Legacy job overview is missing the field '" + fieldName + "'.");
        }
        return value;
    }

    /**
     * Rebuilds the combined job overview from the per-job overview files.
     *
     * <p>Every file in {@code webOverviewDir} is parsed as a {@link MultipleJobsDetails}; the jobs of
     * all files are merged and written as a single {@link MultipleJobsDetails} to the file that
     * {@code HistoryServer.createOrGetFile(webDir, "/jobs/overview")} provides. If the directory
     * cannot be listed, nothing is written. Any {@link IOException} is logged and not propagated.
     *
     * @param webOverviewDir directory that holds one overview file per job
     * @param webDir root directory passed to {@code HistoryServer.createOrGetFile}
     */
    private static void updateJobOverview(File webOverviewDir, File webDir) {
        try (JsonGenerator gen = jacksonFactory.createGenerator((Writer)HistoryServer.createOrGetFile(webDir, "/jobs/overview"))) {
            File[] overviews = webOverviewDir.listFiles();
            if (overviews == null) {
                // listFiles() yields null when the path is not a directory or cannot be read.
                return;
            }
            List<JobDetails> allJobs = new ArrayList<>(overviews.length);
            for (File overview : overviews) {
                MultipleJobsDetails subJobs = mapper.readValue(overview, MultipleJobsDetails.class);
                allJobs.addAll(subJobs.getJobs());
            }
            mapper.writeValue(gen, new MultipleJobsDetails(allJobs));
        }
        catch (IOException ioe) {
            LOG.error("Failed to update job overview.", ioe);
        }
    }

    static class JobArchiveFetcherTask
    extends TimerTask {
        private final List<HistoryServer.RefreshLocation> refreshDirs;
        private final CountDownLatch numFinishedPolls;
        private final Set<String> cachedArchives;
        private final File webDir;
        private final File webJobDir;
        private final File webOverviewDir;
        private static final String JSON_FILE_ENDING = ".json";

        JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
            this.refreshDirs = (List)Preconditions.checkNotNull(refreshDirs);
            this.numFinishedPolls = numFinishedPolls;
            this.cachedArchives = new HashSet<String>();
            this.webDir = (File)Preconditions.checkNotNull((Object)webDir);
            this.webJobDir = new File(webDir, "jobs");
            this.webJobDir.mkdir();
            this.webOverviewDir = new File(webDir, "overviews");
            this.webOverviewDir.mkdir();
        }

        @Override
        public void run() {
            try {
                for (HistoryServer.RefreshLocation refreshLocation : this.refreshDirs) {
                    FileStatus[] jobArchives;
                    Path refreshDir = refreshLocation.getPath();
                    FileSystem refreshFS = refreshLocation.getFs();
                    try {
                        jobArchives = refreshFS.listStatus(refreshDir);
                    }
                    catch (IOException e) {
                        LOG.error("Failed to access job archive location for path {}.", (Object)refreshDir, (Object)e);
                        continue;
                    }
                    if (jobArchives == null) continue;
                    boolean updateOverview = false;
                    for (FileStatus jobArchive : jobArchives) {
                        Path jobArchivePath = jobArchive.getPath();
                        String jobID = jobArchivePath.getName();
                        try {
                            JobID.fromHexString((String)jobID);
                        }
                        catch (IllegalArgumentException iae) {
                            LOG.debug("Archive directory {} contained file with unexpected name {}. Ignoring file.", new Object[]{refreshDir, jobID, iae});
                            continue;
                        }
                        if (!this.cachedArchives.add(jobID)) continue;
                        try {
                            for (ArchivedJson archive : FsJobArchivist.getArchivedJsons((Path)jobArchive.getPath())) {
                                File target;
                                String path = archive.getPath();
                                String json = archive.getJson();
                                if (path.equals("/jobs/overview")) {
                                    target = new File(this.webOverviewDir, jobID + JSON_FILE_ENDING);
                                } else if (path.equals("/joboverview")) {
                                    json = HistoryServerArchiveFetcher.convertLegacyJobOverview(json);
                                    target = new File(this.webOverviewDir, jobID + JSON_FILE_ENDING);
                                } else {
                                    target = new File(this.webDir, path + JSON_FILE_ENDING);
                                }
                                java.nio.file.Path parent = target.getParentFile().toPath();
                                try {
                                    Files.createDirectories(parent, new FileAttribute[0]);
                                }
                                catch (FileAlreadyExistsException fileAlreadyExistsException) {
                                    // empty catch block
                                }
                                java.nio.file.Path targetPath = target.toPath();
                                Files.deleteIfExists(targetPath);
                                Files.createFile(target.toPath(), new FileAttribute[0]);
                                FileWriter fw = new FileWriter(target);
                                Throwable throwable = null;
                                try {
                                    fw.write(json);
                                    fw.flush();
                                }
                                catch (Throwable throwable2) {
                                    throwable = throwable2;
                                    throw throwable2;
                                }
                                finally {
                                    if (fw == null) continue;
                                    if (throwable != null) {
                                        try {
                                            fw.close();
                                        }
                                        catch (Throwable throwable3) {
                                            throwable.addSuppressed(throwable3);
                                        }
                                        continue;
                                    }
                                    fw.close();
                                }
                            }
                            updateOverview = true;
                        }
                        catch (IOException e) {
                            LOG.error("Failure while fetching/processing job archive for job {}.", (Object)jobID, (Object)e);
                            this.cachedArchives.remove(jobID);
                            try {
                                Files.delete(new File(this.webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
                            }
                            catch (IOException ioe) {
                                LOG.debug("Could not delete file from overview directory.", (Throwable)ioe);
                            }
                            File jobDirectory = new File(this.webJobDir, jobID);
                            try {
                                FileUtils.deleteDirectory((File)jobDirectory);
                            }
                            catch (IOException ioe) {
                                LOG.debug("Could not clean up job directory.", (Throwable)ioe);
                            }
                        }
                    }
                    if (!updateOverview) continue;
                    HistoryServerArchiveFetcher.updateJobOverview(this.webOverviewDir, this.webDir);
                }
            }
            catch (Exception e) {
                LOG.error("Critical failure while fetching/processing job archives.", (Throwable)e);
            }
            this.numFinishedPolls.countDown();
        }
    }
}

