/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.shuffle;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshot;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShuffleMasterSnapshotUtil {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleMasterSnapshotUtil.class);

    public static void restoreOrSnapshotShuffleMaster(ShuffleMaster<?> shuffleMaster, Configuration configuration, Executor ioExecutor) throws IOException {
        boolean isJobRecoveryEnabled;
        boolean bl = isJobRecoveryEnabled = configuration.get(BatchExecutionOptions.JOB_RECOVERY_ENABLED) != false && shuffleMaster.supportsBatchSnapshot();
        if (isJobRecoveryEnabled) {
            String clusterId = configuration.get(HighAvailabilityOptions.HA_CLUSTER_ID);
            Path path = new Path(HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(configuration), "shuffleMaster-snapshot");
            if (ShuffleMasterSnapshotUtil.isShuffleMasterSnapshotExist(path, clusterId)) {
                ShuffleMasterSnapshot snapshot = ShuffleMasterSnapshotUtil.readSnapshot(path, clusterId);
                LOG.info("Restore shuffle master state from cluster level snapshot.");
                shuffleMaster.restoreState(snapshot);
            } else {
                shuffleMaster.restoreState(null);
                CompletableFuture snapshotFuture = new CompletableFuture();
                ioExecutor.execute(() -> {
                    LOG.info("Take a cluster level shuffle master snapshot.");
                    shuffleMaster.snapshotState(snapshotFuture);
                    snapshotFuture.thenAccept(shuffleMasterSnapshot -> {
                        try {
                            ShuffleMasterSnapshotUtil.writeSnapshot(shuffleMasterSnapshot, path, clusterId);
                        }
                        catch (IOException e) {
                            LOG.warn("Write cluster level shuffle master snapshot failed.", (Throwable)e);
                        }
                    });
                });
            }
        }
    }

    private static void writeSnapshot(ShuffleMasterSnapshot snapshot, Path workingDir, String clusterId) throws IOException {
        FileSystem fileSystem = workingDir.getFileSystem();
        if (fileSystem.exists(workingDir)) {
            throw new IOException("Shuffle master dir " + workingDir + " already exists.");
        }
        fileSystem.mkdirs(workingDir);
        LOG.info("Create shuffle master snapshot dir {}.", (Object)workingDir);
        Path writeFile = new Path(workingDir, clusterId);
        try (FSDataOutputStream outputStream = fileSystem.create(writeFile, FileSystem.WriteMode.NO_OVERWRITE);){
            byte[] bytes = InstantiationUtil.serializeObject(snapshot);
            ShuffleMasterSnapshotUtil.writeInt(outputStream, bytes.length);
            outputStream.write(bytes);
        }
    }

    private static void writeInt(FSDataOutputStream outputStream, int num) throws IOException {
        outputStream.write(num >>> 24 & 0xFF);
        outputStream.write(num >>> 16 & 0xFF);
        outputStream.write(num >>> 8 & 0xFF);
        outputStream.write(num & 0xFF);
    }

    @VisibleForTesting
    static boolean isShuffleMasterSnapshotExist(Path workingDir, String clusterId) throws IOException {
        FileSystem fileSystem = workingDir.getFileSystem();
        return fileSystem.exists(new Path(workingDir, clusterId));
    }

    @VisibleForTesting
    static ShuffleMasterSnapshot readSnapshot(Path workingDir, String clusterId) throws IOException {
        ShuffleMasterSnapshot shuffleMasterSnapshot;
        FileSystem fileSystem = workingDir.getFileSystem();
        Path file = new Path(workingDir, clusterId);
        DataInputStream inputStream = new DataInputStream(fileSystem.open(file));
        try {
            int byteLength = inputStream.readInt();
            byte[] bytes = new byte[byteLength];
            inputStream.readFully(bytes);
            shuffleMasterSnapshot = (ShuffleMasterSnapshot)InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
        }
        catch (Throwable throwable) {
            try {
                try {
                    inputStream.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
                throw throwable;
            }
            catch (ClassNotFoundException exception) {
                throw new IOException("Deserialize ShuffleMasterSnapshot failed.", exception);
            }
        }
        inputStream.close();
        return shuffleMasterSnapshot;
    }
}

