/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl;
import org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.OwnedTaskLocalStateStore;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates, looks up, and releases the {@link TaskLocalStateStore} instances of a task executor.
 *
 * <p>Stores are grouped by {@link AllocationID} and, within an allocation, keyed by job vertex id
 * and subtask index. Every configured local state root directory gets one sub-directory per
 * allocation, named {@link #ALLOCATION_DIR_PREFIX} followed by the allocation id's hex string.
 *
 * <p>The registry map and the {@code closed} flag are guarded by {@code lock}.
 */
public class TaskExecutorLocalStateStoresManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);

    /** Name prefix of the per-allocation sub-directories inside each local state root directory. */
    public static final String ALLOCATION_DIR_PREFIX = "aid_";

    /** Registered state stores: allocation id -> (job vertex id, subtask index) -> store. */
    @GuardedBy(value="lock")
    private final Map<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> taskStateStoresByAllocationID;

    /** Passed through to every {@link LocalRecoveryConfig} created by this manager. */
    private final boolean localRecoveryEnabled;

    /** Passed through to every {@link LocalRecoveryConfig} created by this manager. */
    private final boolean localBackupEnabled;

    /** Root directories for local state; deleted on shutdown only if the reference is owned. */
    private final Reference<File[]> localStateRootDirectories;

    /** Executor handed to the created state stores. */
    private final Executor discardExecutor;

    /** Monitor guarding {@link #taskStateStoresByAllocationID} and {@link #closed}. */
    private final Object lock;

    /** Handle returned when registering the shutdown hook; used to remove it in {@link #shutdown()}. */
    private final Thread shutdownHook;

    /** Set to {@code true} by the first call to {@link #shutdown()}. */
    @GuardedBy(value="lock")
    private boolean closed;

    /**
     * Creates the manager, makes sure all root directories exist, and registers a shutdown hook
     * that calls {@link #shutdown()}.
     *
     * @param localRecoveryEnabled whether local recovery is enabled
     * @param localBackupEnabled whether local backup is enabled
     * @param localStateRootDirectories reference to the root directories for local state
     * @param discardExecutor executor passed to the state stores created by this manager
     * @throws IOException if a root directory does not exist and could not be created
     */
    public TaskExecutorLocalStateStoresManager(boolean localRecoveryEnabled, boolean localBackupEnabled, @Nonnull Reference<File[]> localStateRootDirectories, @Nonnull Executor discardExecutor) throws IOException {
        LOG.debug("Start {} with local state root directories {}.", this.getClass().getSimpleName(), localStateRootDirectories);
        this.taskStateStoresByAllocationID = new HashMap<>();
        this.localRecoveryEnabled = localRecoveryEnabled;
        this.localBackupEnabled = localBackupEnabled;
        this.localStateRootDirectories = localStateRootDirectories;
        this.discardExecutor = discardExecutor;
        this.lock = new Object();
        this.closed = false;

        for (File localStateRecoveryRootDir : localStateRootDirectories.deref()) {
            // The second exists() check tolerates the directory appearing between the first
            // check and a failed mkdirs() call.
            if (!localStateRecoveryRootDir.exists()
                    && !localStateRecoveryRootDir.mkdirs()
                    && !localStateRecoveryRootDir.exists()) {
                throw new IOException("Could not create root directory for local recovery: " + localStateRecoveryRootDir);
            }
        }

        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, this.getClass().getSimpleName(), LOG);
    }

    /**
     * Returns the local state store registered for the given subtask under the given allocation,
     * creating and registering it first if none exists yet.
     *
     * <p>The store type depends on the configuration: a {@link ChangelogTaskLocalStateStore} if
     * local recovery or local backup is enabled and the state changelog is enabled, a
     * {@link TaskLocalStateStoreImpl} if only local recovery or local backup is enabled, and a
     * {@link NoOpTaskLocalStateStoreImpl} otherwise. The changelog flag is read from the job
     * configuration and falls back to the cluster configuration when the job does not set it.
     *
     * @param jobId id of the job the subtask belongs to
     * @param allocationID allocation under which the store is registered
     * @param jobVertexID id of the job vertex the subtask belongs to
     * @param subtaskIndex index of the subtask
     * @param clusterConfiguration cluster-level configuration (fallback for the changelog flag)
     * @param jobConfiguration job-level configuration (takes precedence for the changelog flag)
     * @return the existing or newly created state store
     * @throws IllegalStateException if this manager has already been shut down
     */
    @Nonnull
    public TaskLocalStateStore localStateStoreForSubtask(@Nonnull JobID jobId, @Nonnull AllocationID allocationID, @Nonnull JobVertexID jobVertexID, @Nonnegative int subtaskIndex, Configuration clusterConfiguration, Configuration jobConfiguration) {
        synchronized (this.lock) {
            if (this.closed) {
                throw new IllegalStateException("TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.");
            }

            Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore> taskStateManagers = this.taskStateStoresByAllocationID.get(allocationID);
            if (taskStateManagers == null) {
                taskStateManagers = new HashMap<>();
                this.taskStateStoresByAllocationID.put(allocationID, taskStateManagers);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Registered new allocation id {} for local state stores for job {}.", allocationID.toHexString(), jobId);
                }
            }

            final JobVertexSubtaskKey taskKey = new JobVertexSubtaskKey(jobVertexID, subtaskIndex);
            OwnedTaskLocalStateStore taskLocalStateStore = taskStateManagers.get(taskKey);

            if (taskLocalStateStore == null) {
                // A directory provider is only needed when something is actually stored locally.
                LocalSnapshotDirectoryProviderImpl directoryProvider = null;
                if (this.localRecoveryEnabled || this.localBackupEnabled) {
                    File[] allocationBaseDirectories = this.allocationBaseDirectories(allocationID);
                    directoryProvider = new LocalSnapshotDirectoryProviderImpl(allocationBaseDirectories, jobId, jobVertexID, subtaskIndex);
                }

                LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(this.localRecoveryEnabled, this.localBackupEnabled, directoryProvider);
                boolean changelogEnabled = jobConfiguration
                        .getOptional(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)
                        .orElse(clusterConfiguration.get(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG));

                if (localRecoveryConfig.isLocalRecoveryOrLocalBackupEnabled() && changelogEnabled) {
                    taskLocalStateStore = new ChangelogTaskLocalStateStore(jobId, allocationID, jobVertexID, subtaskIndex, localRecoveryConfig, this.discardExecutor);
                } else if (localRecoveryConfig.isLocalRecoveryOrLocalBackupEnabled()) {
                    taskLocalStateStore = new TaskLocalStateStoreImpl(jobId, allocationID, jobVertexID, subtaskIndex, localRecoveryConfig, this.discardExecutor);
                } else {
                    taskLocalStateStore = new NoOpTaskLocalStateStoreImpl(localRecoveryConfig);
                }

                taskStateManagers.put(taskKey, taskLocalStateStore);
                LOG.debug("Registered new local state store with configuration {} for {} - {} - {} under allocation id {}.", localRecoveryConfig, jobId, jobVertexID, subtaskIndex, allocationID);
            } else {
                LOG.debug("Found existing local state store for {} - {} - {} under allocation id {}: {}", jobId, jobVertexID, subtaskIndex, allocationID, taskLocalStateStore);
            }

            return taskLocalStateStore;
        }
    }

    /**
     * Unregisters and disposes all state stores of the given allocation and deletes the
     * allocation's directories under every root directory. Does nothing once the manager has been
     * shut down.
     *
     * @param allocationID the allocation whose local state should be released
     */
    public void releaseLocalStateForAllocationId(@Nonnull AllocationID allocationID) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Releasing local state under allocation id {}.", allocationID);
        }

        final Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore> cleanupLocalStores;
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            cleanupLocalStores = this.taskStateStoresByAllocationID.remove(allocationID);
        }

        // Disposal and file deletion happen outside the lock.
        if (cleanupLocalStores != null) {
            this.doRelease(cleanupLocalStores.values());
        }
        this.cleanupAllocationBaseDirs(allocationID);
    }

    /**
     * Releases the local state of every allocation that has a directory on disk but is not
     * contained in {@code allocationsToRetain}.
     *
     * @param allocationsToRetain allocations whose local state must be kept
     */
    public void retainLocalStateForAllocations(Set<AllocationID> allocationsToRetain) {
        for (AllocationID storedAllocation : this.findStoredAllocations()) {
            if (!allocationsToRetain.contains(storedAllocation)) {
                this.releaseLocalStateForAllocationId(storedAllocation);
            }
        }
    }

    /**
     * Collects the allocation ids encoded in the names of the allocation directories found in all
     * root directories. Root directories that cannot be listed are logged and skipped.
     */
    private Collection<AllocationID> findStoredAllocations() {
        final Set<AllocationID> storedAllocations = new HashSet<>();
        for (File localStateRootDirectory : this.localStateRootDirectories.deref()) {
            try {
                for (Path allocationDirectory : listAllocationDirectoriesIn(localStateRootDirectory)) {
                    // The directory name is the prefix followed by the allocation id's hex string.
                    String hexString = allocationDirectory.getFileName().toString().substring(ALLOCATION_DIR_PREFIX.length());
                    storedAllocations.add(AllocationID.fromHexString(hexString));
                }
            } catch (IOException ioe) {
                // Best effort: the stack trace is only attached at debug level.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Could not list local state directory {}. This entails that some orphaned local state might not be cleaned up properly.", localStateRootDirectory, ioe);
                } else {
                    LOG.info("Could not list local state directory {}. This entails that some orphaned local state might not be cleaned up properly.", localStateRootDirectory);
                }
            }
        }
        return storedAllocations;
    }

    /**
     * Lists the direct children of the given directory whose file name starts with
     * {@link #ALLOCATION_DIR_PREFIX}.
     *
     * @param localStateRootDirectory directory to list
     * @return paths of the matching entries
     * @throws IOException if the directory cannot be listed
     */
    @Nonnull
    @VisibleForTesting
    static Collection<Path> listAllocationDirectoriesIn(File localStateRootDirectory) throws IOException {
        // Files.list holds an open directory handle, so the stream must be closed.
        try (Stream<Path> fileListStream = Files.list(localStateRootDirectory.toPath())) {
            return fileListStream
                    .filter(path -> path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX))
                    .collect(Collectors.toList());
        }
    }

    /**
     * Shuts the manager down: marks it closed, clears the store registry, removes the shutdown
     * hook, and deletes the root directories if their reference is owned. Calls after the first
     * one return immediately.
     */
    public void shutdown() {
        // Only the state transition needs the lock; logging, hook removal and the directory
        // deletion below run after the monitor has been released.
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.taskStateStoresByAllocationID.clear();
        }

        LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");
        ShutdownHookUtil.removeShutdownHook(this.shutdownHook, this.getClass().getSimpleName(), LOG);

        if (this.localStateRootDirectories.isOwned()) {
            for (File localStateRootDirectory : this.localStateRootDirectories.deref()) {
                try {
                    FileUtils.deleteDirectory(localStateRootDirectory);
                } catch (IOException ioe) {
                    LOG.warn("Could not delete local state directory {}.", localStateRootDirectory, ioe);
                }
            }
        }
    }

    /** Returns whether local recovery is enabled. */
    @VisibleForTesting
    boolean isLocalRecoveryEnabled() {
        return this.localRecoveryEnabled;
    }

    /** Returns the configured local state root directories. */
    @VisibleForTesting
    File[] getLocalStateRootDirectories() {
        return this.localStateRootDirectories.deref();
    }

    /** Returns the sub-directory name for the allocation: the prefix plus the id's hex string. */
    @VisibleForTesting
    String allocationSubDirString(AllocationID allocationID) {
        return ALLOCATION_DIR_PREFIX + allocationID.toHexString();
    }

    /** Builds, for every root directory, the path of the given allocation's sub-directory. */
    private File[] allocationBaseDirectories(AllocationID allocationID) {
        final String allocationSubDirString = this.allocationSubDirString(allocationID);
        final File[] rootDirectories = this.localStateRootDirectories.deref();
        final File[] allocationDirectories = new File[rootDirectories.length];
        for (int i = 0; i < rootDirectories.length; ++i) {
            allocationDirectories[i] = new File(rootDirectories[i], allocationSubDirString);
        }
        return allocationDirectories;
    }

    /** Disposes every given store; a failing store is logged and does not stop the others. */
    private void doRelease(Iterable<OwnedTaskLocalStateStore> toRelease) {
        if (toRelease == null) {
            return;
        }
        for (OwnedTaskLocalStateStore stateStore : toRelease) {
            try {
                stateStore.dispose();
            } catch (Exception disposeEx) {
                LOG.warn("Exception while disposing local state store {}.", stateStore, disposeEx);
            }
        }
    }

    /** Deletes the allocation's sub-directory in every root directory; failures are logged. */
    private void cleanupAllocationBaseDirs(AllocationID allocationID) {
        for (File directory : this.allocationBaseDirectories(allocationID)) {
            try {
                FileUtils.deleteFileOrDirectory(directory);
            } catch (IOException e) {
                LOG.warn("Exception while deleting local state directory for allocation id {}.", allocationID, e);
            }
        }
    }

    /** Map key identifying a subtask by its job vertex id and subtask index. */
    private static final class JobVertexSubtaskKey {
        @Nonnull
        final JobVertexID jobVertexID;
        @Nonnegative
        final int subtaskIndex;

        JobVertexSubtaskKey(@Nonnull JobVertexID jobVertexID, @Nonnegative int subtaskIndex) {
            this.jobVertexID = jobVertexID;
            this.subtaskIndex = subtaskIndex;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            JobVertexSubtaskKey that = (JobVertexSubtaskKey) o;
            return this.subtaskIndex == that.subtaskIndex && this.jobVertexID.equals(that.jobVertexID);
        }

        @Override
        public int hashCode() {
            int result = this.jobVertexID.hashCode();
            result = 31 * result + this.subtaskIndex;
            return result;
        }
    }
}

