/*
 * Decompiled with CFR 0.152.
 */
package io.milvus.bulkwriter;

import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.collect.Lists;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.RemoteBulkWriterParam;
import io.milvus.bulkwriter.connect.AzureConnectParam;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
import io.milvus.bulkwriter.storage.StorageClient;
import io.milvus.bulkwriter.storage.client.AzureStorageClient;
import io.milvus.bulkwriter.storage.client.MinioStorageClient;
import io.milvus.common.utils.ExceptionUtils;
import io.minio.errors.ErrorResponseException;
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteBulkWriter
extends LocalBulkWriter {
    private static final Logger logger = LoggerFactory.getLogger(RemoteBulkWriter.class);
    private String remotePath;
    private StorageConnectParam connectParam;
    private StorageClient storageClient;
    private List<List<String>> remoteFiles;

    public RemoteBulkWriter(RemoteBulkWriterParam bulkWriterParam) throws IOException {
        super(bulkWriterParam.getCollectionSchema(), bulkWriterParam.getChunkSize(), bulkWriterParam.getFileType(), RemoteBulkWriter.generatorLocalPath(), bulkWriterParam.getConfig());
        Path path = Paths.get(bulkWriterParam.getRemotePath(), new String[0]);
        Path remoteDirPath = path.resolve(this.getUUID());
        this.remotePath = remoteDirPath.toString();
        this.connectParam = bulkWriterParam.getConnectParam();
        this.getStorageClient();
        this.remoteFiles = Lists.newArrayList();
        logger.info("Remote buffer writer initialized, target path: {}", (Object)this.remotePath);
    }

    @Override
    public void appendRow(JsonObject rowData) throws IOException, InterruptedException {
        super.appendRow(rowData);
    }

    @Override
    public void commit(boolean async) throws InterruptedException {
        super.commit(async);
    }

    @Override
    protected String getDataPath() {
        return this.remotePath;
    }

    @Override
    public List<List<String>> getBatchFiles() {
        return this.remoteFiles;
    }

    @Override
    protected void exit() throws InterruptedException {
        super.exit();
        Path parentPath = Paths.get(this.localPath, new String[0]).getParent();
        if (parentPath.toFile().exists() && RemoteBulkWriter.isEmptyDirectory(parentPath)) {
            try {
                Files.delete(parentPath);
                logger.info("Delete empty directory: " + parentPath);
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private static boolean isEmptyDirectory(Path directory) {
        try {
            return !Files.walk(directory, 1, FileVisitOption.FOLLOW_LINKS).skip(1L).findFirst().isPresent();
        }
        catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    private void getStorageClient() {
        if (this.storageClient != null) {
            return;
        }
        if (this.connectParam instanceof S3ConnectParam) {
            S3ConnectParam s3ConnectParam = (S3ConnectParam)this.connectParam;
            this.storageClient = MinioStorageClient.getStorageClient(s3ConnectParam.getCloudName(), s3ConnectParam.getEndpoint(), s3ConnectParam.getAccessKey(), s3ConnectParam.getSecretKey(), s3ConnectParam.getSessionToken(), s3ConnectParam.getRegion(), s3ConnectParam.getHttpClient());
        } else if (this.connectParam instanceof AzureConnectParam) {
            AzureConnectParam azureConnectParam = (AzureConnectParam)this.connectParam;
            this.storageClient = AzureStorageClient.getStorageClient(azureConnectParam.getConnStr(), azureConnectParam.getAccountUrl(), azureConnectParam.getCredential());
        }
    }

    private void rmLocal(String file) {
        try {
            Path filePath = Paths.get(file, new String[0]);
            filePath.toFile().delete();
            Path parentDir = filePath.getParent();
            if (parentDir != null && !parentDir.toString().equals(this.localPath)) {
                try {
                    Files.delete(parentDir);
                    logger.info("Delete empty directory: " + parentDir);
                }
                catch (IOException ex) {
                    logger.warn("Failed to delete empty directory: " + parentDir);
                }
            }
        }
        catch (Exception ex) {
            logger.warn("Failed to delete local file: " + file);
        }
    }

    @Override
    protected void callBack(List<String> fileList) {
        ArrayList<String> remoteFileList = new ArrayList<String>();
        try {
            if (!this.bucketExists()) {
                ExceptionUtils.throwUnExpectedException((String)"Blob storage bucket/container doesn't exist");
            }
            for (String filePath : fileList) {
                String relativeFilePath = filePath.replace(super.getDataPath(), "");
                String minioFilePath = RemoteBulkWriter.getMinioFilePath(this.remotePath, relativeFilePath);
                if (this.objectExists(minioFilePath)) {
                    logger.info(String.format("Remote file %s already exists, will overwrite it", minioFilePath));
                }
                this.uploadObject(filePath, minioFilePath);
                remoteFileList.add(minioFilePath);
                this.rmLocal(filePath);
            }
        }
        catch (Exception e) {
            ExceptionUtils.throwUnExpectedException((String)String.format("Failed to upload files, error: %s", e));
        }
        logger.info("Successfully upload files: " + fileList);
        this.remoteFiles.add(remoteFileList);
    }

    @Override
    public void close() throws Exception {
        logger.info("execute remaining actions to prevent loss of memory data or residual empty directories.");
        this.exit();
        logger.info(String.format("RemoteBulkWriter done! output remote files: %s", this.getBatchFiles()));
    }

    private void getObjectEntity(String objectName) throws Exception {
        if (this.connectParam instanceof S3ConnectParam) {
            S3ConnectParam s3ConnectParam = (S3ConnectParam)this.connectParam;
            this.storageClient.getObjectEntity(s3ConnectParam.getBucketName(), objectName);
        } else if (this.connectParam instanceof AzureConnectParam) {
            AzureConnectParam azureConnectParam = (AzureConnectParam)this.connectParam;
            this.storageClient.getObjectEntity(azureConnectParam.getContainerName(), objectName);
        }
        ExceptionUtils.throwUnExpectedException((String)"Blob storage client is not initialized");
    }

    private boolean objectExists(String objectName) throws Exception {
        try {
            this.getObjectEntity(objectName);
        }
        catch (ErrorResponseException e) {
            if ("NoSuchKey".equals(e.errorResponse().code())) {
                return false;
            }
            String msg = String.format("Failed to stat MinIO/S3 object %s, error: %s", objectName, e.errorResponse().message());
            ExceptionUtils.throwUnExpectedException((String)msg);
        }
        catch (BlobStorageException e) {
            if (BlobErrorCode.BLOB_NOT_FOUND == e.getErrorCode()) {
                return false;
            }
            String msg = String.format("Failed to stat Azure object %s, error: %s", objectName, e.getServiceMessage());
            ExceptionUtils.throwUnExpectedException((String)msg);
        }
        catch (ExecutionException e) {
            if (e.getCause().getCause() instanceof ErrorResponseException && "NoSuchKey".equals(((ErrorResponseException)e.getCause().getCause()).errorResponse().code())) {
                return false;
            }
            String msg = String.format("Failed to stat MinIO/S3 object %s, error: %s", objectName, e.getCause().getMessage());
            ExceptionUtils.throwUnExpectedException((String)msg);
        }
        return true;
    }

    private boolean bucketExists() throws Exception {
        if (this.connectParam instanceof S3ConnectParam) {
            S3ConnectParam s3ConnectParam = (S3ConnectParam)this.connectParam;
            return this.storageClient.checkBucketExist(s3ConnectParam.getBucketName());
        }
        if (this.connectParam instanceof AzureConnectParam) {
            AzureConnectParam azureConnectParam = (AzureConnectParam)this.connectParam;
            return this.storageClient.checkBucketExist(azureConnectParam.getContainerName());
        }
        ExceptionUtils.throwUnExpectedException((String)"Blob storage client is not initialized");
        return false;
    }

    private void uploadObject(String filePath, String objectName) throws Exception {
        logger.info(String.format("Prepare to upload %s to %s", filePath, objectName));
        File file = new File(filePath);
        if (this.connectParam instanceof S3ConnectParam) {
            S3ConnectParam s3ConnectParam = (S3ConnectParam)this.connectParam;
            this.storageClient.putObject(file, s3ConnectParam.getBucketName(), objectName);
        } else if (this.connectParam instanceof AzureConnectParam) {
            AzureConnectParam azureConnectParam = (AzureConnectParam)this.connectParam;
            this.storageClient.putObject(file, azureConnectParam.getContainerName(), objectName);
        } else {
            ExceptionUtils.throwUnExpectedException((String)"Blob storage client is not initialized");
        }
        logger.info(String.format("Upload file %s to %s", filePath, objectName));
    }

    private static String generatorLocalPath() {
        Path currentWorkingDirectory = Paths.get("", new String[0]).toAbsolutePath();
        Path currentScriptPath = currentWorkingDirectory.resolve("bulk_writer");
        return currentScriptPath.toString();
    }

    private static String getMinioFilePath(String remotePath, String relativeFilePath) {
        remotePath = remotePath.startsWith("/") ? remotePath.substring(1) : remotePath;
        Path remote = Paths.get(remotePath, new String[0]);
        relativeFilePath = relativeFilePath.startsWith("/") ? relativeFilePath.substring(1) : relativeFilePath;
        Path relative = Paths.get(relativeFilePath, new String[0]);
        Path joinedPath = remote.resolve(relative);
        return joinedPath.toString();
    }
}

