/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.core.DistributedObject;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ResourceConfig;
import com.hazelcast.jet.core.JetProperties;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.JobResult;
import com.hazelcast.jet.impl.MasterContext;
import com.hazelcast.jet.impl.SnapshotValidationRecord;
import com.hazelcast.jet.impl.deployment.IMapOutputStream;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.jet.impl.metrics.RawJobMetrics;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.IOUtil;
import com.hazelcast.jet.impl.util.ImdgUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.IMap;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.query.Predicate;
import com.hazelcast.spi.impl.NodeEngine;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.LambdaMetafactory;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.stream.Collectors;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Accessor for the Hazelcast maps in which Jet keeps job records, job
 * execution records, job results, job metrics, uploaded job resources and
 * snapshot data. All map names share the {@link #INTERNAL_JET_OBJECTS_PREFIX}.
 */
public class JobRepository {
    /** Common name prefix of the internal Jet objects named by the constants below. */
    public static final String INTERNAL_JET_OBJECTS_PREFIX = "__jet.";
    /** Name prefix of exported-snapshot maps; see {@code exportedSnapshotMapName}. */
    public static final String EXPORTED_SNAPSHOTS_PREFIX = "__jet.exportedSnapshot.";
    /** Name of the map caching validation records of exported snapshots. */
    public static final String EXPORTED_SNAPSHOTS_DETAIL_CACHE = "__jet.exportedSnapshotsCache";
    /** Name prefix of per-job resource maps; the job id string is appended. */
    public static final String RESOURCES_MAP_NAME_PREFIX = "__jet.resources.";
    /** Key prefix used by {@code fileKeyName}. */
    public static final String FILE_STORAGE_KEY_NAME_PREFIX = "f.";
    /** Key prefix used by {@code classKeyName}. */
    public static final String CLASS_STORAGE_KEY_NAME_PREFIX = "c.";
    /** Name of the flake ID generator used for job and execution ids. */
    public static final String RANDOM_ID_GENERATOR_NAME = "__jet.ids";
    /** Name of the map holding {@link JobRecord}s keyed by job id. */
    public static final String JOB_RECORDS_MAP_NAME = "__jet.records";
    /** Name of the map holding {@link JobExecutionRecord}s keyed by job id. */
    public static final String JOB_EXECUTION_RECORDS_MAP_NAME = "__jet.executionRecords";
    /** Name of the map holding {@link JobResult}s keyed by job id. */
    public static final String JOB_RESULTS_MAP_NAME = "__jet.results";
    /** Name of the map holding the metrics of completed jobs keyed by job id. */
    public static final String JOB_METRICS_MAP_NAME = "__jet.results.metrics";
    /** Name prefix of snapshot data maps; the job id string and a data-map index are appended. */
    public static final String SNAPSHOT_DATA_MAP_PREFIX = "__jet.snapshot.";
    // NOTE(review): presumably the divisor that yields the slack allowed on top of the
    // configured maximum number of job results before trimming - confirm against cleanupJobResults.
    private static final int MAX_NO_RESULTS_OVERHEAD = 20;
    /** Default age (two hours) after which an orphaned resources map is considered expired. */
    private static final long DEFAULT_RESOURCES_EXPIRATION_MILLIS = TimeUnit.HOURS.toMillis(2L);
    /** Length of the string form of a job id, used to cut the id out of a prefixed map name. */
    private static final int JOB_ID_STRING_LENGTH = com.hazelcast.jet.Util.idToString(0L).length();
    private final HazelcastInstance instance;
    private final ILogger logger;
    private final IMap<Long, JobRecord> jobRecords;
    private final IMap<Long, JobExecutionRecord> jobExecutionRecords;
    private final IMap<Long, JobResult> jobResults;
    private final IMap<Long, List<RawJobMetrics>> jobMetrics;
    private final IMap<String, SnapshotValidationRecord> exportedSnapshotDetailsCache;
    private final FlakeIdGenerator idGenerator;
    // Not final: can be overridden through setResourcesExpirationMillis.
    private long resourcesExpirationMillis = DEFAULT_RESOURCES_EXPIRATION_MILLIS;

    /**
     * Creates a repository on top of the Hazelcast instance that backs the
     * given Jet instance, resolving the logger, the ID generator and all the
     * internal maps up front.
     *
     * @param jetInstance the Jet instance whose Hazelcast instance is used
     */
    public JobRepository(JetInstance jetInstance) {
        HazelcastInstance hz = jetInstance.getHazelcastInstance();
        this.instance = hz;
        this.logger = hz.getLoggingService().getLogger(getClass());
        this.idGenerator = hz.getFlakeIdGenerator(RANDOM_ID_GENERATOR_NAME);
        this.jobRecords = hz.getMap(JOB_RECORDS_MAP_NAME);
        this.jobExecutionRecords = hz.getMap(JOB_EXECUTION_RECORDS_MAP_NAME);
        this.jobResults = hz.getMap(JOB_RESULTS_MAP_NAME);
        this.jobMetrics = hz.getMap(JOB_METRICS_MAP_NAME);
        this.exportedSnapshotDetailsCache = hz.getMap(EXPORTED_SNAPSHOTS_DETAIL_CACHE);
    }

    /**
     * Overrides the age, in milliseconds, after which a resources map with
     * neither a job record nor a job result is treated as expired.
     *
     * @param resourcesExpirationMillis the new expiration threshold in milliseconds
     */
    void setResourcesExpirationMillis(long resourcesExpirationMillis) {
        this.resourcesExpirationMillis = resourcesExpirationMillis;
    }

    /*
     * Unable to fully structure code
     */
    @SuppressFBWarnings(value={"RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE"}, justification="it's a false positive since java 11: https://github.com/spotbugs/spotbugs/issues/756")
    long uploadJobResources(JobConfig jobConfig) {
        jobId = this.newJobId();
        tmpMap = new HashMap<String, byte[]>();
        try {
            jobFileStorage = Util.memoize((Supplier<IMap>)LambdaMetafactory.metafactory(null, null, null, ()Ljava/lang/Object;, lambda$uploadJobResources$0(long ), ()Lcom/hazelcast/map/IMap;)((JobRepository)this, (long)jobId));
            block44: for (ResourceConfig rc : jobConfig.getResourceConfigs().values()) {
                switch (1.$SwitchMap$com$hazelcast$jet$config$ResourceType[rc.getResourceType().ordinal()]) {
                    case 1: 
                    case 2: {
                        in = rc.getUrl().openStream();
                        var9_10 = null;
                        this.readStreamAndPutCompressedToMap(rc.getId(), tmpMap, in);
                        if (in == null) continue block44;
                        if (var9_10 == null) ** GOTO lbl20
                        try {
                            in.close();
                        }
                        catch (Throwable var10_12) {
                            var9_10.addSuppressed(var10_12);
                        }
                        break;
lbl20:
                        // 1 sources

                        in.close();
                        break;
                        catch (Throwable var10_13) {
                            try {
                                var9_10 = var10_13;
                                throw var10_13;
                            }
                            catch (Throwable var11_17) {
                                if (in != null) {
                                    if (var9_10 != null) {
                                        try {
                                            in.close();
                                        }
                                        catch (Throwable var12_21) {
                                            var9_10.addSuppressed(var12_21);
                                        }
                                    } else {
                                        in.close();
                                    }
                                }
                                throw var11_17;
                            }
                        }
                    }
                    case 3: {
                        in = rc.getUrl().openStream();
                        var9_10 = null;
                        os = new IMapOutputStream(jobFileStorage.get(), JobRepository.fileKeyName(rc.getId()));
                        var11_16 = null;
                        try {
                            IOUtil.packStreamIntoZip(in, os, Objects.requireNonNull(IOUtil.fileNameFromUrl(rc.getUrl())));
                        }
                        catch (Throwable var12_22) {
                            var11_16 = var12_22;
                            throw var12_22;
                        }
                        finally {
                            if (os != null) {
                                if (var11_16 != null) {
                                    try {
                                        os.close();
                                    }
                                    catch (Throwable var12_20) {
                                        var11_16.addSuppressed(var12_20);
                                    }
                                } else {
                                    os.close();
                                }
                            }
                        }
                        if (in == null) continue block44;
                        if (var9_10 == null) ** GOTO lbl68
                        try {
                            in.close();
                        }
                        catch (Throwable var10_14) {
                            var9_10.addSuppressed(var10_14);
                        }
                        break;
lbl68:
                        // 1 sources

                        in.close();
                        break;
                        catch (Throwable var10_15) {
                            try {
                                var9_10 = var10_15;
                                throw var10_15;
                            }
                            catch (Throwable var15_25) {
                                if (in != null) {
                                    if (var9_10 != null) {
                                        try {
                                            in.close();
                                        }
                                        catch (Throwable var16_26) {
                                            var9_10.addSuppressed(var16_26);
                                        }
                                    } else {
                                        in.close();
                                    }
                                }
                                throw var15_25;
                            }
                        }
                    }
                    case 4: {
                        baseDir = this.validateAndGetDirectoryPath(rc);
                        os = new IMapOutputStream(jobFileStorage.get(), JobRepository.fileKeyName(rc.getId()));
                        var10_11 = null;
                        IOUtil.packDirectoryIntoZip(baseDir, os);
                        if (os == null) continue block44;
                        if (var10_11 == null) ** GOTO lbl99
                        try {
                            os.close();
                        }
                        catch (Throwable var11_18) {
                            var10_11.addSuppressed(var11_18);
                        }
                        break;
lbl99:
                        // 1 sources

                        os.close();
                        break;
                        catch (Throwable var11_19) {
                            try {
                                var10_11 = var11_19;
                                throw var11_19;
                            }
                            catch (Throwable var17_27) {
                                if (os != null) {
                                    if (var10_11 != null) {
                                        try {
                                            os.close();
                                        }
                                        catch (Throwable var18_28) {
                                            var10_11.addSuppressed(var18_28);
                                        }
                                    } else {
                                        os.close();
                                    }
                                }
                                throw var17_27;
                            }
                        }
                    }
                    case 5: {
                        this.loadJar(tmpMap, rc);
                        break;
                    }
                    case 6: {
                        this.loadJarsInZip(tmpMap, rc.getUrl());
                        break;
                    }
                    default: {
                        throw new JetException("Unsupported resource type: " + (Object)rc.getResourceType());
                    }
                }
            }
        }
        catch (IOException | URISyntaxException e) {
            throw new JetException("Job resource upload failed", e);
        }
        if (tmpMap.size() > 0) {
            jobResourcesMap = this.getJobResources(jobId);
            try {
                jobResourcesMap.putAll(tmpMap);
            }
            catch (Exception e) {
                jobResourcesMap.destroy();
                throw new JetException("Job resource upload failed", e);
            }
        }
        return jobId;
    }

    /**
     * Converts the resource's URL to a file-system path and checks that it
     * denotes a directory.
     *
     * @param rc the resource config whose URL is resolved
     * @return the directory path
     * @throws URISyntaxException    if the URL cannot be converted to a URI
     * @throws FileNotFoundException if the path is not a directory
     */
    private Path validateAndGetDirectoryPath(ResourceConfig rc) throws URISyntaxException, IOException {
        Path baseDir = Paths.get(rc.getUrl().toURI());
        if (Files.isDirectory(baseDir)) {
            return baseDir;
        }
        throw new FileNotFoundException(baseDir + " is not a valid directory");
    }

    /** Returns a fresh id from the flake ID generator, to be used as a job id. */
    private long newJobId() {
        return idGenerator.newId();
    }

    /**
     * Opens the JAR behind the resource's URL and adds all of its entries,
     * compressed, to {@code tmpMap}.
     *
     * @param tmpMap the map collecting the compressed resources
     * @param rc     the resource config pointing at the JAR
     * @throws IOException if the JAR cannot be opened or read
     */
    @SuppressFBWarnings(value={"RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE"}, justification="it's a false positive since java 11: https://github.com/spotbugs/spotbugs/issues/756")
    private void loadJar(Map<String, byte[]> tmpMap, ResourceConfig rc) throws IOException {
        URL jarUrl = rc.getUrl();
        try (InputStream jarStream = jarUrl.openStream()) {
            loadJarFromInputStream(tmpMap, jarStream);
        }
    }

    /**
     * Walks the ZIP archive behind {@code url} and loads every non-directory
     * entry whose name ends with {@code .jar} (case-insensitively) as a JAR.
     *
     * @param map the map collecting the compressed resources
     * @param url the location of the ZIP archive
     * @throws IOException if the archive cannot be opened or read
     */
    private void loadJarsInZip(Map<String, byte[]> map, URL url) throws IOException {
        try (ZipInputStream zipStream = new ZipInputStream(new BufferedInputStream(url.openStream()))) {
            for (ZipEntry entry = zipStream.getNextEntry(); entry != null; entry = zipStream.getNextEntry()) {
                boolean isJarFile = !entry.isDirectory() && entry.getName().toLowerCase().endsWith(".jar");
                if (isJarFile) {
                    // The ZIP stream is positioned at this entry's data, so it is read as a nested JAR.
                    loadJarFromInputStream(map, zipStream);
                }
            }
        }
    }

    /**
     * Reads {@code is} as a JAR and adds each non-directory entry, compressed,
     * to {@code map} under the entry's name.
     *
     * @param map the map collecting the compressed resources
     * @param is  the stream positioned at the start of the JAR data
     * @throws IOException if the JAR cannot be read
     */
    private void loadJarFromInputStream(Map<String, byte[]> map, InputStream is) throws IOException {
        // The wrapper is intentionally not closed here: the callers own and close the
        // underlying stream, and loadJarsInZip keeps reading from it after this call.
        JarInputStream jarStream = new JarInputStream(is);
        for (JarEntry entry = jarStream.getNextJarEntry(); entry != null; entry = jarStream.getNextJarEntry()) {
            if (!entry.isDirectory()) {
                readStreamAndPutCompressedToMap(entry.getName(), map, jarStream);
            }
        }
    }

    /**
     * Drains {@code in}, deflate-compresses its bytes and stores them in
     * {@code map} under the class-storage key derived from
     * {@code resourceName}. If that key is already present the call is a
     * no-op, so the first resource seen under a given name takes precedence.
     *
     * @param resourceName the resource's name; the key prefix is added here
     * @param map          the map collecting the compressed resources
     * @param in           the stream to read the resource from; not closed here
     * @throws IOException if reading or compressing fails
     */
    private void readStreamAndPutCompressedToMap(String resourceName, Map<String, byte[]> map, InputStream in) throws IOException {
        String key = JobRepository.classKeyName(resourceName);
        // Entries are stored under the prefixed key, so the duplicate check has to use
        // that same key; testing the bare resource name would never match.
        if (map.containsKey(key)) {
            return;
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Closing the compressor flushes the remaining deflated bytes into baos.
        try (DeflaterOutputStream compressor = new DeflaterOutputStream(baos)) {
            com.hazelcast.internal.nio.IOUtil.drainTo(in, compressor);
        }
        map.put(key, baos.toByteArray());
    }

    /**
     * Stores the job record unless one already exists for the same job id.
     * An existing record with an equal DAG is silently accepted.
     *
     * @param jobRecord the record to store
     * @throws IllegalStateException if a record with a different DAG already exists
     */
    void putNewJobRecord(JobRecord jobRecord) {
        long jobId = jobRecord.getJobId();
        JobRecord existing = jobRecords.putIfAbsent(jobId, jobRecord);
        if (existing == null || existing.getDag().equals(jobRecord.getDag())) {
            return;
        }
        throw new IllegalStateException("Cannot put job record for job " + com.hazelcast.jet.Util.idToString(jobId) + " because it already exists with a different DAG");
    }

    /**
     * Runs an entry processor on the job's execution record that calls
     * {@code setLargerQuorumSize(newQuorumSize)} on it. Does nothing if there
     * is no execution record for the job.
     *
     * @param jobId         the id of the job whose execution record is updated
     * @param newQuorumSize the quorum size passed to the record
     */
    void updateJobQuorumSizeIfSmaller(long jobId, int newQuorumSize) {
        jobExecutionRecords.executeOnKey(jobId, ImdgUtil.entryProcessor((key, executionRecord) -> {
            if (executionRecord != null) {
                executionRecord.setLargerQuorumSize(newQuorumSize);
            }
            return executionRecord;
        }));
    }

    /** Returns a fresh id from the flake ID generator, to be used as an execution id. */
    long newExecutionId() {
        return idGenerator.newId();
    }

    /**
     * Records the outcome of a job: stores its terminal metrics (if any),
     * stores a {@link JobResult} and then removes the job's record and
     * execution record.
     *
     * @param masterContext   the context supplying the job id
     * @param terminalMetrics the final metrics to store, or {@code null} to store none
     * @param error           the failure, or {@code null} if there is none
     * @throws JobNotFoundException  if there is no job record for the job
     * @throws IllegalStateException if a result for the job already exists
     */
    void completeJob(@Nonnull MasterContext masterContext, @Nullable List<RawJobMetrics> terminalMetrics, @Nullable Throwable error) {
        long jobId = masterContext.jobId();
        JobRecord jobRecord = getJobRecord(jobId);
        if (jobRecord == null) {
            throw new JobNotFoundException(jobId);
        }
        JobResult jobResult = new JobResult(
                jobId,
                jobRecord.getConfig(),
                jobRecord.getCreationTime(),
                System.currentTimeMillis(),
                toErrorMsg(error));
        if (terminalMetrics != null) {
            List<RawJobMetrics> replacedMetrics = jobMetrics.put(jobId, terminalMetrics);
            if (replacedMetrics != null) {
                logger.warning("Overwriting job metrics for job " + jobResult);
            }
        }
        JobResult existingResult = jobResults.putIfAbsent(jobId, jobResult);
        if (existingResult != null) {
            throw new IllegalStateException("Job result already exists in the " + jobResults.getName() + " map:\nprevious record: " + existingResult + "\nnew record: " + jobResult);
        }
        // The result is stored; only now is it safe to drop the live records.
        deleteJob(jobId);
    }

    /**
     * Removes the job's execution record and then its job record.
     *
     * @param jobId the id of the job to delete
     */
    void deleteJob(long jobId) {
        jobExecutionRecords.remove(jobId);
        jobRecords.remove(jobId);
    }

    /**
     * Runs both cleanup passes (stale maps, then surplus job results) and
     * logs how long they took at fine level.
     *
     * @param nodeEngine the node engine used to list maps and read properties
     */
    void cleanup(NodeEngine nodeEngine) {
        long startNanos = System.nanoTime();
        cleanupMaps(nodeEngine);
        cleanupJobResults(nodeEngine);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        logger.fine("Job cleanup took " + elapsedMillis + "ms");
    }

    /**
     * Scans all maps known to the map service. Snapshot data maps of jobs
     * that no longer have a job record are destroyed; resource maps are
     * handed to {@link #deleteMap}.
     *
     * @param nodeEngine the node engine whose proxy service lists the maps
     */
    private void cleanupMaps(NodeEngine nodeEngine) {
        Collection<DistributedObject> mapObjects = nodeEngine.getProxyService().getDistributedObjects("hz:impl:mapService");
        Set<Long> activeJobs = jobRecords.keySet();
        for (DistributedObject candidate : mapObjects) {
            String mapName = candidate.getName();
            if (mapName.startsWith(SNAPSHOT_DATA_MAP_PREFIX)) {
                long jobId = jobIdFromPrefixedName(mapName, SNAPSHOT_DATA_MAP_PREFIX);
                if (!activeJobs.contains(jobId)) {
                    LoggingUtil.logFine(logger, "Deleting snapshot data map '%s' because job already finished", mapName);
                    candidate.destroy();
                }
            } else if (mapName.startsWith(RESOURCES_MAP_NAME_PREFIX)) {
                deleteMap(activeJobs, candidate);
            }
        }
    }

    /**
     * Destroys a job resources map when its job is no longer active and
     * either a job result exists or the map has passed the expiration age.
     *
     * @param activeJobs the ids of jobs that still have a job record
     * @param map        a map whose name starts with the resources prefix
     */
    private void deleteMap(Set<Long> activeJobs, DistributedObject map) {
        long jobId = jobIdFromPrefixedName(map.getName(), RESOURCES_MAP_NAME_PREFIX);
        if (activeJobs.contains(jobId)) {
            return;
        }
        if (jobResults.containsKey(jobId)) {
            LoggingUtil.logFine(logger, "Deleting job resource map '%s' because job is already finished", map.getName());
            map.destroy();
            return;
        }
        // Neither a record nor a result exists: fall back to the age of the map.
        IMap<?, ?> resourceMap = (IMap<?, ?>) map;
        long createdAt = resourceMap.getLocalMapStats().getCreationTime();
        if (isResourceMapExpired(createdAt)) {
            logger.fine("Deleting job resource map " + map.getName() + " because the map was created long ago and job record or result still doesn't exist");
            resourceMap.destroy();
        }
    }

    /**
     * Trims the job results map once it has outgrown the configured maximum
     * (at least 1) by more than 1/{@code MAX_NO_RESULTS_OVERHEAD} of that
     * maximum. The most recently completed results are kept; the rest are
     * deleted together with their metrics.
     *
     * @param nodeEngine the node engine supplying the configured maximum
     */
    private void cleanupJobResults(NodeEngine nodeEngine) {
        int maxNoResults = Math.max(1, nodeEngine.getProperties().getInteger(JetProperties.JOB_RESULTS_MAX_SIZE));
        long trimThreshold = Util.addClamped(maxNoResults, maxNoResults / MAX_NO_RESULTS_OVERHEAD);
        if (jobResults.size() <= trimThreshold) {
            return;
        }
        Comparator<JobResult> newestFirst = Comparator.comparing(JobResult::getCompletionTime).reversed();
        List<Long> surplusIds = jobResults.values().stream()
                .sorted(newestFirst)
                .skip(maxNoResults)
                .map(JobResult::getJobId)
                .collect(Collectors.toList());
        for (Long id : surplusIds) {
            jobMetrics.delete(id);
            jobResults.delete(id);
        }
    }

    /**
     * Renders the error as a stack-trace string. For an exception that is
     * exactly a {@link JetException} with a non-null message, everything up
     * to and including the first space of the stack trace is cut off.
     *
     * @param error the error to render, may be {@code null}
     * @return the rendered error, or {@code null} if {@code error} is {@code null}
     */
    private static String toErrorMsg(@Nullable Throwable error) {
        if (error == null) {
            return null;
        }
        String stackTrace = ExceptionUtil.stackTraceToString(error);
        boolean plainJetException = error.getClass().equals(JetException.class) && error.getMessage() != null;
        return plainJetException
                ? stackTrace.substring(stackTrace.indexOf(' ') + 1)
                : stackTrace;
    }

    /**
     * Parses the job id that directly follows {@code prefix} in {@code name}.
     *
     * @param name   a map name that starts with {@code prefix} followed by a job id string
     * @param prefix the prefix to skip
     * @return the parsed job id
     */
    private static long jobIdFromPrefixedName(String name, String prefix) {
        int start = prefix.length();
        int end = start + JOB_ID_STRING_LENGTH;
        return com.hazelcast.jet.Util.idFromString(name.substring(start, end));
    }

    /**
     * Tells whether a map created at {@code creationTime} (epoch millis) is
     * at least as old as the configured resources expiration.
     */
    private boolean isResourceMapExpired(long creationTime) {
        long ageMillis = System.currentTimeMillis() - creationTime;
        return ageMillis >= resourcesExpirationMillis;
    }

    /**
     * Returns the ids of all jobs that have either a job record or a job
     * result, as a new set.
     */
    Set<Long> getAllJobIds() {
        Set<Long> ids = new HashSet<>(jobRecords.keySet());
        ids.addAll(jobResults.keySet());
        return ids;
    }

    /** Returns the values of the job records map. */
    Collection<JobRecord> getJobRecords() {
        return jobRecords.values();
    }

    /**
     * Looks up the job record of the given job.
     *
     * @param jobId the job id
     * @return the value mapped to {@code jobId} in the job records map
     */
    public JobRecord getJobRecord(long jobId) {
        return jobRecords.get(jobId);
    }

    /**
     * Looks up the execution record of the given job.
     *
     * @param jobId the job id
     * @return the value mapped to {@code jobId} in the job execution records map
     */
    public JobExecutionRecord getJobExecutionRecord(long jobId) {
        return jobExecutionRecords.get(jobId);
    }

    /**
     * Returns the map holding the uploaded resources of the given job.
     *
     * @param jobId the job id
     * @return the map named by {@link #jobResourcesMapName(long)}
     */
    public IMap<String, byte[]> getJobResources(long jobId) {
        String mapName = jobResourcesMapName(jobId);
        return instance.getMap(mapName);
    }

    /**
     * Looks up the result of the given job.
     *
     * @param jobId the job id
     * @return the stored result, or {@code null} if there is none
     */
    @Nullable
    public JobResult getJobResult(long jobId) {
        return jobResults.get(jobId);
    }

    /**
     * Looks up the stored metrics of the given job.
     *
     * @param jobId the job id
     * @return the stored metrics, or {@code null} if there are none
     */
    @Nullable
    List<RawJobMetrics> getJobMetrics(long jobId) {
        return jobMetrics.get(jobId);
    }

    /** Returns the values of the job results map. */
    Collection<JobResult> getJobResults() {
        return jobResults.values();
    }

    /**
     * Returns the results of all jobs whose configured name equals
     * {@code name}, ordered by creation time, newest first.
     *
     * @param name the job name to match
     */
    List<JobResult> getJobResults(String name) {
        Collection<JobResult> matching = jobResults.values(new FilterJobResultByNamePredicate(name));
        Comparator<JobResult> newestFirst = Comparator.comparing(JobResult::getCreationTime).reversed();
        return matching.stream()
                .sorted(newestFirst)
                .collect(Collectors.toList());
    }

    /**
     * Refreshes the record's timestamp and submits it through an
     * {@link UpdateJobExecutionRecordEntryProcessor}. If the processor returns
     * a message, it is logged at fine level.
     *
     * @param jobId     the id of the job the record belongs to
     * @param record    the record to write
     * @param canCreate whether the record may be stored when none exists yet
     */
    void writeJobExecutionRecord(long jobId, JobExecutionRecord record, boolean canCreate) {
        record.updateTimestamp();
        UpdateJobExecutionRecordEntryProcessor processor = new UpdateJobExecutionRecordEntryProcessor(jobId, record, canCreate);
        Object outcome = jobExecutionRecords.executeOnKey(jobId, processor);
        if (outcome != null) {
            logger.fine((String) outcome);
        }
    }

    /**
     * Builds the name of a snapshot data map: the snapshot prefix, the job id
     * string, a dot and the data-map index.
     */
    public static String snapshotDataMapName(long jobId, int dataMapIndex) {
        String idString = com.hazelcast.jet.Util.idToString(jobId);
        return SNAPSHOT_DATA_MAP_PREFIX + idString + '.' + dataMapIndex;
    }

    /** Builds the name of a job's resources map: the resources prefix followed by the job id string. */
    public static String jobResourcesMapName(long jobId) {
        String idString = com.hazelcast.jet.Util.idToString(jobId);
        return RESOURCES_MAP_NAME_PREFIX + idString;
    }

    /** Returns {@code id} prefixed with {@link #FILE_STORAGE_KEY_NAME_PREFIX}. */
    public static String fileKeyName(String id) {
        return FILE_STORAGE_KEY_NAME_PREFIX.concat(String.valueOf(id));
    }

    /** Returns {@code id} prefixed with {@link #CLASS_STORAGE_KEY_NAME_PREFIX}. */
    public static String classKeyName(String id) {
        return CLASS_STORAGE_KEY_NAME_PREFIX.concat(String.valueOf(id));
    }

    /** Returns {@code name} prefixed with {@link #EXPORTED_SNAPSHOTS_PREFIX}. */
    public static String exportedSnapshotMapName(String name) {
        return EXPORTED_SNAPSHOTS_PREFIX.concat(String.valueOf(name));
    }

    /**
     * Clears the given snapshot data map of the job. This is best effort:
     * a failure is logged as a warning and not propagated.
     *
     * @param jobId        the id of the job owning the snapshot
     * @param dataMapIndex the index of the data map to clear
     */
    void clearSnapshotData(long jobId, int dataMapIndex) {
        String mapName = snapshotDataMapName(jobId, dataMapIndex);
        try {
            IMap<Object, Object> snapshotMap = instance.getMap(mapName);
            snapshotMap.clear();
            LoggingUtil.logFine(logger, "Cleared snapshot data map %s", mapName);
        }
        catch (Exception logged) {
            logger.warning("Cannot delete old snapshot data  " + com.hazelcast.jet.Util.idToString(jobId), logged);
        }
    }

    /**
     * Stores the validation record of an exported snapshot in the
     * exported-snapshot details cache, keyed by the snapshot name.
     *
     * @param snapshotName     the exported snapshot's name
     * @param validationRecord the record to cache
     */
    void cacheValidationRecord(@Nonnull String snapshotName, @Nonnull SnapshotValidationRecord validationRecord) {
        exportedSnapshotDetailsCache.set(snapshotName, validationRecord);
    }

    // Compiler-generated (synthetic) method that the decompiler left in place; judging by
    // its name it is the body of a lambda declared in uploadJobResources. It returns the
    // resources map of the given job.
    private /* synthetic */ IMap lambda$uploadJobResources$0(long jobId) {
        return this.getJobResources(jobId);
    }

    /**
     * Predicate over job-result entries that accepts those whose job config
     * name equals the given name. It is serialized as an
     * {@link IdentifiedDataSerializable}, carrying only the name.
     */
    public static class FilterJobResultByNamePredicate
    implements Predicate<Long, JobResult>,
    IdentifiedDataSerializable {
        private String name;

        /** No-arg constructor; the name is expected to be filled in by {@link #readData}. */
        public FilterJobResultByNamePredicate() {
        }

        FilterJobResultByNamePredicate(String name) {
            this.name = name;
        }

        /** Returns {@code true} if the entry's job config name equals this predicate's name. */
        @Override
        public boolean apply(Map.Entry<Long, JobResult> entry) {
            String candidateName = entry.getValue().getJobConfig().getName();
            return name.equals(candidateName);
        }

        @Override
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        @Override
        public int getClassId() {
            return 18;
        }

        /** Writes the name as the only field. */
        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeUTF(name);
        }

        /** Reads the name written by {@link #writeData}. */
        @Override
        public void readData(ObjectDataInput in) throws IOException {
            name = in.readUTF();
        }
    }

    /**
     * Entry processor that replaces a job's execution record with a newer
     * one. The update is skipped, and a message describing why is returned,
     * when no record exists and creation is not allowed, or when the stored
     * record's timestamp is not older than the update's.
     */
    public static final class UpdateJobExecutionRecordEntryProcessor
    implements EntryProcessor<Long, JobExecutionRecord, Object>,
    IdentifiedDataSerializable {
        private long jobId;
        @SuppressFBWarnings(value={"SE_BAD_FIELD"}, justification="this class is not going to be java-serialized")
        private JobExecutionRecord jobExecutionRecord;
        private boolean canCreate;

        /** No-arg constructor; the fields are expected to be filled in by {@link #readData}. */
        public UpdateJobExecutionRecordEntryProcessor() {
        }

        UpdateJobExecutionRecordEntryProcessor(long jobId, JobExecutionRecord jobExecutionRecord, boolean canCreate) {
            this.jobId = jobId;
            this.jobExecutionRecord = jobExecutionRecord;
            this.canCreate = canCreate;
        }

        /**
         * Applies the update to the entry.
         *
         * @return {@code null} if the record was stored, otherwise a message
         *         explaining why the update was ignored
         */
        @Override
        public Object process(Map.Entry<Long, JobExecutionRecord> entry) {
            JobExecutionRecord stored = entry.getValue();
            if (stored == null) {
                if (!canCreate) {
                    return "Update to JobRecord for job " + com.hazelcast.jet.Util.idToString(jobId) + " ignored, oldValue == null";
                }
            } else if (stored.getTimestamp() >= jobExecutionRecord.getTimestamp()) {
                return "Update to JobRecord for job " + com.hazelcast.jet.Util.idToString(jobId) + " ignored, newer timestamp found. Stored timestamp=" + stored.getTimestamp() + ", timestamp of the update=" + jobExecutionRecord.getTimestamp();
            }
            entry.setValue(jobExecutionRecord);
            return null;
        }

        @Override
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        @Override
        public int getClassId() {
            return 16;
        }

        /** Writes the job id, the record and the {@code canCreate} flag, in that order. */
        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeLong(jobId);
            out.writeObject(jobExecutionRecord);
            out.writeBoolean(canCreate);
        }

        /** Reads the fields in the order {@link #writeData} wrote them. */
        @Override
        public void readData(ObjectDataInput in) throws IOException {
            jobId = in.readLong();
            jobExecutionRecord = (JobExecutionRecord) in.readObject();
            canCreate = in.readBoolean();
        }
    }
}

