package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.class */
public class TaskExecutorLocalStateStoresManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
    public static final String ALLOCATION_DIR_PREFIX = "aid_";

    @GuardedBy("lock")
    private final Map<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> taskStateStoresByAllocationID;
    private final boolean localRecoveryEnabled;
    private final boolean localBackupEnabled;
    private final Reference<File[]> localStateRootDirectories;
    private final Executor discardExecutor;
    private final Object lock;
    private final Thread shutdownHook;

    @GuardedBy("lock")
    private boolean closed;

    /* loaded from: input_file:org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager$JobVertexSubtaskKey.class */
    private static final class JobVertexSubtaskKey {

        @Nonnull
        final JobVertexID jobVertexID;

        @Nonnegative
        final int subtaskIndex;

        JobVertexSubtaskKey(@Nonnull JobVertexID jobVertexID, @Nonnegative int i) {
            this.jobVertexID = jobVertexID;
            this.subtaskIndex = i;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            JobVertexSubtaskKey jobVertexSubtaskKey = (JobVertexSubtaskKey) obj;
            return this.subtaskIndex == jobVertexSubtaskKey.subtaskIndex && this.jobVertexID.equals(jobVertexSubtaskKey.jobVertexID);
        }

        public int hashCode() {
            return (31 * this.jobVertexID.hashCode()) + this.subtaskIndex;
        }
    }

    public TaskExecutorLocalStateStoresManager(boolean z, boolean z2, @Nonnull Reference<File[]> reference, @Nonnull Executor executor) throws IOException {
        LOG.debug("Start {} with local state root directories {}.", getClass().getSimpleName(), reference);
        this.taskStateStoresByAllocationID = new HashMap();
        this.localRecoveryEnabled = z;
        this.localBackupEnabled = z2;
        this.localStateRootDirectories = reference;
        this.discardExecutor = executor;
        this.lock = new Object();
        this.closed = false;
        for (File file : (File[]) reference.deref()) {
            if (!file.exists() && !file.mkdirs() && !file.exists()) {
                throw new IOException("Could not create root directory for local recovery: " + file);
            }
        }
        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG);
    }

    @Nonnull
    public TaskLocalStateStore localStateStoreForSubtask(@Nonnull JobID jobID, @Nonnull AllocationID allocationID, @Nonnull JobVertexID jobVertexID, @Nonnegative int i, Configuration configuration, Configuration configuration2) {
        OwnedTaskLocalStateStore ownedTaskLocalStateStore;
        synchronized (this.lock) {
            if (this.closed) {
                throw new IllegalStateException("TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.");
            }
            Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore> map = this.taskStateStoresByAllocationID.get(allocationID);
            if (map == null) {
                map = new HashMap();
                this.taskStateStoresByAllocationID.put(allocationID, map);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Registered new allocation id {} for local state stores for job {}.", allocationID.toHexString(), jobID);
                }
            }
            JobVertexSubtaskKey jobVertexSubtaskKey = new JobVertexSubtaskKey(jobVertexID, i);
            OwnedTaskLocalStateStore ownedTaskLocalStateStore2 = map.get(jobVertexSubtaskKey);
            if (ownedTaskLocalStateStore2 == null) {
                LocalSnapshotDirectoryProviderImpl localSnapshotDirectoryProviderImpl = null;
                if (this.localRecoveryEnabled || this.localBackupEnabled) {
                    localSnapshotDirectoryProviderImpl = new LocalSnapshotDirectoryProviderImpl(allocationBaseDirectories(allocationID), jobID, jobVertexID, i);
                }
                LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(this.localRecoveryEnabled, this.localBackupEnabled, localSnapshotDirectoryProviderImpl);
                ownedTaskLocalStateStore2 = (localRecoveryConfig.isLocalRecoveryOrLocalBackupEnabled() && ((Boolean) configuration2.getOptional(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG).orElse(configuration.get(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG))).booleanValue()) ? new ChangelogTaskLocalStateStore(jobID, allocationID, jobVertexID, i, localRecoveryConfig, this.discardExecutor) : localRecoveryConfig.isLocalRecoveryOrLocalBackupEnabled() ? new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, i, localRecoveryConfig, this.discardExecutor) : new NoOpTaskLocalStateStoreImpl(localRecoveryConfig);
                map.put(jobVertexSubtaskKey, ownedTaskLocalStateStore2);
                LOG.debug("Registered new local state store with configuration {} for {} - {} - {} under allocation id {}.", new Object[]{localRecoveryConfig, jobID, jobVertexID, Integer.valueOf(i), allocationID});
            } else {
                LOG.debug("Found existing local state store for {} - {} - {} under allocation id {}: {}", new Object[]{jobID, jobVertexID, Integer.valueOf(i), allocationID, ownedTaskLocalStateStore2});
            }
            ownedTaskLocalStateStore = ownedTaskLocalStateStore2;
        }
        return ownedTaskLocalStateStore;
    }

    public void releaseLocalStateForAllocationId(@Nonnull AllocationID allocationID) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Releasing local state under allocation id {}.", allocationID);
        }
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore> remove = this.taskStateStoresByAllocationID.remove(allocationID);
            if (remove != null) {
                doRelease(remove.values());
            }
            cleanupAllocationBaseDirs(allocationID);
        }
    }

    public void retainLocalStateForAllocations(Set<AllocationID> set) {
        findStoredAllocations().stream().filter(allocationID -> {
            return !set.contains(allocationID);
        }).forEach(this::releaseLocalStateForAllocationId);
    }

    private Collection<AllocationID> findStoredAllocations() {
        HashSet hashSet = new HashSet();
        for (File file : (File[]) this.localStateRootDirectories.deref()) {
            try {
                Iterator<Path> it = listAllocationDirectoriesIn(file).iterator();
                while (it.hasNext()) {
                    hashSet.add(AllocationID.fromHexString(it.next().getFileName().toString().substring(ALLOCATION_DIR_PREFIX.length())));
                }
            } catch (IOException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Could not list local state directory {}. This entails that some orphaned local state might not be cleaned up properly.", file, e);
                } else {
                    LOG.info("Could not list local state directory {}. This entails that some orphaned local state might not be cleaned up properly.", file);
                }
            }
        }
        return hashSet;
    }

    @VisibleForTesting
    @Nonnull
    static Collection<Path> listAllocationDirectoriesIn(File file) throws IOException {
        Stream<Path> list = Files.list(file.toPath());
        Throwable th = null;
        try {
            Collection<Path> collection = (Collection) list.filter(path -> {
                return path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX);
            }).collect(Collectors.toList());
            if (list != null) {
                if (0 != 0) {
                    try {
                        list.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    list.close();
                }
            }
            return collection;
        } catch (Throwable th3) {
            if (list != null) {
                if (0 != 0) {
                    try {
                        list.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    list.close();
                }
            }
            throw th3;
        }
    }

    public void shutdown() {
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.taskStateStoresByAllocationID.clear();
            LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");
            ShutdownHookUtil.removeShutdownHook(this.shutdownHook, getClass().getSimpleName(), LOG);
            if (this.localStateRootDirectories.isOwned()) {
                for (File file : (File[]) this.localStateRootDirectories.deref()) {
                    try {
                        FileUtils.deleteDirectory(file);
                    } catch (IOException e) {
                        LOG.warn("Could not delete local state directory {}.", file, e);
                    }
                }
            }
        }
    }

    @VisibleForTesting
    boolean isLocalRecoveryEnabled() {
        return this.localRecoveryEnabled;
    }

    @VisibleForTesting
    File[] getLocalStateRootDirectories() {
        return (File[]) this.localStateRootDirectories.deref();
    }

    @VisibleForTesting
    String allocationSubDirString(AllocationID allocationID) {
        return ALLOCATION_DIR_PREFIX + allocationID.toHexString();
    }

    private File[] allocationBaseDirectories(AllocationID allocationID) {
        String allocationSubDirString = allocationSubDirString(allocationID);
        File[] fileArr = (File[]) this.localStateRootDirectories.deref();
        File[] fileArr2 = new File[fileArr.length];
        for (int i = 0; i < fileArr.length; i++) {
            fileArr2[i] = new File(fileArr[i], allocationSubDirString);
        }
        return fileArr2;
    }

    private void doRelease(Iterable<OwnedTaskLocalStateStore> iterable) {
        if (iterable != null) {
            for (OwnedTaskLocalStateStore ownedTaskLocalStateStore : iterable) {
                try {
                    ownedTaskLocalStateStore.dispose();
                } catch (Exception e) {
                    LOG.warn("Exception while disposing local state store {}.", ownedTaskLocalStateStore, e);
                }
            }
        }
    }

    private void cleanupAllocationBaseDirs(AllocationID allocationID) {
        for (File file : allocationBaseDirectories(allocationID)) {
            try {
                FileUtils.deleteFileOrDirectory(file);
            } catch (IOException e) {
                LOG.warn("Exception while deleting local state directory for allocation id {}.", allocationID, e);
            }
        }
    }
}
