package org.apache.flink.runtime.checkpoint.filemerging;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.OutputStreamAndPath;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFilePool;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.class */
public abstract class FileMergingSnapshotManagerBase implements FileMergingSnapshotManager {
    private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class);
    private final String id;
    protected final Executor ioExecutor;
    protected FileSystem fs;
    protected Path checkpointDir;
    protected Path sharedStateDir;
    protected Path taskOwnedStateDir;
    protected int writeBufferSize;
    protected boolean shouldSyncAfterClosingLogicalFile;
    protected long maxPhysicalFileSize;
    protected PhysicalFilePool.Type filePoolType;
    protected Path managedExclusiveStateDir;
    protected DirectoryStreamStateHandle managedExclusiveStateDirHandle;
    protected final Object lock = new Object();

    @GuardedBy("lock")
    protected TreeMap<Long, Set<LogicalFile>> uploadedStates = new TreeMap<>();
    private final Map<LogicalFile.LogicalFileId, LogicalFile> knownLogicalFiles = new ConcurrentHashMap();
    private boolean fileSystemInitiated = false;
    protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile;
    private final Map<FileMergingSnapshotManager.SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap();
    private final Map<FileMergingSnapshotManager.SubtaskKey, DirectoryStreamStateHandle> managedSharedStateDirHandles = new ConcurrentHashMap();

    public FileMergingSnapshotManagerBase(String str, long j, PhysicalFilePool.Type type, Executor executor) {
        this.id = str;
        this.maxPhysicalFileSize = j;
        this.filePoolType = type;
        this.ioExecutor = executor;
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void initFileSystem(FileSystem fileSystem, Path path, Path path2, Path path3, int i) throws IllegalArgumentException {
        if (this.fileSystemInitiated) {
            Preconditions.checkArgument(path.equals(this.checkpointDir), "The checkpoint base dir is not deterministic across subtasks.");
            Preconditions.checkArgument(path2.equals(this.sharedStateDir), "The shared checkpoint dir is not deterministic across subtasks.");
            Preconditions.checkArgument(path3.equals(this.taskOwnedStateDir), "The task-owned checkpoint dir is not deterministic across subtasks.");
            return;
        }
        this.fs = fileSystem;
        this.checkpointDir = (Path) Preconditions.checkNotNull(path);
        this.sharedStateDir = (Path) Preconditions.checkNotNull(path2);
        this.taskOwnedStateDir = (Path) Preconditions.checkNotNull(path3);
        this.fileSystemInitiated = true;
        this.shouldSyncAfterClosingLogicalFile = shouldSyncAfterClosingLogicalFile(fileSystem);
        Path path4 = new Path(path3, this.id);
        createManagedDirectory(path4);
        this.managedExclusiveStateDir = path4;
        this.managedExclusiveStateDirHandle = DirectoryStreamStateHandle.forPathWithZeroSize(new File(path4.getPath()).toPath());
        this.writeBufferSize = i;
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void registerSubtaskForSharedStates(FileMergingSnapshotManager.SubtaskKey subtaskKey) {
        Path path = new Path(this.sharedStateDir, subtaskKey.getManagedDirName());
        if (this.managedSharedStateDir.containsKey(subtaskKey)) {
            return;
        }
        createManagedDirectory(path);
        this.managedSharedStateDir.put(subtaskKey, path);
        this.managedSharedStateDirHandles.put(subtaskKey, DirectoryStreamStateHandle.forPathWithZeroSize(new File(path.getPath()).toPath()));
    }

    protected LogicalFile createLogicalFile(@Nonnull PhysicalFile physicalFile, long j, long j2, @Nonnull FileMergingSnapshotManager.SubtaskKey subtaskKey) {
        LogicalFile.LogicalFileId generateRandomId = LogicalFile.LogicalFileId.generateRandomId();
        LogicalFile logicalFile = new LogicalFile(generateRandomId, physicalFile, j, j2, subtaskKey);
        this.knownLogicalFiles.put(generateRandomId, logicalFile);
        return logicalFile;
    }

    @Nonnull
    protected PhysicalFile createPhysicalFile(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope checkpointedStateScope) throws IOException {
        Exception exc = null;
        Path managedDir = getManagedDir(subtaskKey, checkpointedStateScope);
        if (managedDir == null) {
            throw new IOException("Could not get " + checkpointedStateScope + " path for subtask " + subtaskKey + ", the directory may have not been created.");
        }
        for (int i = 0; i < 10; i++) {
            try {
                OutputStreamAndPath createEntropyAware = EntropyInjector.createEntropyAware(this.fs, generatePhysicalFilePath(managedDir), FileSystem.WriteMode.NO_OVERWRITE);
                FSDataOutputStream stream = createEntropyAware.stream();
                Path path = createEntropyAware.path();
                PhysicalFile physicalFile = new PhysicalFile(stream, path, this.physicalFileDeleter, checkpointedStateScope);
                updateFileCreationMetrics(path);
                return physicalFile;
            } catch (Exception e) {
                exc = e;
            }
        }
        throw new IOException("Could not open output stream for state file merging.", exc);
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream(final FileMergingSnapshotManager.SubtaskKey subtaskKey, final long j, final CheckpointedStateScope checkpointedStateScope) {
        return new FileMergingCheckpointStateOutputStream(this.writeBufferSize, new FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy() { // from class: org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.1
            PhysicalFile physicalFile;
            LogicalFile logicalFile;

            @Override // org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy
            public Tuple2<FSDataOutputStream, Path> providePhysicalFile() throws IOException {
                this.physicalFile = FileMergingSnapshotManagerBase.this.getOrCreatePhysicalFileForCheckpoint(subtaskKey, j, checkpointedStateScope);
                return new Tuple2<>(this.physicalFile.getOutputStream(), this.physicalFile.getFilePath());
            }

            @Override // org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy
            public SegmentFileStateHandle closeStreamAndCreateStateHandle(Path path, long j2, long j3) throws IOException {
                if (this.physicalFile == null) {
                    return null;
                }
                this.logicalFile = FileMergingSnapshotManagerBase.this.createLogicalFile(this.physicalFile, j2, j3, subtaskKey);
                this.logicalFile.advanceLastCheckpointId(j);
                synchronized (FileMergingSnapshotManagerBase.this.lock) {
                    ((Set) FileMergingSnapshotManagerBase.this.uploadedStates.computeIfAbsent(Long.valueOf(j), l -> {
                        return new HashSet();
                    })).add(this.logicalFile);
                }
                this.physicalFile.incSize(j3);
                FileMergingSnapshotManagerBase.this.returnPhysicalFileForNextReuse(subtaskKey, j, this.physicalFile);
                return new SegmentFileStateHandle(this.physicalFile.getFilePath(), j2, j3, checkpointedStateScope, this.logicalFile.getFileId());
            }

            @Override // org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream.FileMergingSnapshotManagerProxy
            public void closeStreamExceptionally() throws IOException {
                if (this.physicalFile != null) {
                    if (this.logicalFile != null) {
                        this.logicalFile.discardWithCheckpointId(j);
                    } else {
                        this.physicalFile.close();
                        this.physicalFile.deleteIfNecessary();
                    }
                }
            }
        });
    }

    private void updateFileCreationMetrics(Path path) {
        LOG.debug("Create a new physical file {} for checkpoint file merging.", path);
    }

    protected Path generatePhysicalFilePath(Path path) {
        return new Path(path, UUID.randomUUID().toString());
    }

    @VisibleForTesting
    boolean isResponsibleForFile(Path path) {
        Path parent = path.getParent();
        return parent.equals(this.managedExclusiveStateDir) || this.managedSharedStateDir.containsValue(parent);
    }

    protected final void deletePhysicalFile(Path path) {
        this.ioExecutor.execute(() -> {
            try {
                this.fs.delete(path, false);
                LOG.debug("Physical file deleted: {}.", path);
            } catch (IOException e) {
                LOG.warn("Fail to delete file: {}", path);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final PhysicalFilePool createPhysicalPool() {
        switch (this.filePoolType) {
            case NON_BLOCKING:
                return new NonBlockingPhysicalFilePool(this.maxPhysicalFileSize, this::createPhysicalFile);
            case BLOCKING:
                return new BlockingPhysicalFilePool(this.maxPhysicalFileSize, this::createPhysicalFile);
            default:
                throw new UnsupportedOperationException("Unsupported type of physical file pool: " + this.filePoolType);
        }
    }

    @Nonnull
    protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j, CheckpointedStateScope checkpointedStateScope) throws IOException;

    protected abstract void returnPhysicalFileForNextReuse(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j, PhysicalFile physicalFile) throws IOException;

    protected abstract void discardCheckpoint(long j) throws IOException;

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void notifyCheckpointComplete(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j) throws Exception {
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void notifyCheckpointAborted(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j) throws Exception {
        synchronized (this.lock) {
            Set<LogicalFile> set = this.uploadedStates.get(Long.valueOf(j));
            if (set == null) {
                return;
            }
            if (discardLogicalFiles(subtaskKey, j, set)) {
                this.uploadedStates.remove(Long.valueOf(j));
            }
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void notifyCheckpointSubsumed(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j) throws Exception {
        synchronized (this.lock) {
            Iterator<Map.Entry<Long, Set<LogicalFile>>> it = this.uploadedStates.headMap(Long.valueOf(j), true).entrySet().iterator();
            while (it.hasNext()) {
                if (discardLogicalFiles(subtaskKey, j, it.next().getValue())) {
                    it.remove();
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public void reusePreviousStateHandle(long j, Collection<? extends StreamStateHandle> collection) {
        LogicalFile logicalFile;
        for (StreamStateHandle streamStateHandle : collection) {
            if ((streamStateHandle instanceof SegmentFileStateHandle) && (logicalFile = this.knownLogicalFiles.get(((SegmentFileStateHandle) streamStateHandle).getLogicalFileId())) != null) {
                logicalFile.advanceLastCheckpointId(j);
            }
        }
    }

    private boolean discardLogicalFiles(FileMergingSnapshotManager.SubtaskKey subtaskKey, long j, Set<LogicalFile> set) throws Exception {
        Iterator<LogicalFile> it = set.iterator();
        while (it.hasNext()) {
            LogicalFile next = it.next();
            if (next.getSubtaskKey().equals(subtaskKey) && next.getLastUsedCheckpointID() <= j) {
                next.discardWithCheckpointId(j);
                it.remove();
                this.knownLogicalFiles.remove(next.getFileId());
            }
        }
        if (!set.isEmpty()) {
            return false;
        }
        discardCheckpoint(j);
        return true;
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public Path getManagedDir(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope checkpointedStateScope) {
        return checkpointedStateScope.equals(CheckpointedStateScope.SHARED) ? this.managedSharedStateDir.get(subtaskKey) : this.managedExclusiveStateDir;
    }

    @Override // org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager
    public DirectoryStreamStateHandle getManagedDirStateHandle(FileMergingSnapshotManager.SubtaskKey subtaskKey, CheckpointedStateScope checkpointedStateScope) {
        return checkpointedStateScope.equals(CheckpointedStateScope.SHARED) ? this.managedSharedStateDirHandles.get(subtaskKey) : this.managedExclusiveStateDirHandle;
    }

    static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) {
        return true;
    }

    private void createManagedDirectory(Path path) {
        FileStatus fileStatus = null;
        try {
            try {
                fileStatus = this.fs.getFileStatus(path);
            } catch (FileNotFoundException e) {
            }
            if (fileStatus == null) {
                this.fs.mkdirs(path);
                LOG.info("Created a directory {} for checkpoint file-merging.", path);
            } else {
                if (!fileStatus.isDir()) {
                    throw new FlinkRuntimeException("The managed path " + path + " for file-merging is occupied by another file. Cannot create directory.");
                }
                LOG.info("Reusing previous directory {} for checkpoint file-merging.", path);
            }
        } catch (IOException e2) {
            throw new FlinkRuntimeException("Cannot create directory " + path + " for file-merging ", e2);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    @VisibleForTesting
    public LogicalFile getLogicalFile(LogicalFile.LogicalFileId logicalFileId) {
        return this.knownLogicalFiles.get(logicalFileId);
    }
}
