/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.datatransfer;

import java.io.IOException;
import java.util.Collection;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
import org.apache.flink.state.forst.StateHandleTransferSpec;
import org.apache.flink.state.forst.datatransfer.CopyDataTransferStrategy;
import org.apache.flink.state.forst.datatransfer.DataTransferStrategy;
import org.apache.flink.state.forst.datatransfer.ReusableDataTransferStrategy;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static factory that chooses which {@link DataTransferStrategy} to use for snapshot and for
 * restore.
 *
 * <p>Two implementations are selected between: {@link CopyDataTransferStrategy} and {@link
 * ReusableDataTransferStrategy}. The reusable strategy is only chosen when every precondition
 * checked below holds; in all other cases the copy strategy is returned.
 */
public class DataTransferStrategyBuilder {
    private static final Logger LOG = LoggerFactory.getLogger(DataTransferStrategyBuilder.class);

    /**
     * Builds the strategy used while taking a snapshot.
     *
     * <p>Determines whether the ForSt remote base directory lies under the checkpoint's shared
     * directory and then delegates to the four-argument overload that takes that result as a
     * boolean.
     *
     * @param sharingFilesStrategy file sharing strategy of the snapshot being taken
     * @param forStFlinkFileSystem the ForSt file system, or {@code null} if there is none
     * @param checkpointStreamFactory the stream factory of the checkpoint, may be {@code null}
     * @param forceCopy if {@code true}, the copy strategy is always returned
     * @return the strategy to use for this snapshot
     * @throws RuntimeException if the file system of the checkpoint shared path cannot be obtained
     */
    public static DataTransferStrategy buildForSnapshot(
            SnapshotType.SharingFilesStrategy sharingFilesStrategy,
            @Nullable ForStFlinkFileSystem forStFlinkFileSystem,
            @Nullable CheckpointStreamFactory checkpointStreamFactory,
            boolean forceCopy) {
        boolean dbPathUnderCheckpointPath =
                isDbPathUnderCheckpointPathForSnapshot(
                        forStFlinkFileSystem, checkpointStreamFactory);
        return buildForSnapshot(
                sharingFilesStrategy, forStFlinkFileSystem, dbPathUnderCheckpointPath, forceCopy);
    }

    /**
     * Builds the strategy used while taking a snapshot, with the "DB path under checkpoint path"
     * decision supplied by the caller.
     *
     * <p>The copy strategy is returned when any of the following holds: {@code forceCopy} is set,
     * the sharing strategy is {@code NO_SHARING}, there is no ForSt file system, or the DB path is
     * not under the checkpoint path. Otherwise the reusable strategy is returned.
     *
     * @param sharingFilesStrategy file sharing strategy of the snapshot being taken
     * @param forStFlinkFileSystem the ForSt file system, or {@code null} if there is none
     * @param isDbPathUnderCheckpointPathForSnapshot whether the DB remote path is located under
     *     the checkpoint shared path
     * @param forceCopy if {@code true}, the copy strategy is always returned
     * @return the strategy to use for this snapshot
     */
    @VisibleForTesting
    static DataTransferStrategy buildForSnapshot(
            SnapshotType.SharingFilesStrategy sharingFilesStrategy,
            @Nullable ForStFlinkFileSystem forStFlinkFileSystem,
            boolean isDbPathUnderCheckpointPathForSnapshot,
            boolean forceCopy) {
        boolean mustCopy =
                forceCopy
                        || sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING
                        || forStFlinkFileSystem == null
                        || !isDbPathUnderCheckpointPathForSnapshot;

        DataTransferStrategy strategy =
                mustCopy
                        ? newCopyStrategy(forStFlinkFileSystem)
                        : new ReusableDataTransferStrategy(forStFlinkFileSystem);
        LOG.info("Build DataTransferStrategy for Snapshot: {}", strategy);
        return strategy;
    }

    /**
     * Checks whether the remote base directory of the ForSt file system is the checkpoint's shared
     * directory or lies beneath it, on the same file system.
     *
     * <p>Returns {@code false} when there is no ForSt file system, when the stream factory is
     * {@code null} or is not exactly a {@link FsCheckpointStorageLocation}, or when the two file
     * systems report different URIs.
     *
     * @param forStFlinkFileSystem the ForSt file system, or {@code null} if there is none
     * @param checkpointStreamFactory the stream factory of the checkpoint, may be {@code null}
     * @return {@code true} only if the DB remote base is the shared path or one of its descendants
     * @throws RuntimeException if the file system of the checkpoint shared path cannot be obtained
     */
    private static boolean isDbPathUnderCheckpointPathForSnapshot(
            @Nullable ForStFlinkFileSystem forStFlinkFileSystem,
            @Nullable CheckpointStreamFactory checkpointStreamFactory) {
        if (forStFlinkFileSystem == null) {
            return false;
        }
        // Exact class match (not instanceof): subclasses of FsCheckpointStorageLocation are
        // deliberately treated as "not under the checkpoint path".
        if (checkpointStreamFactory == null
                || checkpointStreamFactory.getClass() != FsCheckpointStorageLocation.class) {
            return false;
        }

        FsCheckpointStorageLocation storageLocation =
                (FsCheckpointStorageLocation) checkpointStreamFactory;
        Path cpSharedPath = storageLocation.getTargetPath(CheckpointedStateScope.SHARED);

        FileSystem cpSharedFs;
        try {
            cpSharedFs = cpSharedPath.getFileSystem();
        } catch (IOException e) {
            throw new RuntimeException(
                    "Failed to get FileSystem from cpSharedPath: " + cpSharedPath, e);
        }

        // Paths on different file systems can never contain one another.
        if (!cpSharedFs.getUri().equals(forStFlinkFileSystem.getDelegateFS().getUri())) {
            return false;
        }

        return isSameOrDescendantPath(
                cpSharedPath.toString(), forStFlinkFileSystem.getRemoteBase());
    }

    /**
     * Tells whether {@code childPathStr} denotes {@code parentPathStr} itself or a location below
     * it.
     *
     * <p>A plain {@code startsWith} is not sufficient: {@code /cp/shared-old/db} starts with
     * {@code /cp/shared} but is not located under it. The prefix therefore has to end at a path
     * separator boundary.
     *
     * @param parentPathStr string form of the candidate ancestor directory
     * @param childPathStr string form of the path to test
     * @return {@code true} if both are equal or the child is below the parent
     */
    private static boolean isSameOrDescendantPath(String parentPathStr, String childPathStr) {
        if (parentPathStr.equals(childPathStr)) {
            return true;
        }
        // Avoid doubling the separator when the parent already ends with one (e.g. a root path).
        String parentPrefix =
                parentPathStr.endsWith(Path.SEPARATOR)
                        ? parentPathStr
                        : parentPathStr + Path.SEPARATOR;
        return childPathStr.startsWith(parentPrefix);
    }

    /**
     * Builds the strategy used while restoring from the given state handles.
     *
     * <p>The copy strategy is returned when there is no ForSt file system, when no file system
     * could be derived from the shared state of {@code specs}, when that file system's URI differs
     * from the ForSt file system's URI, or when the recovery claim mode is {@code NO_CLAIM}.
     * Otherwise the reusable strategy is returned.
     *
     * @param forStFlinkFileSystem the ForSt file system, or {@code null} if there is none
     * @param specs transfer specs whose shared state is inspected to locate the checkpoint file
     *     system
     * @param recoveryClaimMode the claim mode the job is restored with
     * @return the strategy to use for this restore
     * @throws RuntimeException if the file system of a shared state file cannot be obtained
     */
    public static DataTransferStrategy buildForRestore(
            @Nullable ForStFlinkFileSystem forStFlinkFileSystem,
            Collection<StateHandleTransferSpec> specs,
            RecoveryClaimMode recoveryClaimMode) {
        FileSystem cpSharedFs = getSharedStateFileSystem(specs);

        // NOTE(review): the snapshot path compares against getDelegateFS().getUri() while this
        // compares against forStFlinkFileSystem.getUri() - confirm both are equivalent.
        boolean mustCopy =
                forStFlinkFileSystem == null
                        || cpSharedFs == null
                        || !forStFlinkFileSystem.getUri().equals(cpSharedFs.getUri())
                        || recoveryClaimMode == RecoveryClaimMode.NO_CLAIM;

        if (mustCopy) {
            DataTransferStrategy strategy = newCopyStrategy(forStFlinkFileSystem);
            LOG.info(
                    "Build DataTransferStrategy for Restore: {}, forStFlinkFileSystem: {}, cpSharedFs:{}, recoveryClaimMode:{}",
                    strategy,
                    forStFlinkFileSystem,
                    cpSharedFs,
                    recoveryClaimMode);
            return strategy;
        }

        DataTransferStrategy strategy = new ReusableDataTransferStrategy(forStFlinkFileSystem);
        LOG.info("Build DataTransferStrategy for Restore: {}", strategy);
        return strategy;
    }

    /**
     * Creates a {@link CopyDataTransferStrategy}, using the no-argument constructor when there is
     * no ForSt file system and the file-system constructor otherwise.
     *
     * @param forStFlinkFileSystem the ForSt file system, or {@code null} if there is none
     * @return a new copy strategy
     */
    private static CopyDataTransferStrategy newCopyStrategy(
            @Nullable ForStFlinkFileSystem forStFlinkFileSystem) {
        return forStFlinkFileSystem == null
                ? new CopyDataTransferStrategy()
                : new CopyDataTransferStrategy(forStFlinkFileSystem);
    }

    /**
     * Returns the file system of the first shared-state entry backed by a {@link FileStateHandle}.
     *
     * <p>Specs are visited in iteration order and, within each spec, shared state entries in the
     * order returned by the state handle. Entries whose handle is not a {@link FileStateHandle}
     * are skipped.
     *
     * @param specs transfer specs to inspect
     * @return the file system of the first file-based shared state handle, or {@code null} if no
     *     such handle exists
     * @throws RuntimeException if the file system of that handle's path cannot be obtained
     */
    @Nullable
    private static FileSystem getSharedStateFileSystem(Collection<StateHandleTransferSpec> specs) {
        for (StateHandleTransferSpec spec : specs) {
            IncrementalRemoteKeyedStateHandle stateHandle = spec.getStateHandle();
            for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath :
                    stateHandle.getSharedState()) {
                StreamStateHandle handle = handleAndLocalPath.getHandle();
                if (!(handle instanceof FileStateHandle)) {
                    continue;
                }
                Path sharedFilePath = ((FileStateHandle) handle).getFilePath();
                try {
                    // The first file-based handle decides; later handles are not consulted.
                    return sharedFilePath.getFileSystem();
                } catch (IOException e) {
                    throw new RuntimeException(
                            "Failed to get FileSystem from handle: " + handle, e);
                }
            }
        }
        return null;
    }
}

